killbill-aplcache
Changes
overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java 112(+112 -0)
overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java 73(+73 -0)
overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicatorBundle.java 84(+0 -84)
Details
diff --git a/api/src/main/java/com/ning/billing/overdue/OverdueService.java b/api/src/main/java/com/ning/billing/overdue/OverdueService.java
index 068bc63..8841046 100644
--- a/api/src/main/java/com/ning/billing/overdue/OverdueService.java
+++ b/api/src/main/java/com/ning/billing/overdue/OverdueService.java
@@ -16,9 +16,6 @@
package com.ning.billing.overdue;
-import com.ning.billing.catalog.api.overdue.OverdueError;
-import com.ning.billing.catalog.api.overdue.OverdueState;
-import com.ning.billing.catalog.api.overdue.Overdueable;
import com.ning.billing.lifecycle.KillbillService;
public interface OverdueService extends KillbillService {
@@ -26,5 +23,4 @@ public interface OverdueService extends KillbillService {
public OverdueUserApi getUserApi();
- public <T extends Overdueable> OverdueState<T> refresh(T overdueable) throws OverdueError;
}
diff --git a/api/src/main/java/com/ning/billing/overdue/OverdueUserApi.java b/api/src/main/java/com/ning/billing/overdue/OverdueUserApi.java
index e2ead75..0cfbb6b 100644
--- a/api/src/main/java/com/ning/billing/overdue/OverdueUserApi.java
+++ b/api/src/main/java/com/ning/billing/overdue/OverdueUserApi.java
@@ -23,8 +23,6 @@ import com.ning.billing.catalog.api.overdue.Overdueable;
public interface OverdueUserApi {
- public <T extends Overdueable> OverdueState<T> getOverdueStateFor(T overdueable) throws OverdueError;
-
public <T extends Overdueable> OverdueState<T> refreshOverdueStateFor(T overdueable) throws OverdueError;
public <T extends Overdueable> void setOverrideBillingStateForAccount(T overdueable, BillingState<T> state) throws OverdueError;
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
new file mode 100644
index 0000000..cbcbd9f
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.config.InvoiceConfig;
+import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
+
+ private final static Logger log = LoggerFactory.getLogger(DefaultOverdueCheckNotifier.class);
+
+ public static final String OVERDUE_CHECK_NOTIFIER_QUEUE = "overdue-check-queue";
+
+ private final NotificationQueueService notificationQueueService;
+ private final InvoiceConfig config;
+
+ private NotificationQueue overdueQueue;
+ private final OverdueListener listener;
+
+ @Inject
+ public DefaultOverdueCheckNotifier(NotificationQueueService notificationQueueService,
+ InvoiceConfig config, OverdueListener listener){
+ this.notificationQueueService = notificationQueueService;
+ this.config = config;
+ this.listener = listener;
+ }
+
+ @Override
+ public void initialize() {
+ try {
+ overdueQueue = notificationQueueService.createNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+ OVERDUE_CHECK_NOTIFIER_QUEUE,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey, DateTime eventDate) {
+ try {
+ UUID key = UUID.fromString(notificationKey);
+ processEvent(key , eventDate);
+ } catch (IllegalArgumentException e) {
+ log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
+ return;
+ }
+
+ }
+ },
+ new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return config.isEventProcessingOff();
+ }
+ @Override
+ public long getNotificationSleepTimeMs() {
+ return config.getNotificationSleepTimeMs();
+ }
+ @Override
+ public int getDaoMaxReadyEvents() {
+ return config.getDaoMaxReadyEvents();
+ }
+ @Override
+ public long getDaoClaimTimeMs() {
+ return config.getDaoClaimTimeMs();
+ }
+ });
+ } catch (NotificationQueueAlreadyExists e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void start() {
+ overdueQueue.startQueue();
+ }
+
+ @Override
+ public void stop() {
+ if (overdueQueue != null) {
+ overdueQueue.stopQueue();
+ }
+ }
+
+ private void processEvent(UUID overdueableId, DateTime eventDateTime) {
+ listener.handleNextOverdueCheck(overdueableId, eventDateTime);
+ }
+
+
+}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
new file mode 100644
index 0000000..e699414
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.catalog.api.overdue.Overdueable;
+import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
+ private final static Logger log = LoggerFactory.getLogger(DefaultOverdueCheckNotifier.class);
+
+ private final NotificationQueueService notificationQueueService;
+
+ @Inject
+ public DefaultOverdueCheckPoster(
+ NotificationQueueService notificationQueueService) {
+ super();
+ this.notificationQueueService = notificationQueueService;
+ }
+
+ @Override
+ public void insertOverdueCheckNotification(final Transmogrifier transactionalDao, final Overdueable overdueable, final DateTime futureNotificationTime) {
+ NotificationQueue checkOverdueQueue;
+ try {
+ checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+ DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
+ log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
+
+ checkOverdueQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey(){
+ @Override
+ public String toString() {
+ return overdueable.getId().toString();
+ }
+ });
+ } catch (NoSuchNotificationQueue e) {
+ log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
+ }
+ }
+
+ public void clearNotificationEventsFor(final Overdueable overdueable) {
+ NotificationQueue checkOverdueQueue;
+ try {
+ checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+ DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
+ checkOverdueQueue.clearNotificationsFor(overdueable.getId());
+ } catch (NoSuchNotificationQueue e) {
+ log.error("Attempting to clear items from a non-existent queue (DefaultOverdueCheck).", e);
+ }
+ }
+}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotifier.java
new file mode 100644
index 0000000..7ef6ab8
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotifier.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+
+public interface OverdueCheckNotifier {
+
+ public void initialize();
+
+ public void start();
+
+ public void stop();
+
+}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
new file mode 100644
index 0000000..8a0ad68
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.catalog.api.overdue.Overdueable;
+
+public interface OverdueCheckPoster {
+
+ void insertOverdueCheckNotification(Transmogrifier transactionalDao,
+ Overdueable overdueable, DateTime futureNotificationTime);
+
+}
\ No newline at end of file
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueListener.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueListener.java
new file mode 100644
index 0000000..e3ddcf3
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueListener.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+public class OverdueListener {
+
+ public void handleNextOverdueCheck(UUID subscriptionId, DateTime eventDateTime) {
+ //TODO
+
+ }
+
+}
diff --git a/overdue/src/main/java/com/ning/billing/overdue/api/DefaultOverdueUserApi.java b/overdue/src/main/java/com/ning/billing/overdue/api/DefaultOverdueUserApi.java
index cec80ad..fd72cee 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/api/DefaultOverdueUserApi.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/api/DefaultOverdueUserApi.java
@@ -31,38 +31,26 @@ import com.ning.billing.catalog.api.overdue.Overdueable;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.overdue.OverdueService;
import com.ning.billing.overdue.OverdueUserApi;
+import com.ning.billing.overdue.wrapper.OverdueWrapper;
+import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
import com.ning.billing.util.overdue.dao.OverdueAccessDao;
public class DefaultOverdueUserApi implements OverdueUserApi{
- private OverdueService service;
- private CatalogService catalogService;
- private OverdueAccessDao accessDao;
-
+
+ private final OverdueWrapperFactory factory;
+
@Inject
- public DefaultOverdueUserApi(OverdueService service, CatalogService catalogService, OverdueAccessDao accessDao) {
- this.service = service;
- this.catalogService = catalogService;
- this.accessDao = accessDao;
+ public DefaultOverdueUserApi(OverdueWrapperFactory factory) {
+ this.factory = factory;
}
- @SuppressWarnings("unchecked")
- @Override
- public <T extends Overdueable> OverdueState<T> getOverdueStateFor(T overdueable) throws OverdueError {
- try {
- String stateName = accessDao.getOverdueStateNameFor(overdueable);
- StaticCatalog catalog = catalogService.getCurrentCatalog();
- OverdueStateSet<SubscriptionBundle> states = catalog.currentBundleOverdueStateSet();
- return (OverdueState<T>) states.findState(stateName);
- } catch (CatalogApiException e) {
- throw new OverdueError(e, ErrorCode.OVERDUE_CAT_ERROR_ENCOUNTERED,overdueable.getId(), overdueable.getClass().getSimpleName());
- }
- }
-
@Override
public <T extends Overdueable> OverdueState<T> refreshOverdueStateFor(T overdueable) throws OverdueError {
- return service.refresh(overdueable);
+ OverdueWrapper<T> wrapper = factory.createOverdueWrapperFor(overdueable);
+ return wrapper.refresh();
}
+
@Override
public <T extends Overdueable> void setOverrideBillingStateForAccount(
diff --git a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
index d49d0e6..1323e55 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
@@ -16,12 +16,64 @@
package com.ning.billing.overdue.applicator;
+import org.apache.commons.lang.NotImplementedException;
import org.joda.time.DateTime;
+import com.google.inject.Inject;
import com.ning.billing.catalog.api.overdue.OverdueState;
import com.ning.billing.catalog.api.overdue.Overdueable;
+import com.ning.billing.overdue.dao.OverdueDao;
+import com.ning.billing.util.clock.Clock;
-public interface OverdueStateApplicator<T extends Overdueable>{
+public class OverdueStateApplicator<T extends Overdueable>{
+
+ private final OverdueDao overdueDao;
+ private final Clock clock;
+
+
+
+ @Inject
+ public OverdueStateApplicator(OverdueDao overdueDao, Clock clock) {
+ this.overdueDao = overdueDao;
+ this.clock = clock;
+ }
+
+ public void apply(T overdueable, OverdueState<T> previousOverdueState, OverdueState<T> nextOverdueState, DateTime timeOfNextCheck) {
+ if(previousOverdueState.getName().equals(nextOverdueState.getName())) {
+ return; // nothing to do
+ }
+
+ storeNewState(overdueable, nextOverdueState);
+
+ if(timeOfNextCheck != null && !nextOverdueState.isClearState()) {
+ createFutureNotification(overdueable, timeOfNextCheck);
+ }
+
+ if(nextOverdueState.isClearState()) {
+ clear(overdueable);
+ }
+
+ //If new state is clear state reset next events and override table
+ throw new NotImplementedException();
+ }
+
+
+ protected void storeNewState(T overdueable, OverdueState<T> nextOverdueState) {
+ overdueDao.setOverdueState(overdueable, nextOverdueState, clock);
+ }
+
+ protected void createFutureNotification(T overdueable,
+ DateTime timeOfNextCheck) {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
+ protected void clear(T overdueable) {
+ // Clear future notification checks
+ // Clear any overrides
+
+ }
- public void apply(T overdueable, OverdueState<T> previousOverdueState, OverdueState<T> nextOverdueState, DateTime timeOfNextCheck);
}
diff --git a/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueDao.java b/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueDao.java
index 75286de..fe69e34 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueDao.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueDao.java
@@ -22,6 +22,6 @@ import com.ning.billing.util.clock.Clock;
public interface OverdueDao {
- <T extends Overdueable> void setOverdueStateForBundle(T overdueable, OverdueState<T> newOverdueState, Clock clock);
+ <T extends Overdueable> void setOverdueState(T overdueable, OverdueState<T> newOverdueState, Clock clock);
}
\ No newline at end of file
diff --git a/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueSqlDao.java b/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueSqlDao.java
index dac4050..bd77283 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueSqlDao.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueSqlDao.java
@@ -32,7 +32,7 @@ public interface OverdueSqlDao extends OverdueDao {
@Override
@SqlUpdate
- public abstract <T extends Overdueable> void setOverdueStateForBundle(
+ public abstract <T extends Overdueable> void setOverdueState(
@Bind(binder = OverdueableBinder.class) T overdueable,
@Bind(binder = OverdueStateBinder.class) OverdueState<T> overdueState,
@Bind(binder = CurrentTimeBinder.class) Clock clock) ;
diff --git a/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java b/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
index 78fc37d..57c69d2 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
@@ -17,22 +17,16 @@
package com.ning.billing.overdue.service;
import com.google.inject.Inject;
-import com.ning.billing.catalog.api.overdue.OverdueError;
-import com.ning.billing.catalog.api.overdue.OverdueState;
-import com.ning.billing.catalog.api.overdue.Overdueable;
import com.ning.billing.overdue.OverdueService;
import com.ning.billing.overdue.OverdueUserApi;
-import com.ning.billing.overdue.wrapper.OverdueWrapper;
-import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
public class DefaultOverdueService implements OverdueService {
public static final String OVERDUE_SERVICE_NAME = "overdue-service";
-
- private final OverdueWrapperFactory factory;
+ private OverdueUserApi userApi;
@Inject
- public DefaultOverdueService(OverdueWrapperFactory factory) {
- this.factory = factory;
+ public DefaultOverdueService(OverdueUserApi userApi){
+ this.userApi = userApi;
}
@Override
@@ -42,13 +36,8 @@ public class DefaultOverdueService implements OverdueService {
@Override
public OverdueUserApi getUserApi() {
- return null;
+ return userApi;
}
- @Override
- public <T extends Overdueable> OverdueState<T> refresh(T overdueable) throws OverdueError {
- OverdueWrapper<T> wrapper = factory.createOverdueWrapperFor(overdueable);
- return wrapper.refresh();
- }
-
+
}
diff --git a/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapper.java b/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapper.java
index 1b7feeb..cc943c8 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapper.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapper.java
@@ -21,20 +21,20 @@ import com.ning.billing.catalog.api.overdue.OverdueError;
import com.ning.billing.catalog.api.overdue.OverdueState;
import com.ning.billing.catalog.api.overdue.OverdueStateSet;
import com.ning.billing.catalog.api.overdue.Overdueable;
-import com.ning.billing.overdue.OverdueUserApi;
import com.ning.billing.overdue.applicator.OverdueStateApplicator;
import com.ning.billing.overdue.calculator.BillingStateCalculator;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.overdue.OverdueAccessApi;
public class OverdueWrapper<T extends Overdueable> {
private final T overdueable;
- private final OverdueUserApi api;
+ private final OverdueAccessApi api;
private final Clock clock;
private final OverdueStateSet<T> overdueStateSet;
private final BillingStateCalculator<T> billingStateCalcuator;
private final OverdueStateApplicator<T> overdueStateApplicator;
- public OverdueWrapper(T overdueable, OverdueUserApi api,
+ public OverdueWrapper(T overdueable, OverdueAccessApi api,
OverdueStateSet<T> overdueStateSet,
Clock clock,
BillingStateCalculator<T> billingStateCalcuator,
@@ -49,7 +49,7 @@ public class OverdueWrapper<T extends Overdueable> {
public OverdueState<T> refresh() throws OverdueError {
BillingState<T> billingState = billingStateCalcuator.calculateBillingState(overdueable);
- OverdueState<T> previousOverdueStateName = api.getOverdueStateFor(overdueable);
+ String previousOverdueStateName = api.getOverdueStateNameFor(overdueable);
OverdueState<T> nextOverdueState = overdueStateSet.calculateOverdueState(billingState, clock.getUTCNow());
if(!previousOverdueStateName.equals(nextOverdueState.getName())) {
overdueStateApplicator.apply(overdueable, nextOverdueState, nextOverdueState, overdueStateSet.dateOfNextCheck(billingState, clock.getUTCNow()));
diff --git a/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapperFactory.java b/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapperFactory.java
index 76b3dd0..1df769d 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapperFactory.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapperFactory.java
@@ -23,21 +23,22 @@ import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.overdue.OverdueError;
import com.ning.billing.catalog.api.overdue.Overdueable;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
-import com.ning.billing.overdue.OverdueUserApi;
-import com.ning.billing.overdue.applicator.OverdueStateApplicatorBundle;
+import com.ning.billing.overdue.applicator.OverdueStateApplicator;
import com.ning.billing.overdue.calculator.BillingStateCalculatorBundle;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.overdue.OverdueAccessApi;
public class OverdueWrapperFactory {
private final CatalogService catalogService;
private final BillingStateCalculatorBundle billingStateCalcuatorBundle;
- private final OverdueStateApplicatorBundle overdueStateApplicatorBundle;
- private final OverdueUserApi api;
+ private final OverdueStateApplicator<SubscriptionBundle> overdueStateApplicatorBundle;
+ private final OverdueAccessApi api;
private final Clock clock;
@Inject
- public OverdueWrapperFactory(OverdueUserApi api, CatalogService catalogService, Clock clock, BillingStateCalculatorBundle billingStateCalcuatorBundle, OverdueStateApplicatorBundle overdueStateApplicatorBundle) {
+ public OverdueWrapperFactory(OverdueAccessApi api, CatalogService catalogService, Clock clock,
+ BillingStateCalculatorBundle billingStateCalcuatorBundle, OverdueStateApplicator<SubscriptionBundle> overdueStateApplicatorBundle) {
this.billingStateCalcuatorBundle = billingStateCalcuatorBundle;
this.overdueStateApplicatorBundle = overdueStateApplicatorBundle;
this.catalogService = catalogService;
@@ -45,7 +46,6 @@ public class OverdueWrapperFactory {
this.clock = clock;
}
-
@SuppressWarnings("unchecked")
public <T extends Overdueable> OverdueWrapper<T> createOverdueWrapperFor(T overdueable) throws OverdueError {
try {
diff --git a/overdue/src/test/java/com/ning/billing/overdue/dao/TestOverdueDao.java b/overdue/src/test/java/com/ning/billing/overdue/dao/TestOverdueDao.java
index b49a670..c0efba2 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/dao/TestOverdueDao.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/dao/TestOverdueDao.java
@@ -78,12 +78,12 @@ public class TestOverdueDao {
OverdueState<SubscriptionBundle> state = BrainDeadProxyFactory.createBrainDeadProxyFor(OverdueState.class);
((ZombieControl)state).addResult("getName", overdueStateName);
- dao.setOverdueStateForBundle(bundle, state, clock);
+ dao.setOverdueState(bundle, state, clock);
clock.setDeltaFromReality(1000 * 3600 * 24);
String overdueStateName2 = "NoReallyThisCantGoOn";
((ZombieControl)state).addResult("getName", overdueStateName2);
- dao.setOverdueStateForBundle(bundle, state, clock);
+ dao.setOverdueState(bundle, state, clock);
Assert.assertEquals(accessDao.getOverdueStateNameFor(bundle), overdueStateName2);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 9253e2d..f0a54f3 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -60,6 +60,9 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
public void clearNotification(@Bind("id") long id, @Bind("owner") String owner);
@SqlUpdate
+ public void removeNotificationsByKey(@Bind("notification_key") String key);
+
+ @SqlUpdate
public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt);
@SqlUpdate
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index c0c88fe..a0e1827 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.notificationq;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.UUID;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
@@ -118,4 +119,10 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
}
}
+
+ @Override
+ public void removeNotificationsByKey(UUID key) {
+ dao.removeNotificationsByKey(key.toString());
+
+ }
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
index 3200424..32cba9d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
@@ -24,7 +24,8 @@ public interface NotificationLifecycle {
public enum NotificationLifecycleState {
AVAILABLE,
IN_PROCESSING,
- PROCESSED
+ PROCESSED,
+ REMOVED
}
public String getOwner();
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index fb88d4c..14b68e0 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -16,6 +16,8 @@
package com.ning.billing.util.notificationq;
+import java.util.UUID;
+
import org.joda.time.DateTime;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -44,6 +46,13 @@ public interface NotificationQueue {
final DateTime futureNotificationTime, final NotificationKey notificationKey);
/**
+ * Remove all notifications associated with this key
+ *
+ * @param key
+ */
+ public void removeNotificationsByKey(UUID key);
+
+ /**
* This is only valid when the queue has been configured with isNotificationProcessingOff is true
* In which case, it will callback users for all the ready notifications.
*
@@ -71,4 +80,6 @@ public interface NotificationQueue {
*/
public String getFullQName();
+
+
}
diff --git a/util/src/main/java/com/ning/billing/util/overdue/DefaultOverdueAcessApi.java b/util/src/main/java/com/ning/billing/util/overdue/DefaultOverdueAcessApi.java
index 5b7e6be..eb1860e 100644
--- a/util/src/main/java/com/ning/billing/util/overdue/DefaultOverdueAcessApi.java
+++ b/util/src/main/java/com/ning/billing/util/overdue/DefaultOverdueAcessApi.java
@@ -17,7 +17,14 @@
package com.ning.billing.util.overdue;
import com.google.inject.Inject;
+import com.ning.billing.ErrorCode;
+import com.ning.billing.catalog.api.CatalogApiException;
+import com.ning.billing.catalog.api.StaticCatalog;
+import com.ning.billing.catalog.api.overdue.OverdueError;
+import com.ning.billing.catalog.api.overdue.OverdueState;
+import com.ning.billing.catalog.api.overdue.OverdueStateSet;
import com.ning.billing.catalog.api.overdue.Overdueable;
+import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.util.overdue.dao.OverdueAccessDao;
public class DefaultOverdueAcessApi implements OverdueAccessApi {
@@ -33,4 +40,16 @@ public class DefaultOverdueAcessApi implements OverdueAccessApi {
return dao.getOverdueStateNameFor(overdueable);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends Overdueable> OverdueState<T> getOverdueStateFor(T overdueable, StaticCatalog catalog) throws OverdueError {
+ try {
+ String stateName = getOverdueStateNameFor(overdueable);
+ OverdueStateSet<SubscriptionBundle> states = catalog.currentBundleOverdueStateSet();
+ return (OverdueState<T>) states.findState(stateName);
+ } catch (CatalogApiException e) {
+ throw new OverdueError(e, ErrorCode.OVERDUE_CAT_ERROR_ENCOUNTERED,overdueable.getId(), overdueable.getClass().getSimpleName());
+ }
+ }
+
}
diff --git a/util/src/main/java/com/ning/billing/util/overdue/OverdueAccessApi.java b/util/src/main/java/com/ning/billing/util/overdue/OverdueAccessApi.java
index 4ca82d8..496a059 100644
--- a/util/src/main/java/com/ning/billing/util/overdue/OverdueAccessApi.java
+++ b/util/src/main/java/com/ning/billing/util/overdue/OverdueAccessApi.java
@@ -16,6 +16,9 @@
package com.ning.billing.util.overdue;
+import com.ning.billing.catalog.api.StaticCatalog;
+import com.ning.billing.catalog.api.overdue.OverdueError;
+import com.ning.billing.catalog.api.overdue.OverdueState;
import com.ning.billing.catalog.api.overdue.Overdueable;
public interface OverdueAccessApi {
@@ -23,5 +26,8 @@ public interface OverdueAccessApi {
public String getOverdueStateNameFor(Overdueable overdueable);
+ public <T extends Overdueable> OverdueState<T> getOverdueStateFor(T overdueable, StaticCatalog catalog)
+ throws OverdueError;
+
}
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 7a7ecab..899e828 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -16,6 +16,7 @@ getReadyNotifications(now, max) ::= <<
effective_dt \<= :now
and queue_name = :queue_name
and processing_state != 'PROCESSED'
+ and processing_state != 'REMOVED'
and (processing_owner IS NULL OR processing_available_dt \<= :now)
order by
effective_dt asc
@@ -35,6 +36,7 @@ claimNotification(owner, next_available, id, now) ::= <<
where
id = :id
and processing_state != 'PROCESSED'
+ and processing_state != 'REMOVED'
and (processing_owner IS NULL OR processing_available_dt \<= :now)
;
>>
@@ -48,6 +50,16 @@ clearNotification(id, owner) ::= <<
;
>>
+removeNotificationsByKey(notification_key) ::= <<
+ update notifications
+ set
+ processing_state = 'REMOVED'
+ where
+ notification_key = :notification_key
+ ;
+>>
+
+
insertNotification() ::= <<
insert into notifications (
notification_id
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 1c40988..c78c772 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -21,6 +21,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
+import java.util.UUID;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -109,4 +110,20 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
return result;
}
+
+ @Override
+ public void removeNotificationsByKey(UUID key) {
+ List<Notification> toClearNotifications = new ArrayList<Notification>();
+ for (Notification notification : notifications) {
+ if (notification.getNotificationKey().equals(key.toString())) {
+ toClearNotifications.add(notification);
+ }
+ }
+ synchronized(notifications) {
+ if (toClearNotifications.size() > 0) {
+ notifications.removeAll(toClearNotifications);
+ }
+ }
+
+ }
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index fefbcdb..6c60a8c 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -108,6 +108,7 @@ public class TestNotificationQueue {
});
// Reset time to real value
((ClockMock) clock).resetDeltaFromReality();
+ eventsReceived=0;
}
@@ -406,6 +407,82 @@ public class TestNotificationQueue {
}
};
}
+
+
+ @Test(groups="slow")
+ public void testRemoveNotifications() throws InterruptedException {
+
+ final UUID key = UUID.randomUUID();
+ final NotificationKey notificationKey = new NotificationKey() {
+ @Override
+ public String toString() {
+ return key.toString();
+ }
+ };
+ final UUID key2 = UUID.randomUUID();
+ final NotificationKey notificationKey2 = new NotificationKey() {
+ @Override
+ public String toString() {
+ return key2.toString();
+ }
+ };
+
+ final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String key, DateTime eventDateTime) {
+ if(key.equals(notificationKey) || key.equals(notificationKey2)) { //ignore stray events from other tests
+ log.info("Received notification with key: " + notificationKey);
+ eventsReceived++;
+ }
+ }
+ },
+ getNotificationConfig(false, 100, 10, 10000));
+
+
+ queue.startQueue();
+
+ final DateTime start = clock.getUTCNow().plusHours(1);
+ final int nextReadyTimeIncrementMs = 1000;
+
+ // add 3 events
+
+ dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+ @Override
+ public Void inTransaction(DummySqlTest transactional,
+ TransactionStatus status) throws Exception {
+
+ queue.recordFutureNotificationFromTransaction(transactional,
+ start.plus(nextReadyTimeIncrementMs), notificationKey);
+ queue.recordFutureNotificationFromTransaction(transactional,
+ start.plus(2 *nextReadyTimeIncrementMs), notificationKey);
+ queue.recordFutureNotificationFromTransaction(transactional,
+ start.plus(3 * nextReadyTimeIncrementMs), notificationKey2);
+ return null;
+ }
+ });
+
+
+ queue.removeNotificationsByKey(key); // should remove 2 of the 3
+
+ // Move time in the future after the notification effectiveDate
+ ((ClockMock) clock).setDeltaFromReality(4000000 + nextReadyTimeIncrementMs * 3 );
+
+ try {
+ await().atMost(10, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return eventsReceived >= 2;
+ }
+ });
+ Assert.fail("There should only have been only one event left in the queue we got: " + eventsReceived);
+ } catch (Exception e) {
+ // expected behavior
+ }
+ log.info("Received " + eventsReceived + " events");
+ queue.stopQueue();
+ }
+
public static class TestNotificationQueueModule extends AbstractModule {