killbill-memoizeit
Changes
entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg 14(+7 -7)
Details
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
index 55efa97..2b748fe 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
@@ -187,7 +187,7 @@ public class TestBasic {
public Void inTransaction(Handle h, TransactionStatus status)
throws Exception {
h.execute("truncate table accounts");
- h.execute("truncate table events");
+ h.execute("truncate table entitlement_events");
h.execute("truncate table subscriptions");
h.execute("truncate table bundles");
h.execute("truncate table notifications");
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
index 55ad7f4..dfdc746 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
@@ -1,5 +1,5 @@
-DROP TABLE IF EXISTS events;
-CREATE TABLE events (
+DROP TABLE IF EXISTS entitlement_events;
+CREATE TABLE entitlement_events (
id int(11) unsigned NOT NULL AUTO_INCREMENT,
event_id char(36) NOT NULL,
event_type varchar(9) NOT NULL,
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
index 704e2c7..10f565d 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
@@ -15,14 +15,14 @@ getEventById(event_id) ::= <<
, plist_name
, current_version
, is_active
- from events
+ from entitlement_events
where
event_id = :event_id
;
>>
insertEvent() ::= <<
- insert into events (
+ insert into entitlement_events (
event_id
, event_type
, user_type
@@ -54,14 +54,14 @@ insertEvent() ::= <<
>>
removeEvents(subscription_id) ::= <<
- delete from events
+ delete from entitlement_events
where
subscription_id = :subscription_id
;
>>
unactiveEvent(event_id, now) ::= <<
- update events
+ update entitlement_events
set
is_active = 0
, updated_dt = :now
@@ -71,7 +71,7 @@ unactiveEvent(event_id, now) ::= <<
>>
reactiveEvent(event_id, now) ::= <<
- update events
+ update entitlement_events
set
is_active = 1
, updated_dt = :now
@@ -95,7 +95,7 @@ getFutureActiveEventForSubscription(subscription_id, now) ::= <<
, plist_name
, current_version
, is_active
- from events
+ from entitlement_events
where
subscription_id = :subscription_id
and is_active = 1
@@ -123,7 +123,7 @@ getEventsForSubscription(subscription_id) ::= <<
, plist_name
, current_version
, is_active
- from events
+ from entitlement_events
where
subscription_id = :subscription_id
order by
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 2204274..c5881f9 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -58,7 +58,7 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
public static interface ResetSqlDao extends Transactional<ResetSqlDao>, CloseMe {
- @SqlUpdate("truncate table events")
+ @SqlUpdate("truncate table entitlement_events")
public void resetEvents();
@SqlUpdate("truncate table subscriptions")
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3c19a2b..97d3bf9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,13 +38,13 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
}
@Override
- protected void doProcessEvents(final int sequenceId) {
+ protected boolean doProcessEvents(final int sequenceId) {
logDebug("ENTER doProcessEvents");
List<Notification> notifications = getReadyNotifications(sequenceId);
if (notifications.size() == 0) {
logDebug("EXIT doProcessEvents");
- return;
+ return false;
}
logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
@@ -58,6 +58,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
logDebug("done handling notification %s, key = %s for time %s",
cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
}
+ return true;
}
@Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4ea38f7..4826356 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,8 +38,9 @@ public interface NotificationQueue {
* This is only valid when the queue has been configured with isNotificationProcessingOff is true
* In which case, it will callback users for all the ready notifications.
*
+ * @return true if we processed some active notifications
*/
- public void processReadyNotification();
+ public boolean processReadyNotification();
/**
* Stops the queue. Blocks until queue is completely stopped.
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 9a42d2e..15679f6 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -57,10 +57,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
// Use this object's monitor for synchronization (no need for volatile)
protected boolean isProcessingEvents;
-
+
private boolean startedComplete = false;
private boolean stoppedComplete = false;
-
+
// Package visibility on purpose
NotificationQueueBase(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
this.clock = clock;
@@ -88,8 +88,8 @@ public abstract class NotificationQueueBase implements NotificationQueue {
@Override
- public void processReadyNotification() {
- doProcessEvents(sequenceId.incrementAndGet());
+ public boolean processReadyNotification() {
+ return doProcessEvents(sequenceId.incrementAndGet());
}
@@ -181,14 +181,14 @@ public abstract class NotificationQueueBase implements NotificationQueue {
});
waitForNotificationStartCompletion();
}
-
+
private void completedQueueStop() {
synchronized (this) {
stoppedComplete = true;
this.notifyAll();
}
}
-
+
private void completedQueueStart() {
synchronized (this) {
startedComplete = true;
@@ -237,5 +237,5 @@ public abstract class NotificationQueueBase implements NotificationQueue {
return svcName + ":" + queueName;
}
- protected abstract void doProcessEvents(int sequenceId);
+ protected abstract boolean doProcessEvents(int sequenceId);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 5cb00aa..ee04781 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -76,7 +76,7 @@ public interface NotificationQueueService {
/**
*
* @param services
- * @return
+ * @return whether or not things were ready in the queue
*/
- public void triggerManualQueueProcessing(final String [] services);
+ public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 7833529..85d92c9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -83,8 +83,13 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
+ //
+ // Test ONLY
+ //
@Override
- public void triggerManualQueueProcessing(final String [] services) {
+ public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+
+ boolean result = false;
List<NotificationQueue> manualQueues = null;
if (services == null) {
@@ -102,8 +107,15 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
}
for (NotificationQueue cur : manualQueues) {
- cur.processReadyNotification();
+ boolean processedNotifications = true;
+ do {
+ processedNotifications = cur.processReadyNotification();
+ if (result == false) {
+ result = processedNotifications;
+ }
+ } while(keepRunning && processedNotifications);
}
+ return result;
}
private final void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index e1da366..922fb29 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,7 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
@Override
- protected void doProcessEvents(int sequenceId) {
+ protected boolean doProcessEvents(int sequenceId) {
+
+ boolean result = false;
List<Notification> processedNotifications = new ArrayList<Notification>();
List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -73,7 +75,10 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
readyNotifications.add(cur);
}
}
+
+ result = readyNotifications.size() > 0;
for (Notification cur : readyNotifications) {
+
handler.handleReadyNotification(cur.getNotificationKey());
DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
oldNotifications.add(cur);
@@ -87,6 +92,6 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
notifications.addAll(processedNotifications);
}
}
-
+ return result;
}
}