killbill-memoizeit

util: uniform persistent bus and notifications * Prefetch

5/7/2013 6:51:10 PM

Details

diff --git a/beatrix/src/test/resources/beatrix.properties b/beatrix/src/test/resources/beatrix.properties
index 8ec7211..36b83d7 100644
--- a/beatrix/src/test/resources/beatrix.properties
+++ b/beatrix/src/test/resources/beatrix.properties
@@ -6,6 +6,7 @@ killbill.invoice.engine.notifications.sleep=100
 killbill.entitlement.engine.notifications.sleep=100
 killbill.billing.util.persistent.bus.sleep=100
 killbill.billing.util.persistent.bus.nbThreads=1
+killbill.billing.util.persistent.bus.prefetch=1
 user.timezone=UTC
 killbill.payment.retry.days=8,8,8,8,8,8,8,8
 killbill.osgi.bundle.install.dir=/var/tmp/beatrix-bundles
diff --git a/entitlement/src/test/resources/entitlement.properties b/entitlement/src/test/resources/entitlement.properties
index 7039cc2..1fcab63 100644
--- a/entitlement/src/test/resources/entitlement.properties
+++ b/entitlement/src/test/resources/entitlement.properties
@@ -4,4 +4,5 @@ killbill.entitlement.dao.ready.max=1
 killbill.entitlement.engine.notifications.sleep=100
 killbill.billing.util.persistent.bus.sleep=100
 killbill.billing.util.persistent.bus.nbThreads=1
+killbill.billing.util.persistent.bus.prefetch=1
 user.timezone=UTC
diff --git a/junction/src/test/resources/resource.properties b/junction/src/test/resources/resource.properties
index d63334b..840728c 100644
--- a/junction/src/test/resources/resource.properties
+++ b/junction/src/test/resources/resource.properties
@@ -2,6 +2,7 @@ killbill.catalog.uri=file:src/test/resources/catalogSample.xml
 killbill.entitlement.dao.claim.time=60000
 killbill.entitlement.dao.ready.max=1
 killbill.entitlement.engine.notifications.sleep=500
+killbill.billing.util.persistent.bus.prefetch=1
 user.timezone=UTC
 
 
diff --git a/overdue/src/test/resources/resource.properties b/overdue/src/test/resources/resource.properties
index d63334b..840728c 100644
--- a/overdue/src/test/resources/resource.properties
+++ b/overdue/src/test/resources/resource.properties
@@ -2,6 +2,7 @@ killbill.catalog.uri=file:src/test/resources/catalogSample.xml
 killbill.entitlement.dao.claim.time=60000
 killbill.entitlement.dao.ready.max=1
 killbill.entitlement.engine.notifications.sleep=500
+killbill.billing.util.persistent.bus.prefetch=1
 user.timezone=UTC
 
 
diff --git a/payment/src/test/resources/payment.properties b/payment/src/test/resources/payment.properties
index 8a11d90..ae3d222 100644
--- a/payment/src/test/resources/payment.properties
+++ b/payment/src/test/resources/payment.properties
@@ -1,4 +1,4 @@
 killbill.payment.failure.retry.start.sec=3600
 killbill.payment.failure.retry.multiplier=1
 killbill.payment.failure.retry.max.attempts=3
-
+killbill.billing.util.persistent.bus.prefetch=1
diff --git a/server/src/test/resources/killbill.properties b/server/src/test/resources/killbill.properties
index bbbb8c1..73a833d 100644
--- a/server/src/test/resources/killbill.properties
+++ b/server/src/test/resources/killbill.properties
@@ -32,6 +32,7 @@ killbill.invoice.engine.notifications.sleep=100
 killbill.entitlement.engine.notifications.sleep=100
 killbill.billing.util.persistent.bus.sleep=100
 killbill.billing.util.persistent.bus.nbThreads=1
+killbill.billing.util.persistent.bus.prefetch=1
 # Local DB 
 #com.ning.billing.dbi.test.useLocalDb=true
 
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index 3203c11..6f2410f 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.bus.dao;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Date;
+import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -46,10 +47,10 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
 
     @SqlQuery
     @Mapper(PersistentBusSqlMapper.class)
-    public BusEventEntry getNextBusEventEntry(@Bind("max") int max,
-                                              @Bind("owner") String owner,
-                                              @Bind("now") Date now,
-                                              @BindBean final InternalTenantContext context);
+    public List<BusEventEntry> getNextBusEventEntries(@Bind("max") int max,
+                                                      @Bind("owner") String owner,
+                                                      @Bind("now") Date now,
+                                                      @BindBean final InternalTenantContext context);
 
     @SqlUpdate
     public int claimBusEvent(@Bind("owner") String owner,
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
index 2a3822b..c2a8ddd 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
@@ -24,6 +24,11 @@ import com.ning.billing.util.config.PersistentQueueConfig;
 
 public interface PersistentBusConfig extends PersistentQueueConfig {
 
+    @Config("killbill.billing.util.persistent.bus.prefetch")
+    @Default("5")
+    @Description("Number of bus events to fetch from the database at once")
+    public int getPrefetchAmount();
+
     @Override
     @Config("killbill.billing.util.persistent.bus.sleep")
     @Default("3000")
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index cc8d4b0..49b4f89 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -16,8 +16,8 @@
 
 package com.ning.billing.util.bus;
 
-import java.util.Collections;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -49,7 +49,6 @@ import com.google.inject.Inject;
 public class PersistentInternalBus extends PersistentQueueBase implements InternalBus {
 
     private static final long DELTA_IN_PROCESSING_TIME_MS = 1000L * 60L * 5L; // 5 minutes
-    private static final int MAX_BUS_EVENTS = 1;
 
     private static final Logger log = LoggerFactory.getLogger(PersistentInternalBus.class);
 
@@ -145,19 +144,18 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         final Date now = clock.getUTCNow().toDate();
         final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
 
-        final BusEventEntry input = dao.getNextBusEventEntry(MAX_BUS_EVENTS, hostname, now, context);
-        if (input == null) {
-            return Collections.emptyList();
-        }
-
-        // We need to re-hydrate the context with the record ids from the BusEventEntry
-        final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(input.getTenantRecordId(), input.getAccountRecordId(), context);
-        final boolean claimed = (dao.claimBusEvent(hostname, nextAvailable, input.getId(), now, rehydratedContext) == 1);
-        if (claimed) {
-            dao.insertClaimedHistory(hostname, now, input.getId(), rehydratedContext);
-            return Collections.singletonList(input);
+        final List<BusEventEntry> entries = dao.getNextBusEventEntries(config.getPrefetchAmount(), hostname, now, context);
+        final List<BusEventEntry> claimedEntries = new LinkedList<BusEventEntry>();
+        for (final BusEventEntry entry : entries) {
+            // We need to re-hydrate the context with the record ids from the BusEventEntry
+            final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(entry.getTenantRecordId(), entry.getAccountRecordId(), context);
+            final boolean claimed = (dao.claimBusEvent(hostname, nextAvailable, entry.getId(), now, rehydratedContext) == 1);
+            if (claimed) {
+                dao.insertClaimedHistory(hostname, now, entry.getId(), rehydratedContext);
+                claimedEntries.add(entry);
+            }
         }
-        return Collections.emptyList();
+        return claimedEntries;
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java b/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
index a52ee01..2753e7a 100644
--- a/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
@@ -18,7 +18,11 @@ package com.ning.billing.util.config;
 
 public interface PersistentQueueConfig {
 
+    public int getPrefetchAmount();
+
     public long getSleepTimeMs();
 
     public boolean isProcessingOff();
+
+    public int getNbThreads();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
index dfc8678..b92b974 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
@@ -40,4 +40,9 @@ public interface NotificationQueueConfig extends PersistentQueueConfig {
     @Default("false")
     @Description("Whether to turn off the notification queue")
     public boolean isProcessingOff();
+
+    @Config("killbill.billing.util.notificationq.notification.nbThreads")
+    @Default("1")
+    @Description("Number of threads to use")
+    public int getNbThreads();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
index 6899ea7..3e69cd8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
@@ -34,7 +33,6 @@ import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.weakref.jmx.Managed;
 
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.callcontext.CallOrigin;
@@ -58,7 +56,6 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
     public static final int CLAIM_TIME_MS = (5 * 60 * 1000); // 5 minutes
 
     private static final String NOTIFICATION_THREAD_NAME = "Notification-queue-dispatch";
-    private static final int NB_THREADS = 1;
 
     private final NotificationQueueConfig config;
     private final String hostname;
@@ -92,7 +89,7 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
                 });
                 return th;
             }
-        }), NB_THREADS, config);
+        }), config.getNbThreads(), config);
 
         this.clock = clock;
         this.config = config;
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 247cf8f..1e62466 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -37,7 +37,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
     private final int nbThreads;
     private final Executor executor;
     private final String svcQName;
-    private final PersistentQueueConfig config;
+    protected final PersistentQueueConfig config;
     private boolean isProcessingEvents;
     private int curActiveThreads;
 
diff --git a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
index 18ec252..7fbd5d7 100644
--- a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
@@ -3,7 +3,7 @@ group PersistentBusSqlDao;
 CHECK_TENANT() ::= "tenant_record_id = :tenantRecordId"
 AND_CHECK_TENANT() ::= "AND <CHECK_TENANT()>"
 
-getNextBusEventEntry() ::= <<
+getNextBusEventEntries() ::= <<
     select
       record_id
       , class_name