Details
diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index c4671ea..587a72a 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -30,16 +30,16 @@ import com.ning.billing.account.api.user.DefaultAccountChangeNotification;
import com.ning.billing.account.api.user.DefaultAccountCreationEvent;
import com.ning.billing.util.customfield.CustomField;
import com.ning.billing.util.customfield.dao.FieldStoreDao;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
import com.ning.billing.util.tag.Tag;
import com.ning.billing.util.tag.dao.TagStoreDao;
public class DefaultAccountDao implements AccountDao {
private final AccountSqlDao accountDao;
- private final EventBus eventBus;
+ private final Bus eventBus;
@Inject
- public DefaultAccountDao(IDBI dbi, EventBus eventBus) {
+ public DefaultAccountDao(IDBI dbi, Bus eventBus) {
this.eventBus = eventBus;
this.accountDao = dbi.onDemand(AccountSqlDao.class);
}
diff --git a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
index b18a41b..d205439 100644
--- a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
+++ b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
@@ -21,7 +21,7 @@ import com.google.inject.Injector;
import com.google.inject.Stage;
import com.ning.billing.account.glue.AccountModuleMock;
import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.BusService;
import org.apache.commons.io.IOUtils;
import org.skife.jdbi.v2.IDBI;
import org.testng.annotations.AfterClass;
@@ -56,7 +56,7 @@ public abstract class AccountDaoTestBase {
accountDao = injector.getInstance(AccountDao.class);
accountDao.test();
- EventBusService busService = injector.getInstance(EventBusService.class);
+ BusService busService = injector.getInstance(BusService.class);
((DefaultEventBusService) busService).startBus();
}
catch (Throwable t) {
diff --git a/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java b/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
index a08e3ab..e537841 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
@@ -19,7 +19,7 @@ package com.ning.billing.analytics.api;
import com.google.inject.Inject;
import com.ning.billing.analytics.AnalyticsListener;
import com.ning.billing.lifecycle.LifecycleHandlerType;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,10 +30,10 @@ public class AnalyticsService implements IAnalyticsService
private static final String ANALYTICS_SERVICE = "analytics-service";
private final AnalyticsListener listener;
- private final EventBus eventBus;
+ private final Bus eventBus;
@Inject
- public AnalyticsService(final AnalyticsListener listener, final EventBus eventBus)
+ public AnalyticsService(final AnalyticsListener listener, final Bus eventBus)
{
this.listener = listener;
this.eventBus = eventBus;
@@ -51,7 +51,7 @@ public class AnalyticsService implements IAnalyticsService
try {
eventBus.register(listener);
}
- catch (EventBus.EventBusException e) {
+ catch (Bus.EventBusException e) {
log.error("Unable to register to the EventBus!", e);
}
}
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index c146de7..2feffda 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -47,7 +47,7 @@ import com.ning.billing.entitlement.api.user.SubscriptionTransition;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.user.ApiEventType;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
import com.ning.billing.util.tag.DefaultTag;
import com.ning.billing.util.tag.DefaultTagDescription;
import com.ning.billing.util.tag.Tag;
@@ -90,7 +90,7 @@ public class TestAnalyticsService
private AnalyticsService service;
@Inject
- private EventBus bus;
+ private Bus bus;
@Inject
private BusinessSubscriptionTransitionDao subscriptionDao;
diff --git a/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java b/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
index 9e5f254..2bc40f8 100644
--- a/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
+++ b/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
@@ -16,12 +16,12 @@
package com.ning.billing.account.api;
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
import java.util.List;
import java.util.UUID;
-public interface AccountChangeNotification extends EventBusNotification {
+public interface AccountChangeNotification extends BusEvent {
public UUID getAccountId();
public List<ChangedField> getChangedFields();
diff --git a/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java b/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
index bc2c065..22a1752 100644
--- a/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
+++ b/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
@@ -16,11 +16,11 @@
package com.ning.billing.account.api;
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
import java.util.UUID;
-public interface AccountCreationNotification extends EventBusNotification {
+public interface AccountCreationNotification extends BusEvent {
public UUID getId();
public AccountData getData();
diff --git a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
new file mode 100644
index 0000000..a2b4270
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.config;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+public interface InvoiceConfig {
+
+ @Config("killbill.invoice.dao.claim.time")
+ @Default("60000")
+ public long getDaoClaimTimeMs();
+
+ @Config("killbill.invoice.dao.ready.max")
+ @Default("10")
+ public int getDaoMaxReadyEvents();
+
+ @Config("killbill.invoice.engine.notifications.sleep")
+ @Default("500")
+ public long getNotificationSleepTimeMs();
+
+ @Config("killbill.invoice.engine.events.off")
+ @Default("false")
+ public boolean isEventProcessingOff();
+}
diff --git a/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java b/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
index 26ce81f..449c368 100644
--- a/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
+++ b/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
@@ -19,12 +19,12 @@ package com.ning.billing.entitlement.api.user;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.PlanPhase;
import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
import org.joda.time.DateTime;
import java.util.UUID;
-public interface SubscriptionTransition extends EventBusNotification {
+public interface SubscriptionTransition extends BusEvent {
public enum SubscriptionTransitionType {
MIGRATE_ENTITLEMENT,
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
index 89c0d87..7903647 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
@@ -17,13 +17,13 @@
package com.ning.billing.invoice.api;
import com.ning.billing.catalog.api.Currency;
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
import org.joda.time.DateTime;
import java.math.BigDecimal;
import java.util.UUID;
-public interface InvoiceCreationNotification extends EventBusNotification {
+public interface InvoiceCreationNotification extends BusEvent {
public UUID getInvoiceId();
public UUID getAccountId();
public BigDecimal getAmountOwed();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 7bd1124..86fd78b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -18,7 +18,6 @@ package com.ning.billing.entitlement.engine.core;
import java.util.UUID;
-
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +45,8 @@ import com.ning.billing.entitlement.exceptions.EntitlementError;
import com.ning.billing.lifecycle.LifecycleHandlerType;
import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.eventbus.EventBus;
-import com.ning.billing.util.eventbus.EventBus.EventBusException;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.Bus.EventBusException;
import com.ning.billing.util.notificationq.NotificationConfig;
import com.ning.billing.util.notificationq.NotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -59,10 +58,6 @@ public class Engine implements EventListener, EntitlementService {
public static final String NOTIFICATION_QUEUE_NAME = "subscription-events";
public static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
- private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
- private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
- private final long NANO_TO_MS = (1000 * 1000);
-
private final static Logger log = LoggerFactory.getLogger(Engine.class);
private final Clock clock;
@@ -72,19 +67,17 @@ public class Engine implements EventListener, EntitlementService {
private final EntitlementBillingApi billingApi;
private final EntitlementTestApi testApi;
private final EntitlementMigrationApi migrationApi;
- private final EventBus eventBus;
+ private final Bus eventBus;
private final EntitlementConfig config;
private final NotificationQueueService notificationQueueService;
- private boolean startedNotificationThread;
- private boolean stoppedNotificationThread;
private NotificationQueue subscritionEventQueue;
@Inject
public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
EntitlementConfig config, DefaultEntitlementUserApi userApi,
DefaultEntitlementBillingApi billingApi, DefaultEntitlementTestApi testApi,
- DefaultEntitlementMigrationApi migrationApi, EventBus eventBus,
+ DefaultEntitlementMigrationApi migrationApi, Bus eventBus,
NotificationQueueService notificationQueueService) {
super();
this.clock = clock;
@@ -108,8 +101,6 @@ public class Engine implements EventListener, EntitlementService {
public void initialize() {
try {
- this.stoppedNotificationThread = false;
- this.startedNotificationThread = false;
subscritionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
NOTIFICATION_QUEUE_NAME,
new NotificationQueueHandler() {
@@ -122,21 +113,6 @@ public class Engine implements EventListener, EntitlementService {
processEventReady(event);
}
}
-
- @Override
- public void completedQueueStop() {
- synchronized (this) {
- stoppedNotificationThread = true;
- this.notifyAll();
- }
- }
- @Override
- public void completedQueueStart() {
- synchronized (this) {
- startedNotificationThread = true;
- this.notifyAll();
- }
- }
},
new NotificationConfig() {
@Override
@@ -164,16 +140,13 @@ public class Engine implements EventListener, EntitlementService {
@LifecycleHandlerType(LifecycleLevel.START_SERVICE)
public void start() {
subscritionEventQueue.startQueue();
- waitForNotificationStartCompletion();
}
@LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
public void stop() {
if (subscritionEventQueue != null) {
subscritionEventQueue.stopQueue();
- waitForNotificationStopCompletion();
- }
- startedNotificationThread = false;
+ }
}
@Override
@@ -218,43 +191,6 @@ public class Engine implements EventListener, EntitlementService {
}
}
- private void waitForNotificationStartCompletion() {
- waitForNotificationEventCompletion(true);
- }
-
- private void waitForNotificationStopCompletion() {
- waitForNotificationEventCompletion(false);
- }
-
- private void waitForNotificationEventCompletion(boolean startEvent) {
-
- long ini = System.nanoTime();
- synchronized(this) {
- do {
- if ((startEvent ? startedNotificationThread : stoppedNotificationThread)) {
- break;
- }
- try {
- this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
- } catch (InterruptedException e ) {
- Thread.currentThread().interrupt();
- throw new EntitlementError(e);
- }
- } while (!(startEvent ? startedNotificationThread : stoppedNotificationThread) &&
- (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
-
- if (!(startEvent ? startedNotificationThread : stoppedNotificationThread)) {
- log.error("Could not {} notification thread in {} msec !!!",
- (startEvent ? "start" : "stop"),
- MAX_NOTIFICATION_THREAD_WAIT_MS);
- throw new EntitlementError("Failed to start service!!");
- }
- log.info("Notification thread has been {} in {} ms",
- (startEvent ? "started" : "stopped"),
- (System.nanoTime() - ini) / NANO_TO_MS);
- }
- }
-
private void insertNextPhaseEvent(SubscriptionData subscription) {
try {
DateTime now = clock.getUTCNow();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
index 6bee471..7241611 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
@@ -19,7 +19,7 @@ package com.ning.billing.entitlement.api;
import com.google.common.base.Joiner;
import com.google.common.eventbus.Subscribe;
import com.ning.billing.entitlement.api.user.SubscriptionTransition;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public class ApiTestListener {
PHASE
}
- public ApiTestListener(EventBus eventBus) {
+ public ApiTestListener(Bus eventBus) {
this.nextExpectedEvent = new Stack<NextEvent>();
this.completed = false;
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 9a9d80b..adcf53f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -62,7 +62,7 @@ import com.ning.billing.lifecycle.KillbillService.ServiceException;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.BusService;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -86,7 +86,7 @@ public abstract class TestApiBase {
protected EntitlementConfig config;
protected EntitlementDao dao;
protected ClockMock clock;
- protected EventBusService busService;
+ protected BusService busService;
protected AccountData accountData;
protected Catalog catalog;
@@ -108,7 +108,7 @@ public abstract class TestApiBase {
@AfterClass(groups={"setup"})
public void tearDown() {
try {
- busService.getEventBus().register(testListener);
+ busService.getBus().register(testListener);
((DefaultEventBusService) busService).stopBus();
} catch (Exception e) {
log.warn("Failed to tearDown test properly ", e);
@@ -124,7 +124,7 @@ public abstract class TestApiBase {
entitlementService = g.getInstance(EntitlementService.class);
catalogService = g.getInstance(CatalogService.class);
- busService = g.getInstance(EventBusService.class);
+ busService = g.getInstance(BusService.class);
config = g.getInstance(EntitlementConfig.class);
dao = g.getInstance(EntitlementDao.class);
clock = (ClockMock) g.getInstance(Clock.class);
@@ -151,7 +151,7 @@ public abstract class TestApiBase {
assertNotNull(catalog);
- testListener = new ApiTestListener(busService.getEventBus());
+ testListener = new ApiTestListener(busService.getBus());
entitlementApi = entitlementService.getUserApi();
billingApi = entitlementService.getBillingApi();
migrationApi = entitlementService.getMigrationApi();
@@ -169,7 +169,7 @@ public abstract class TestApiBase {
clock.resetDeltaFromReality();
((MockEntitlementDao) dao).reset();
try {
- busService.getEventBus().register(testListener);
+ busService.getBus().register(testListener);
UUID accountId = UUID.randomUUID();
bundle = entitlementApi.createBundleForAccount(accountId, "myDefaultBundle");
} catch (Exception e) {
diff --git a/entitlement/src/test/resources/entitlement.properties b/entitlement/src/test/resources/entitlement.properties
index d149d78..227aa7e 100644
--- a/entitlement/src/test/resources/entitlement.properties
+++ b/entitlement/src/test/resources/entitlement.properties
@@ -1,4 +1,4 @@
-killbill.catalog.uri=file:src/test/resources/testInput.xml
+killbill.catalog.uri=file:src/test/resources/versionedCatalog
killbill.entitlement.dao.claim.time=60000
killbill.entitlement.dao.ready.max=1
killbill.entitlement.engine.notifications.sleep=500
diff --git a/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-1.xml b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-1.xml
new file mode 100644
index 0000000..c21aac1
--- /dev/null
+++ b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-1.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+ ~ Copyright 2010-2011 Ning, Inc.
+ ~
+ ~ Ning licenses this file to you under the Apache License, version 2.0
+ ~ (the "License"); you may not use this file except in compliance with the
+ ~ License. You may obtain a copy of the License at:
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ ~ License for the specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<catalog xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:noNamespaceSchemaLocation="../CatalogSchema.xsd ">
+
+ <effectiveDate>2011-01-01T00:00:00+00:00</effectiveDate>
+ <catalogName>WeaponsHireSmall</catalogName>
+
+ <currencies>
+ <currency>USD</currency>
+ <currency>EUR</currency>
+ <currency>GBP</currency>
+ </currencies>
+
+ <products>
+ <product name="Pistol">
+ <category>BASE</category>
+ </product>
+ <product name="Shotgun">
+ <category>BASE</category>
+ </product>
+ <product name="Laser-Scope">
+ <category>ADD_ON</category>
+ </product>
+ </products>
+
+ <rules>
+ <changePolicy>
+ <changePolicyCase>
+ <fromBillingPeriod>MONTHLY</fromBillingPeriod>
+ <toProduct>Shotgun</toProduct>
+ <toBillingPeriod>MONTHLY</toBillingPeriod>
+ <policy>END_OF_TERM</policy>
+ </changePolicyCase>
+ <changePolicyCase>
+ <phaseType>TRIAL</phaseType>
+ <policy>IMMEDIATE</policy>
+ </changePolicyCase>
+ </changePolicy>
+ <changeAlignment>
+ <changeAlignmentCase>
+ <alignment>START_OF_SUBSCRIPTION</alignment>
+ </changeAlignmentCase>
+ </changeAlignment>
+ <createAlignment>
+ <createAlignmentCase>
+ <product>Laser-Scope</product>
+ <alignment>START_OF_SUBSCRIPTION</alignment>
+ </createAlignmentCase>
+ <createAlignmentCase>
+ <alignment>START_OF_BUNDLE</alignment>
+ </createAlignmentCase>
+ </createAlignment>
+ </rules>
+
+
+ <plans>
+ <plan name="pistol-monthly">
+ <product>Pistol</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice> <!-- empty price implies $0 -->
+ </fixedPrice>
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ </duration>
+ <billingPeriod>MONTHLY</billingPeriod>
+ <recurringPrice>
+ <price><currency>GBP</currency><value>1.0</value></price>
+ <price><currency>EUR</currency><value>1.0</value></price>
+ <price><currency>USD</currency><value>1.0</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ <plan name="shotgun-monthly">
+ <product>Shotgun</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice></fixedPrice>
+ <!-- no price implies $0 -->
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ <number>-1</number>
+ </duration>
+ <billingPeriod>MONTHLY</billingPeriod>
+ <recurringPrice>
+ <price><currency>USD</currency><value>249.95</value></price>
+ <price><currency>EUR</currency><value>149.95</value></price>
+ <price><currency>GBP</currency><value>169.95</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ <plan name="shotgun-annual">
+ <product>Shotgun</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice>
+ </fixedPrice>
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ </duration>
+ <billingPeriod>ANNUAL</billingPeriod>
+ <recurringPrice>
+ <price><currency>USD</currency><value>2399.95</value></price>
+ <price><currency>EUR</currency><value>1499.95</value></price>
+ <price><currency>GBP</currency><value>1699.95</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ </plans>
+ <priceLists>
+ <defaultPriceList name="DEFAULT">
+ <plans>
+ <plan>pistol-monthly</plan>
+ <plan>shotgun-monthly</plan>
+ <plan>shotgun-annual</plan>
+ </plans>
+ </defaultPriceList>
+ </priceLists>
+</catalog>
diff --git a/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-2.xml b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-2.xml
new file mode 100644
index 0000000..0e650f1
--- /dev/null
+++ b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-2.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+ ~ Copyright 2010-2011 Ning, Inc.
+ ~
+ ~ Ning licenses this file to you under the Apache License, version 2.0
+ ~ (the "License"); you may not use this file except in compliance with the
+ ~ License. You may obtain a copy of the License at:
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ ~ License for the specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<catalog xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:noNamespaceSchemaLocation="../CatalogSchema.xsd ">
+
+ <effectiveDate>2011-02-02T00:00:00+00:00</effectiveDate>
+ <catalogName>WeaponsHireSmall</catalogName>
+
+ <currencies>
+ <currency>USD</currency>
+ <currency>EUR</currency>
+ <currency>GBP</currency>
+ </currencies>
+
+ <products>
+ <product name="Pistol">
+ <category>BASE</category>
+ </product>
+ <product name="Shotgun">
+ <category>BASE</category>
+ </product>
+ <product name="Laser-Scope">
+ <category>ADD_ON</category>
+ </product>
+ </products>
+
+ <rules>
+ <changePolicy>
+ <changePolicyCase>
+ <fromBillingPeriod>MONTHLY</fromBillingPeriod>
+ <toProduct>Shotgun</toProduct>
+ <toBillingPeriod>MONTHLY</toBillingPeriod>
+ <policy>END_OF_TERM</policy>
+ </changePolicyCase>
+ <changePolicyCase>
+ <phaseType>TRIAL</phaseType>
+ <policy>IMMEDIATE</policy>
+ </changePolicyCase>
+ </changePolicy>
+ <changeAlignment>
+ <changeAlignmentCase>
+ <alignment>START_OF_SUBSCRIPTION</alignment>
+ </changeAlignmentCase>
+ </changeAlignment>
+ <createAlignment>
+ <createAlignmentCase>
+ <product>Laser-Scope</product>
+ <alignment>START_OF_SUBSCRIPTION</alignment>
+ </createAlignmentCase>
+ <createAlignmentCase>
+ <alignment>START_OF_BUNDLE</alignment>
+ </createAlignmentCase>
+ </createAlignment>
+ </rules>
+
+ <plans>
+ <plan name="pistol-monthly">
+ <effectiveDateForExistingSubscriptons>2011-02-14T00:00:00+00:00</effectiveDateForExistingSubscriptons>
+ <product>Pistol</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice> <!-- empty price implies $0 -->
+ </fixedPrice>
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ </duration>
+ <billingPeriod>MONTHLY</billingPeriod>
+ <recurringPrice>
+ <price><currency>GBP</currency><value>2.0</value></price>
+ <price><currency>EUR</currency><value>2.0</value></price>
+ <price><currency>USD</currency><value>2.0</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ <plan name="shotgun-monthly">
+ <product>Shotgun</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice></fixedPrice>
+ <!-- no price implies $0 -->
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ <number>-1</number>
+ </duration>
+ <billingPeriod>MONTHLY</billingPeriod>
+ <recurringPrice>
+ <price><currency>USD</currency><value>249.95</value></price>
+ <price><currency>EUR</currency><value>149.95</value></price>
+ <price><currency>GBP</currency><value>169.95</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ <plan name="shotgun-annual">
+ <product>Shotgun</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice>
+ </fixedPrice>
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ </duration>
+ <billingPeriod>ANNUAL</billingPeriod>
+ <recurringPrice>
+ <price><currency>USD</currency><value>2399.95</value></price>
+ <price><currency>EUR</currency><value>1499.95</value></price>
+ <price><currency>GBP</currency><value>1699.95</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ </plans>
+ <priceLists>
+ <defaultPriceList name="DEFAULT">
+ <plans>
+ <plan>pistol-monthly</plan>
+ <plan>shotgun-monthly</plan>
+ <plan>shotgun-annual</plan>
+ </plans>
+ </defaultPriceList>
+ </priceLists>
+</catalog>
diff --git a/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-3.xml b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-3.xml
new file mode 100644
index 0000000..f7ae066
--- /dev/null
+++ b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-3.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+ ~ Copyright 2010-2011 Ning, Inc.
+ ~
+ ~ Ning licenses this file to you under the Apache License, version 2.0
+ ~ (the "License"); you may not use this file except in compliance with the
+ ~ License. You may obtain a copy of the License at:
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ ~ License for the specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<catalog xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:noNamespaceSchemaLocation="../CatalogSchema.xsd ">
+
+ <effectiveDate>2011-03-03T00:00:00+00:00</effectiveDate>
+ <catalogName>WeaponsHireSmall</catalogName>
+
+ <currencies>
+ <currency>USD</currency>
+ <currency>EUR</currency>
+ <currency>GBP</currency>
+ </currencies>
+
+ <products>
+ <product name="Pistol">
+ <category>BASE</category>
+ </product>
+ <product name="Shotgun">
+ <category>BASE</category>
+ </product>
+ <product name="Laser-Scope">
+ <category>ADD_ON</category>
+ </product>
+ </products>
+
+ <rules>
+ <changePolicy>
+ <changePolicyCase>
+ <fromBillingPeriod>MONTHLY</fromBillingPeriod>
+ <toProduct>Shotgun</toProduct>
+ <toBillingPeriod>MONTHLY</toBillingPeriod>
+ <policy>END_OF_TERM</policy>
+ </changePolicyCase>
+ <changePolicyCase>
+ <phaseType>TRIAL</phaseType>
+ <policy>IMMEDIATE</policy>
+ </changePolicyCase>
+ </changePolicy>
+ <changeAlignment>
+ <changeAlignmentCase>
+ <alignment>START_OF_SUBSCRIPTION</alignment>
+ </changeAlignmentCase>
+ </changeAlignment>
+ <createAlignment>
+ <createAlignmentCase>
+ <product>Laser-Scope</product>
+ <alignment>START_OF_SUBSCRIPTION</alignment>
+ </createAlignmentCase>
+ <createAlignmentCase>
+ <alignment>START_OF_BUNDLE</alignment>
+ </createAlignmentCase>
+ </createAlignment>
+ </rules>
+
+
+ <plans>
+ <plan name="pistol-monthly">
+ <product>Pistol</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice> <!-- empty price implies $0 -->
+ </fixedPrice>
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ </duration>
+ <billingPeriod>MONTHLY</billingPeriod>
+ <recurringPrice>
+ <price><currency>GBP</currency><value>3.0</value></price>
+ <price><currency>EUR</currency><value>3.0</value></price>
+ <price><currency>USD</currency><value>3.0</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ <plan name="shotgun-monthly">
+ <product>Shotgun</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice></fixedPrice>
+ <!-- no price implies $0 -->
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ <number>-1</number>
+ </duration>
+ <billingPeriod>MONTHLY</billingPeriod>
+ <recurringPrice>
+ <price><currency>USD</currency><value>249.95</value></price>
+ <price><currency>EUR</currency><value>149.95</value></price>
+ <price><currency>GBP</currency><value>169.95</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ <plan name="shotgun-annual">
+ <product>Shotgun</product>
+ <initialPhases>
+ <phase type="TRIAL">
+ <duration>
+ <unit>DAYS</unit>
+ <number>30</number>
+ </duration>
+ <billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+ <fixedPrice>
+ </fixedPrice>
+ </phase>
+ </initialPhases>
+ <finalPhase type="EVERGREEN">
+ <duration>
+ <unit>UNLIMITED</unit>
+ </duration>
+ <billingPeriod>ANNUAL</billingPeriod>
+ <recurringPrice>
+ <price><currency>USD</currency><value>2399.95</value></price>
+ <price><currency>EUR</currency><value>1499.95</value></price>
+ <price><currency>GBP</currency><value>1699.95</value></price>
+ </recurringPrice>
+ </finalPhase>
+ </plan>
+ </plans>
+ <priceLists>
+ <defaultPriceList name="DEFAULT">
+ <plans>
+ <plan>pistol-monthly</plan>
+ <plan>shotgun-monthly</plan>
+ <plan>shotgun-annual</plan>
+ </plans>
+ </defaultPriceList>
+ </priceLists>
+</catalog>
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index 22f4990..d9e29f0 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -22,7 +22,7 @@ import com.ning.billing.invoice.api.Invoice;
import com.ning.billing.invoice.api.InvoiceUserApi;
import com.ning.billing.invoice.dao.DefaultInvoiceDao;
import com.ning.billing.invoice.dao.InvoiceDao;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 20f33c7..8612eb4 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -23,7 +23,7 @@ import com.ning.billing.invoice.api.Invoice;
import com.ning.billing.invoice.api.InvoiceCreationNotification;
import com.ning.billing.invoice.api.InvoiceItem;
import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
@@ -38,11 +38,11 @@ import java.util.UUID;
public class DefaultInvoiceDao implements InvoiceDao {
private final InvoiceSqlDao invoiceDao;
- private final EventBus eventBus;
+ private final Bus eventBus;
private final static Logger log = LoggerFactory.getLogger(DefaultInvoiceDao.class);
@Inject
- public DefaultInvoiceDao(final IDBI dbi, final EventBus eventBus) {
+ public DefaultInvoiceDao(final IDBI dbi, final Bus eventBus) {
this.invoiceDao = dbi.onDemand(InvoiceSqlDao.class);
this.eventBus = eventBus;
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java
new file mode 100644
index 0000000..59ec8a2
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import com.ning.billing.util.eventbus.BusEvent;
+
+public class NextBillingDateEvent implements BusEvent{
+ private final UUID subscriptionId;
+
+ public NextBillingDateEvent(UUID subscriptionId) {
+ super();
+ this.subscriptionId = subscriptionId;
+ }
+
+ public UUID getSubscriptionId() {
+ return subscriptionId;
+ }
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
new file mode 100644
index 0000000..a8a1b9e
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.config.InvoiceConfig;
+import com.ning.billing.entitlement.exceptions.EntitlementError;
+import com.ning.billing.lifecycle.KillbillService;
+import com.ning.billing.lifecycle.LifecycleHandlerType;
+import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.Bus.EventBusException;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotficationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class NextBillingDateNotifier implements KillbillService {
+ private final static Logger log = LoggerFactory.getLogger(NextBillingDateNotifier.class);
+
+ private static final String NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME = "next-billing-date-notifier";
+ private static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
+
+ private final Bus eventBus;
+ private boolean stoppedNotificationThread;
+ private boolean startedNotificationThread;
+ private final NotificationQueueService notificationQueueService;
+ private NotificationQueue nextBillingQueue;
+ private InvoiceConfig config;
+
+ @Inject
+ public NextBillingDateNotifier(NotificationQueueService notificationQueueService, Bus eventBus, InvoiceConfig config){
+ this.notificationQueueService = notificationQueueService;
+ this.config = config;
+ this.eventBus = eventBus;
+ }
+
+
+ @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+ public void initialize() {
+ try {
+ this.stoppedNotificationThread = false;
+ this.startedNotificationThread = false;
+ nextBillingQueue = notificationQueueService.createNotificationQueue(NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME,
+ NEXT_BILLING_DATE_NOTIFIER_QUEUE,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ UUID subscriptionId;
+ try {
+ subscriptionId = UUID.fromString(notificationKey);
+ } catch (IllegalArgumentException e) {
+ log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID",e);
+ return;
+ }
+
+ processEventReady(subscriptionId);
+ }
+ },
+ new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return config.isEventProcessingOff();
+ }
+ @Override
+ public long getNotificationSleepTimeMs() {
+ return config.getNotificationSleepTimeMs();
+ }
+ @Override
+ public int getDaoMaxReadyEvents() {
+ return config.getDaoMaxReadyEvents();
+ }
+ @Override
+ public long getDaoClaimTimeMs() {
+ return config.getDaoMaxReadyEvents();
+ }
+ });
+ } catch (NotficationQueueAlreadyExists e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ nextBillingQueue.startQueue();
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+ public void stop() {
+ if (nextBillingQueue != null) {
+ nextBillingQueue.stopQueue();
+ }
+ startedNotificationThread = false;
+ }
+
+ @Override
+ public String getName() {
+ return NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME;
+ }
+
+ public void processEventReady(UUID subscriptionId) {
+ try {
+ eventBus.post(new NextBillingDateEvent(subscriptionId));
+ } catch (EventBusException e) {
+ log.error("Failed to post entitlement event " + subscriptionId, e);
+ }
+ }
+
+}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 268aaf5..7d41ed8 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -24,7 +24,7 @@ import com.google.inject.Injector;
import com.google.inject.Stage;
import com.ning.billing.invoice.glue.InvoiceModuleMock;
import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.BusService;
import static org.testng.Assert.fail;
@@ -47,7 +47,7 @@ public abstract class InvoiceDaoTestBase {
invoiceItemDao = module.getInvoiceItemDao();
- EventBusService busService = injector.getInstance(EventBusService.class);
+ BusService busService = injector.getInstance(BusService.class);
((DefaultEventBusService) busService).startBus();
}
catch (Throwable t) {
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java b/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
index afcae5e..01ad9b8 100644
--- a/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
+++ b/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
@@ -20,14 +20,14 @@ import com.google.inject.Inject;
import com.ning.billing.lifecycle.LifecycleHandlerType;
import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
-public class DefaultEventBusService implements EventBusService {
+public class DefaultEventBusService implements BusService {
private final static String EVENT_BUS_SERVICE = "eventbus-service";
- private final EventBus eventBus;
+ private final Bus eventBus;
@Inject
- public DefaultEventBusService(EventBus eventBus) {
+ public DefaultEventBusService(Bus eventBus) {
this.eventBus = eventBus;
}
@@ -47,7 +47,7 @@ public class DefaultEventBusService implements EventBusService {
}
@Override
- public EventBus getEventBus() {
+ public Bus getBus() {
return eventBus;
}
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
index 259736d..7312e5b 100644
--- a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
+++ b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
-public class MemoryEventBus implements EventBus {
+public class MemoryEventBus implements Bus {
// STEPH config ?
private final static int MAX_EVENT_THREADS = 1;
@@ -92,13 +92,13 @@ public class MemoryEventBus implements EventBus {
}
@Override
- public void post(EventBusNotification event) throws EventBusException {
+ public void post(BusEvent event) throws EventBusException {
checkInitialized("post");
delegate.post(event);
}
@Override
- public void postFromTransaction(EventBusNotification event, Transmogrifier dao) throws EventBusException {
+ public void postFromTransaction(BusEvent event, Transmogrifier dao) throws EventBusException {
checkInitialized("postFromTransaction");
delegate.post(event);
}
diff --git a/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
index 078c331..8e6830e 100644
--- a/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
@@ -18,16 +18,16 @@ package com.ning.billing.util.glue;
import com.google.inject.AbstractModule;
import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBus;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.BusService;
import com.ning.billing.util.eventbus.MemoryEventBus;
public class EventBusModule extends AbstractModule {
@Override
protected void configure() {
- bind(EventBusService.class).to(DefaultEventBusService.class);
- bind(EventBus.class).to(MemoryEventBus.class).asEagerSingleton();
+ bind(BusService.class).to(DefaultEventBusService.class);
+ bind(Bus.class).to(MemoryEventBus.class).asEagerSingleton();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 80f7385..2f18379 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -40,6 +40,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
this.dao = dbi.onDemand(NotificationSqlDao.class);
}
+
+
@Override
protected void doProcessEvents(int sequenceId) {
List<Notification> notifications = getReadyNotifications(sequenceId);
@@ -116,4 +118,5 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
}
return result;
}
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java
new file mode 100644
index 0000000..4e771ba
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+public class NotificationError extends Error {
+
+ private static final long serialVersionUID = 131398536;
+
+ public NotificationError() {
+ super();
+ }
+
+ public NotificationError(String msg, Throwable arg1) {
+ super(msg, arg1);
+ }
+
+ public NotificationError(String msg) {
+ super(msg);
+ }
+
+ public NotificationError(Throwable msg) {
+ super(msg);
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 23f0de0..4ea38f7 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -42,18 +42,17 @@ public interface NotificationQueue {
public void processReadyNotification();
/**
- * Stops the queue.
+ * Stops the queue. Blocks until queue is completely stopped.
*
* @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
*/
public void stopQueue();
/**
- * Starts the queue.
+ * Starts the queue. Blocks until queue has completely started.
*
* @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
*/
public void startQueue();
-
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 6eaf33f..6a2810d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -17,34 +17,28 @@
package com.ning.billing.util.notificationq;
import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ning.billing.util.Hostname;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
public abstract class NotificationQueueBase implements NotificationQueue {
protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
+ private static final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
+ private static final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
+ private static final long NANO_TO_MS = (1000 * 1000);
+
protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
protected final long STOP_WAIT_TIMEOUT_MS = 60000;
@@ -63,7 +57,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
// Use this object's monitor for synchronization (no need for volatile)
protected boolean isProcessingEvents;
-
+
+ private boolean startedComplete = false;
+ private boolean stoppedComplete = false;
+
// Package visibility on purpose
NotificationQueueBase(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
this.clock = clock;
@@ -99,7 +96,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
@Override
public void stopQueue() {
if (config.isNotificationProcessingOff()) {
- handler.completedQueueStop();
+ completedQueueStop();
return;
}
@@ -113,7 +110,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
}
}
-
+ waitForNotificationStopCompletion();
}
@Override
@@ -125,7 +122,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
if (config.isNotificationProcessingOff()) {
log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
- handler.completedQueueStart();
+ completedQueueStart();
return;
}
final NotificationQueueBase notificationQueue = this;
@@ -139,7 +136,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
Thread.currentThread().getId()));
// Thread is now started, notify the listener
- handler.completedQueueStart();
+ completedQueueStart();
try {
while (true) {
@@ -171,7 +168,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
// Just to make it really obvious in the log
e.printStackTrace();
} finally {
- handler.completedQueueStop();
+ completedQueueStop();
log.info(String.format("NotificationQueue thread %s [%d] exited...",
Thread.currentThread().getName(),
Thread.currentThread().getId()));
@@ -182,8 +179,59 @@ public abstract class NotificationQueueBase implements NotificationQueue {
Thread.sleep(config.getNotificationSleepTimeMs());
}
});
+ waitForNotificationStartCompletion();
+ }
+
+ private void completedQueueStop() {
+ synchronized (this) {
+ stoppedComplete = true;
+ this.notifyAll();
+ }
+ }
+
+ private void completedQueueStart() {
+ synchronized (this) {
+ startedComplete = true;
+ this.notifyAll();
+ }
}
+ private void waitForNotificationStartCompletion() {
+ waitForNotificationEventCompletion(true);
+ }
+
+ private void waitForNotificationStopCompletion() {
+ waitForNotificationEventCompletion(false);
+ }
+
+ private void waitForNotificationEventCompletion(boolean startEvent) {
+
+ long ini = System.nanoTime();
+ synchronized(this) {
+ do {
+ if ((startEvent ? startedComplete : stoppedComplete)) {
+ break;
+ }
+ try {
+ this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
+ } catch (InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ throw new NotificationError(e);
+ }
+ } while (!(startEvent ? startedComplete : stoppedComplete) &&
+ (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
+
+ if (!(startEvent ? startedComplete : stoppedComplete)) {
+ log.error("Could not {} notification thread in {} msec !!!",
+ (startEvent ? "start" : "stop"),
+ MAX_NOTIFICATION_THREAD_WAIT_MS);
+ throw new NotificationError("Failed to start service!!");
+ }
+ log.info("Notification thread has been {} in {} ms",
+ (startEvent ? "started" : "stopped"),
+ (System.nanoTime() - ini) / NANO_TO_MS);
+ }
+ }
protected String getFullQName() {
return svcName + ":" + queueName;
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index a18906b..c1feca1 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -23,21 +23,12 @@ public interface NotificationQueueService {
public interface NotificationQueueHandler {
/**
- * Called when the Notification thread has been started
- */
- public void completedQueueStart();
-
- /**
* Called for each notification ready
*
* @param key the notification key associated to that notification entry
*/
public void handleReadyNotification(String notificationKey);
- /**
- * Called right before the Notification thread is about to exit
- */
- public void completedQueueStop();
- }
+ }
public static final class NotficationQueueAlreadyExists extends Exception {
private static final long serialVersionUID = 1541281L;
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
index 4b0f4a2..0091ab6 100644
--- a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -28,7 +28,7 @@ public class TestEventBus {
private static final Logger log = LoggerFactory.getLogger(TestEventBus.class);
- private EventBus eventBus;
+ private Bus eventBus;
@BeforeClass
@@ -42,7 +42,7 @@ public class TestEventBus {
eventBus.stop();
}
- public static final class MyEvent implements EventBusNotification {
+ public static final class MyEvent implements BusEvent {
String name;
Long value;
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 2e3bb3c..8058561 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -17,7 +17,6 @@
package com.ning.billing.util.notificationq;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.sql.SQLException;
@@ -28,18 +27,11 @@ import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.joda.time.DateTime;
-import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
@@ -49,8 +41,6 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
-import com.ning.billing.dbi.DBIProvider;
-import com.ning.billing.dbi.DbiConfig;
import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
@@ -59,370 +49,243 @@ import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
public class TestNotificationQueue {
-
- private final static Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
-
- @Inject
- private DBI dbi;
-
- @Inject
- MysqlTestingHelper helper;
-
- @Inject
- private Clock clock;
-
- private DummySqlTest dao;
-
- // private NotificationQueue queue;
-
- private void startMysql() throws IOException, ClassNotFoundException, SQLException {
- final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
- final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
- helper.startMysql();
- helper.initDb(ddl);
- helper.initDb(testDdl);
- }
-
- @BeforeSuite(alwaysRun = true)
- public void setup() throws Exception {
- startMysql();
- dao = dbi.onDemand(DummySqlTest.class);
- }
-
- @BeforeTest
- public void beforeTest() {
- dbi.withHandle(new HandleCallback<Void>() {
-
- @Override
- public Void withHandle(Handle handle) throws Exception {
- handle.execute("delete from notifications");
- handle.execute("delete from claimed_notifications");
- handle.execute("delete from dummy");
- return null;
- }
- });
- // Reset time to real value
- ((ClockMock) clock).resetDeltaFromReality();
- }
-
-
-
- /**
- * Verify that we can call start/stop on a disabled queue and that both start/stop callbacks are called
- *
- * @throws InterruptedException
- */
- @Test
- public void testSimpleQueueDisabled() throws InterruptedException {
-
- final TestStartStop testStartStop = new TestStartStop(false, false);
- DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "dead",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(String notificationKey) {
- }
- @Override
- public void completedQueueStop() {
- testStartStop.stopped();
- }
- @Override
- public void completedQueueStart() {
- testStartStop.started();
- }
- },
- getNotificationConfig(true, 100, 1, 10000));
-
- executeTest(testStartStop, queue, new WithTest() {
- @Override
- public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
- // Do nothing
- }
- });
- assertTrue(true);
- }
-
- /**
- * Test that we can post a notification in the future from a transaction and get the notification
- * callback with the correct key when the time is ready
- *
- * @throws InterruptedException
- */
- @Test
- public void testSimpleNotification() throws InterruptedException {
-
- final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
-
- final TestStartStop testStartStop = new TestStartStop(false, false);
- DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(String notificationKey) {
- synchronized (expectedNotifications) {
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- @Override
- public void completedQueueStop() {
- testStartStop.stopped();
- }
- @Override
- public void completedQueueStart() {
- testStartStop.started();
- }
- },
- getNotificationConfig(false, 100, 1, 10000));
-
-
- executeTest(testStartStop, queue, new WithTest() {
- @Override
- public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
-
- final UUID key = UUID.randomUUID();
- final DummyObject obj = new DummyObject("foo", key);
- final DateTime now = new DateTime();
- final DateTime readyTime = now.plusMillis(2000);
- final NotificationKey notificationKey = new NotificationKey() {
- @Override
- public String toString() {
- return key.toString();
- }
- };
- expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
-
-
- // Insert dummy to be processed in 2 sec'
- dao.inTransaction(new Transaction<Void, DummySqlTest>() {
- @Override
- public Void inTransaction(DummySqlTest transactional,
- TransactionStatus status) throws Exception {
-
- transactional.insertDummy(obj);
- readyQueue.recordFutureNotificationFromTransaction(transactional,
- readyTime, notificationKey);
- return null;
- }
- });
-
- // Move time in the future after the notification effectiveDate
- ((ClockMock) clock).setDeltaFromReality(3000);
-
- // Notification should have kicked but give it at least a sec' for thread scheduling
- int nbTry = 1;
- boolean success = false;
- do {
- synchronized(expectedNotifications) {
- if (expectedNotifications.get(notificationKey.toString())) {
- success = true;
- break;
- }
- expectedNotifications.wait(1000);
- }
- } while (nbTry-- > 0);
- assertEquals(success, true);
- }
- });
- }
-
- @Test
- public void testManyNotifications() throws InterruptedException {
- final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
-
- final TestStartStop testStartStop = new TestStartStop(false, false);
- DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(String notificationKey) {
- synchronized (expectedNotifications) {
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- @Override
- public void completedQueueStop() {
- testStartStop.stopped();
- }
- @Override
- public void completedQueueStart() {
- testStartStop.started();
- }
- },
- getNotificationConfig(false, 100, 10, 10000));
-
-
- executeTest(testStartStop, queue, new WithTest() {
- @Override
- public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
-
- final DateTime now = clock.getUTCNow();
- final int MAX_NOTIFICATIONS = 100;
- for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
-
- final int nextReadyTimeIncrementMs = 1000;
-
- final UUID key = UUID.randomUUID();
- final DummyObject obj = new DummyObject("foo", key);
- final int currentIteration = i;
-
- final NotificationKey notificationKey = new NotificationKey() {
- @Override
- public String toString() {
- return key.toString();
- }
- };
- expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
-
- dao.inTransaction(new Transaction<Void, DummySqlTest>() {
- @Override
- public Void inTransaction(DummySqlTest transactional,
- TransactionStatus status) throws Exception {
-
- transactional.insertDummy(obj);
- readyQueue.recordFutureNotificationFromTransaction(transactional,
- now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
- return null;
- }
- });
-
- // Move time in the future after the notification effectiveDate
- if (i == 0) {
- ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
- } else {
- ((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
- }
- }
-
- // Wait a little longer since there are a lot of callback that need to happen
- int nbTry = MAX_NOTIFICATIONS + 1;
- boolean success = false;
- do {
- synchronized(expectedNotifications) {
-
- Collection<Boolean> completed = Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
- @Override
- public boolean apply(Boolean input) {
- return input;
- }
- });
-
- if (completed.size() == MAX_NOTIFICATIONS) {
- success = true;
- break;
- }
- //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
- expectedNotifications.wait(1000);
- }
- } while (nbTry-- > 0);
- assertEquals(success, true);
- }
- });
- }
-
-
- NotificationConfig getNotificationConfig(final boolean off,
- final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
- return new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return off;
- }
- @Override
- public long getNotificationSleepTimeMs() {
- return sleepTime;
- }
- @Override
- public int getDaoMaxReadyEvents() {
- return maxReadyEvents;
- }
- @Override
- public long getDaoClaimTimeMs() {
- return claimTimeMs;
- }
- };
- }
-
- private static class TestStartStop {
- private boolean started;
- private boolean stopped;
-
- public TestStartStop(boolean started, boolean stopped) {
- super();
- this.started = started;
- this.stopped = stopped;
- }
-
- public void started() {
- synchronized(this) {
- started = true;
- notify();
- }
- }
-
- public void stopped() {
- synchronized(this) {
- stopped = true;
- notify();
- }
- }
-
- public boolean waitForStartComplete(int timeoutMs) throws InterruptedException {
- return waitForEventCompletion(timeoutMs, true);
- }
-
- public boolean waitForStopComplete(int timeoutMs) throws InterruptedException {
- return waitForEventCompletion(timeoutMs, false);
- }
-
- private boolean waitForEventCompletion(int timeoutMs, boolean start) throws InterruptedException {
- DateTime init = new DateTime();
- synchronized(this) {
- while (! ((start ? started : stopped))) {
- wait(timeoutMs);
- if (init.plusMillis(timeoutMs).isAfterNow()) {
- break;
- }
- }
- }
- return (start ? started : stopped);
- }
- }
-
- private interface WithTest {
- public void test(DefaultNotificationQueue readyQueue) throws InterruptedException;
- }
-
- private void executeTest(final TestStartStop testStartStop,
- DefaultNotificationQueue queue, WithTest test) throws InterruptedException{
-
- queue.startQueue();
- boolean started = testStartStop.waitForStartComplete(3000);
- assertEquals(started, true);
-
- test.test(queue);
-
- queue.stopQueue();
- boolean stopped = testStartStop.waitForStopComplete(3000);
- assertEquals(stopped, true);
- }
-
-
- public static class TestNotificationQueueModule extends AbstractModule {
- @Override
- protected void configure() {
-
- bind(Clock.class).to(ClockMock.class);
-
- final MysqlTestingHelper helper = new MysqlTestingHelper();
- bind(MysqlTestingHelper.class).toInstance(helper);
- DBI dbi = helper.getDBI();
- bind(DBI.class).toInstance(dbi);
- /*
+ @Inject
+ private DBI dbi;
+
+ @Inject
+ MysqlTestingHelper helper;
+
+ @Inject
+ private Clock clock;
+
+ private DummySqlTest dao;
+
+ // private NotificationQueue queue;
+
+ private void startMysql() throws IOException, ClassNotFoundException, SQLException {
+ final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+ final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
+ helper.startMysql();
+ helper.initDb(ddl);
+ helper.initDb(testDdl);
+ }
+
+ @BeforeSuite(alwaysRun = true)
+ public void setup() throws Exception {
+ startMysql();
+ dao = dbi.onDemand(DummySqlTest.class);
+ }
+
+ @BeforeTest
+ public void beforeTest() {
+ dbi.withHandle(new HandleCallback<Void>() {
+
+ @Override
+ public Void withHandle(Handle handle) throws Exception {
+ handle.execute("delete from notifications");
+ handle.execute("delete from claimed_notifications");
+ handle.execute("delete from dummy");
+ return null;
+ }
+ });
+ // Reset time to real value
+ ((ClockMock) clock).resetDeltaFromReality();
+ }
+
+
+
+ /**
+ * Test that we can post a notification in the future from a transaction and get the notification
+ * callback with the correct key when the time is ready
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testSimpleNotification() throws InterruptedException {
+
+ final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+ final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ synchronized (expectedNotifications) {
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ },
+ getNotificationConfig(false, 100, 1, 10000));
+
+
+ queue.startQueue();
+
+ final UUID key = UUID.randomUUID();
+ final DummyObject obj = new DummyObject("foo", key);
+ final DateTime now = new DateTime();
+ final DateTime readyTime = now.plusMillis(2000);
+ final NotificationKey notificationKey = new NotificationKey() {
+ @Override
+ public String toString() {
+ return key.toString();
+ }
+ };
+ expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+
+ // Insert dummy to be processed in 2 sec'
+ dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+ @Override
+ public Void inTransaction(DummySqlTest transactional,
+ TransactionStatus status) throws Exception {
+
+ transactional.insertDummy(obj);
+ queue.recordFutureNotificationFromTransaction(transactional,
+ readyTime, notificationKey);
+ return null;
+ }
+ });
+
+ // Move time in the future after the notification effectiveDate
+ ((ClockMock) clock).setDeltaFromReality(3000);
+
+ // Notification should have kicked but give it at least a sec' for thread scheduling
+ int nbTry = 1;
+ boolean success = false;
+ do {
+ synchronized(expectedNotifications) {
+ if (expectedNotifications.get(notificationKey.toString())) {
+ success = true;
+ break;
+ }
+ expectedNotifications.wait(1000);
+ }
+ } while (nbTry-- > 0);
+ assertEquals(success, true);
+ }
+
+ @Test
+ public void testManyNotifications() throws InterruptedException {
+ final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+ final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ synchronized (expectedNotifications) {
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ },
+ getNotificationConfig(false, 100, 10, 10000));
+
+
+ queue.startQueue();
+
+ final DateTime now = clock.getUTCNow();
+ final int MAX_NOTIFICATIONS = 100;
+ for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
+
+ final int nextReadyTimeIncrementMs = 1000;
+
+ final UUID key = UUID.randomUUID();
+ final DummyObject obj = new DummyObject("foo", key);
+ final int currentIteration = i;
+
+ final NotificationKey notificationKey = new NotificationKey() {
+ @Override
+ public String toString() {
+ return key.toString();
+ }
+ };
+ expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+ dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+ @Override
+ public Void inTransaction(DummySqlTest transactional,
+ TransactionStatus status) throws Exception {
+
+ transactional.insertDummy(obj);
+ queue.recordFutureNotificationFromTransaction(transactional,
+ now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+ return null;
+ }
+ });
+
+ // Move time in the future after the notification effectiveDate
+ if (i == 0) {
+ ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
+ } else {
+ ((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
+ }
+ }
+
+ // Wait a little longer since there are a lot of callback that need to happen
+ int nbTry = MAX_NOTIFICATIONS + 1;
+ boolean success = false;
+ do {
+ synchronized(expectedNotifications) {
+
+ Collection<Boolean> completed = Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+ @Override
+ public boolean apply(Boolean input) {
+ return input;
+ }
+ });
+
+ if (completed.size() == MAX_NOTIFICATIONS) {
+ success = true;
+ break;
+ }
+ //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+ expectedNotifications.wait(1000);
+ }
+ } while (nbTry-- > 0);
+ assertEquals(success, true);
+
+ }
+
+ NotificationConfig getNotificationConfig(final boolean off,
+ final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+ return new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return off;
+ }
+ @Override
+ public long getNotificationSleepTimeMs() {
+ return sleepTime;
+ }
+ @Override
+ public int getDaoMaxReadyEvents() {
+ return maxReadyEvents;
+ }
+ @Override
+ public long getDaoClaimTimeMs() {
+ return claimTimeMs;
+ }
+ };
+ }
+
+
+ public static class TestNotificationQueueModule extends AbstractModule {
+ @Override
+ protected void configure() {
+
+ bind(Clock.class).to(ClockMock.class);
+
+ final MysqlTestingHelper helper = new MysqlTestingHelper();
+ bind(MysqlTestingHelper.class).toInstance(helper);
+ DBI dbi = helper.getDBI();
+ bind(DBI.class).toInstance(dbi);
+ /*
bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
bind(DbiConfig.class).toInstance(config);
- */
- }
- }
+ */
+ }
+ }
}