Details
diff --git a/beatrix/src/test/resources/beatrix.properties b/beatrix/src/test/resources/beatrix.properties
index 8ec7211..36b83d7 100644
--- a/beatrix/src/test/resources/beatrix.properties
+++ b/beatrix/src/test/resources/beatrix.properties
@@ -6,6 +6,7 @@ killbill.invoice.engine.notifications.sleep=100
killbill.entitlement.engine.notifications.sleep=100
killbill.billing.util.persistent.bus.sleep=100
killbill.billing.util.persistent.bus.nbThreads=1
+killbill.billing.util.persistent.bus.prefetch=1
user.timezone=UTC
killbill.payment.retry.days=8,8,8,8,8,8,8,8
killbill.osgi.bundle.install.dir=/var/tmp/beatrix-bundles
diff --git a/entitlement/src/test/resources/entitlement.properties b/entitlement/src/test/resources/entitlement.properties
index 7039cc2..1fcab63 100644
--- a/entitlement/src/test/resources/entitlement.properties
+++ b/entitlement/src/test/resources/entitlement.properties
@@ -4,4 +4,5 @@ killbill.entitlement.dao.ready.max=1
killbill.entitlement.engine.notifications.sleep=100
killbill.billing.util.persistent.bus.sleep=100
killbill.billing.util.persistent.bus.nbThreads=1
+killbill.billing.util.persistent.bus.prefetch=1
user.timezone=UTC
diff --git a/junction/src/test/resources/resource.properties b/junction/src/test/resources/resource.properties
index d63334b..840728c 100644
--- a/junction/src/test/resources/resource.properties
+++ b/junction/src/test/resources/resource.properties
@@ -2,6 +2,7 @@ killbill.catalog.uri=file:src/test/resources/catalogSample.xml
killbill.entitlement.dao.claim.time=60000
killbill.entitlement.dao.ready.max=1
killbill.entitlement.engine.notifications.sleep=500
+killbill.billing.util.persistent.bus.prefetch=1
user.timezone=UTC
diff --git a/overdue/src/test/resources/resource.properties b/overdue/src/test/resources/resource.properties
index d63334b..840728c 100644
--- a/overdue/src/test/resources/resource.properties
+++ b/overdue/src/test/resources/resource.properties
@@ -2,6 +2,7 @@ killbill.catalog.uri=file:src/test/resources/catalogSample.xml
killbill.entitlement.dao.claim.time=60000
killbill.entitlement.dao.ready.max=1
killbill.entitlement.engine.notifications.sleep=500
+killbill.billing.util.persistent.bus.prefetch=1
user.timezone=UTC
diff --git a/payment/src/test/resources/payment.properties b/payment/src/test/resources/payment.properties
index 8a11d90..ae3d222 100644
--- a/payment/src/test/resources/payment.properties
+++ b/payment/src/test/resources/payment.properties
@@ -1,4 +1,4 @@
killbill.payment.failure.retry.start.sec=3600
killbill.payment.failure.retry.multiplier=1
killbill.payment.failure.retry.max.attempts=3
-
+killbill.billing.util.persistent.bus.prefetch=1
diff --git a/server/src/test/resources/killbill.properties b/server/src/test/resources/killbill.properties
index bbbb8c1..73a833d 100644
--- a/server/src/test/resources/killbill.properties
+++ b/server/src/test/resources/killbill.properties
@@ -32,6 +32,7 @@ killbill.invoice.engine.notifications.sleep=100
killbill.entitlement.engine.notifications.sleep=100
killbill.billing.util.persistent.bus.sleep=100
killbill.billing.util.persistent.bus.nbThreads=1
+killbill.billing.util.persistent.bus.prefetch=1
# Local DB
#com.ning.billing.dbi.test.useLocalDb=true
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index 3203c11..6f2410f 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.bus.dao;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
+import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
@@ -46,10 +47,10 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
@SqlQuery
@Mapper(PersistentBusSqlMapper.class)
- public BusEventEntry getNextBusEventEntry(@Bind("max") int max,
- @Bind("owner") String owner,
- @Bind("now") Date now,
- @BindBean final InternalTenantContext context);
+ public List<BusEventEntry> getNextBusEventEntries(@Bind("max") int max,
+ @Bind("owner") String owner,
+ @Bind("now") Date now,
+ @BindBean final InternalTenantContext context);
@SqlUpdate
public int claimBusEvent(@Bind("owner") String owner,
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
index 2a3822b..c2a8ddd 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
@@ -24,6 +24,11 @@ import com.ning.billing.util.config.PersistentQueueConfig;
public interface PersistentBusConfig extends PersistentQueueConfig {
+ @Config("killbill.billing.util.persistent.bus.prefetch")
+ @Default("5")
+ @Description("Number of bus events to fetch from the database at once")
+ public int getPrefetchAmount();
+
@Override
@Config("killbill.billing.util.persistent.bus.sleep")
@Default("3000")
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index cc8d4b0..49b4f89 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -16,8 +16,8 @@
package com.ning.billing.util.bus;
-import java.util.Collections;
import java.util.Date;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -49,7 +49,6 @@ import com.google.inject.Inject;
public class PersistentInternalBus extends PersistentQueueBase implements InternalBus {
private static final long DELTA_IN_PROCESSING_TIME_MS = 1000L * 60L * 5L; // 5 minutes
- private static final int MAX_BUS_EVENTS = 1;
private static final Logger log = LoggerFactory.getLogger(PersistentInternalBus.class);
@@ -145,19 +144,18 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
final Date now = clock.getUTCNow().toDate();
final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
- final BusEventEntry input = dao.getNextBusEventEntry(MAX_BUS_EVENTS, hostname, now, context);
- if (input == null) {
- return Collections.emptyList();
- }
-
- // We need to re-hydrate the context with the record ids from the BusEventEntry
- final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(input.getTenantRecordId(), input.getAccountRecordId(), context);
- final boolean claimed = (dao.claimBusEvent(hostname, nextAvailable, input.getId(), now, rehydratedContext) == 1);
- if (claimed) {
- dao.insertClaimedHistory(hostname, now, input.getId(), rehydratedContext);
- return Collections.singletonList(input);
+ final List<BusEventEntry> entries = dao.getNextBusEventEntries(config.getPrefetchAmount(), hostname, now, context);
+ final List<BusEventEntry> claimedEntries = new LinkedList<BusEventEntry>();
+ for (final BusEventEntry entry : entries) {
+ // We need to re-hydrate the context with the record ids from the BusEventEntry
+ final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(entry.getTenantRecordId(), entry.getAccountRecordId(), context);
+ final boolean claimed = (dao.claimBusEvent(hostname, nextAvailable, entry.getId(), now, rehydratedContext) == 1);
+ if (claimed) {
+ dao.insertClaimedHistory(hostname, now, entry.getId(), rehydratedContext);
+ claimedEntries.add(entry);
+ }
}
- return Collections.emptyList();
+ return claimedEntries;
}
@Override
diff --git a/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java b/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
index a52ee01..2753e7a 100644
--- a/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
@@ -18,7 +18,11 @@ package com.ning.billing.util.config;
public interface PersistentQueueConfig {
+ public int getPrefetchAmount();
+
public long getSleepTimeMs();
public boolean isProcessingOff();
+
+ public int getNbThreads();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
index dfc8678..b92b974 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
@@ -40,4 +40,9 @@ public interface NotificationQueueConfig extends PersistentQueueConfig {
@Default("false")
@Description("Whether to turn off the notification queue")
public boolean isProcessingOff();
+
+ @Config("killbill.billing.util.notificationq.notification.nbThreads")
+ @Default("1")
+ @Description("Number of threads to use")
+ public int getNbThreads();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
index 6899ea7..3e69cd8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -34,7 +33,6 @@ import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.weakref.jmx.Managed;
import com.ning.billing.util.Hostname;
import com.ning.billing.util.callcontext.CallOrigin;
@@ -58,7 +56,6 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
public static final int CLAIM_TIME_MS = (5 * 60 * 1000); // 5 minutes
private static final String NOTIFICATION_THREAD_NAME = "Notification-queue-dispatch";
- private static final int NB_THREADS = 1;
private final NotificationQueueConfig config;
private final String hostname;
@@ -92,7 +89,7 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
});
return th;
}
- }), NB_THREADS, config);
+ }), config.getNbThreads(), config);
this.clock = clock;
this.config = config;
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 247cf8f..1e62466 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -37,7 +37,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
private final int nbThreads;
private final Executor executor;
private final String svcQName;
- private final PersistentQueueConfig config;
+ protected final PersistentQueueConfig config;
private boolean isProcessingEvents;
private int curActiveThreads;
diff --git a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
index 18ec252..7fbd5d7 100644
--- a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
@@ -3,7 +3,7 @@ group PersistentBusSqlDao;
CHECK_TENANT() ::= "tenant_record_id = :tenantRecordId"
AND_CHECK_TENANT() ::= "AND <CHECK_TENANT()>"
-getNextBusEventEntry() ::= <<
+getNextBusEventEntries() ::= <<
select
record_id
, class_name