Details
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
index 9e4460b..31105c8 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
@@ -28,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class InMemoryBus implements Bus {
- // STEPH config ?
- private final static int MAX_EVENT_THREADS = 1;
-
private final static String EVENT_BUS_IDENTIFIER = "bus-service";
private final static String EVENT_BUS_GROUP_NAME = "bus-grp";
private final static String EVENT_BUS_TH_NAME = "bus-th";
@@ -68,7 +65,7 @@ public class InMemoryBus implements Bus {
public InMemoryBus() {
final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
- Executor executor = Executors.newFixedThreadPool(MAX_EVENT_THREADS, new ThreadFactory() {
+ Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(group, r, EVENT_BUS_TH_NAME);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 4b4aa89..8e2aaf8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,27 +38,29 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
}
@Override
- protected boolean doProcessEvents(final int sequenceId) {
+ protected int doProcessEvents(final int sequenceId) {
logDebug("ENTER doProcessEvents");
List<Notification> notifications = getReadyNotifications(sequenceId);
if (notifications.size() == 0) {
logDebug("EXIT doProcessEvents");
- return false;
+ return 0;
}
logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
+ int result = 0;
for (final Notification cur : notifications) {
nbProcessedEvents.incrementAndGet();
logDebug("handling notification %s, key = %s for time %s",
cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
+ result++;
clearNotification(cur);
logDebug("done handling notification %s, key = %s for time %s",
cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
}
- return true;
+ return result;
}
@Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4826356..e1dcdbf 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,9 +38,9 @@ public interface NotificationQueue {
* This is only valid when the queue has been configured with isNotificationProcessingOff is true
* In which case, it will callback users for all the ready notifications.
*
- * @return true if we processed some active notifications
+ * @return the number of entries we processed
*/
- public boolean processReadyNotification();
+ public int processReadyNotification();
/**
* Stops the queue. Blocks until queue is completely stopped.
@@ -56,4 +56,10 @@ public interface NotificationQueue {
*/
public void startQueue();
+ /**
+ *
+ * @return the name of that queue
+ */
+ public String getFullQName();
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 15679f6..cc1ea28 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -88,7 +88,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
@Override
- public boolean processReadyNotification() {
+ public int processReadyNotification() {
return doProcessEvents(sequenceId.incrementAndGet());
}
@@ -233,9 +233,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
}
}
- protected String getFullQName() {
+ @Override
+ public String getFullQName() {
return svcName + ":" + queueName;
}
- protected abstract boolean doProcessEvents(int sequenceId);
+ protected abstract int doProcessEvents(int sequenceId);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 72816f2..4d56b03 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -78,7 +78,7 @@ public interface NotificationQueueService {
/**
*
* @param services
- * @return whether or not things were ready in the queue
+ * @return the number of processed notifications
*/
- public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
+ public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 85d92c9..3f8f26f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -87,9 +87,9 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
// Test ONLY
//
@Override
- public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+ public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
- boolean result = false;
+ int result = 0;
List<NotificationQueue> manualQueues = null;
if (services == null) {
@@ -107,13 +107,12 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
}
for (NotificationQueue cur : manualQueues) {
- boolean processedNotifications = true;
+ int processedNotifications = 0;
do {
processedNotifications = cur.processReadyNotification();
- if (result == false) {
- result = processedNotifications;
- }
- } while(keepRunning && processedNotifications);
+ log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
+ result += processedNotifications;
+ } while(keepRunning && processedNotifications > 0);
}
return result;
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index c3eecc0..e96d2cf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,9 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
@Override
- protected boolean doProcessEvents(int sequenceId) {
+ protected int doProcessEvents(int sequenceId) {
- boolean result = false;
+ int result = 0;
List<Notification> processedNotifications = new ArrayList<Notification>();
List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -76,7 +76,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
}
- result = readyNotifications.size() > 0;
+ result = readyNotifications.size();
for (Notification cur : readyNotifications) {
handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());