killbill-memoizeit

Fix notification queue (so it can be disabled) Rework NotificationQ/Persistent

5/11/2012 3:26:36 PM

Changes

Details

diff --git a/api/src/main/java/com/ning/billing/config/EntitlementConfig.java b/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
index c73530f..12418c6 100644
--- a/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
+++ b/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
@@ -24,19 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
 public interface EntitlementConfig extends NotificationConfig, KillbillConfig  {
 
 	@Override
-    @Config("killbill.entitlement.dao.claim.time")
-    @Default("60000")
-    public long getDaoClaimTimeMs();
-
-	@Override
-    @Config("killbill.entitlement.dao.ready.max")
-    @Default("10")
-    public int getDaoMaxReadyEvents();
-
-	@Override
     @Config("killbill.entitlement.engine.notifications.sleep")
     @Default("500")
-    public long getNotificationSleepTimeMs();
+    public long getSleepTimeMs();    
 
 	@Override
     @Config("killbill.notifications.off")
diff --git a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
index 18972eb..407f4d3 100644
--- a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
+++ b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
@@ -21,22 +21,12 @@ import org.skife.config.Default;
 
 public interface InvoiceConfig extends NotificationConfig, KillbillConfig  {
 
-	@Override
-    @Config("killbill.invoice.dao.claim.time")
-    @Default("60000")
-    public long getDaoClaimTimeMs();
-
-	@Override	
-    @Config("killbill.invoice.dao.ready.max")
-    @Default("10")
-    public int getDaoMaxReadyEvents();
-
-	@Override
+    @Override    
     @Config("killbill.invoice.engine.notifications.sleep")
     @Default("500")
-    public long getNotificationSleepTimeMs();
+    public long getSleepTimeMs();
 
-	@Override
+    @Override
     @Config("killbill.notifications.off")
     @Default("false")
     public boolean isNotificationProcessingOff();
diff --git a/api/src/main/java/com/ning/billing/config/NotificationConfig.java b/api/src/main/java/com/ning/billing/config/NotificationConfig.java
index 82f68b6..630a2d6 100644
--- a/api/src/main/java/com/ning/billing/config/NotificationConfig.java
+++ b/api/src/main/java/com/ning/billing/config/NotificationConfig.java
@@ -17,13 +17,6 @@
 package com.ning.billing.config;
 
 
-public interface NotificationConfig {
-
-    public long getDaoClaimTimeMs();
-
-    public int getDaoMaxReadyEvents();
-
-    public long getNotificationSleepTimeMs();
-
+public interface NotificationConfig extends PersistentQueueConfig {
     public boolean isNotificationProcessingOff();
 }
diff --git a/api/src/main/java/com/ning/billing/config/PaymentConfig.java b/api/src/main/java/com/ning/billing/config/PaymentConfig.java
index 599232c..f606039 100644
--- a/api/src/main/java/com/ning/billing/config/PaymentConfig.java
+++ b/api/src/main/java/com/ning/billing/config/PaymentConfig.java
@@ -35,19 +35,9 @@ public interface PaymentConfig extends NotificationConfig, KillbillConfig  {
     public List<Integer> getPaymentRetryDays();
 
 	@Override
-    @Config("killbill.payment.dao.claim.time")
-    @Default("60000")
-    public long getDaoClaimTimeMs();
-
-	@Override
-    @Config("killbill.payment.dao.ready.max")
-    @Default("10")
-    public int getDaoMaxReadyEvents();
-
-	@Override
     @Config("killbill.payment.engine.notifications.sleep")
     @Default("500")
-    public long getNotificationSleepTimeMs();
+    public long getSleepTimeMs();
 
 	@Override
     @Config("killbill.payment.engine.events.off")
diff --git a/api/src/main/java/com/ning/billing/config/PersistentQueueConfig.java b/api/src/main/java/com/ning/billing/config/PersistentQueueConfig.java
new file mode 100644
index 0000000..c7e1515
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/config/PersistentQueueConfig.java
@@ -0,0 +1,20 @@
+/* 
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.config;
+
+public interface PersistentQueueConfig {
+    public long getSleepTimeMs();
+}
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
index 579f47c..44e7297 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
@@ -85,7 +85,7 @@ public class TestIntegrationBase implements TestListenerStatus {
     protected static final Logger log = LoggerFactory.getLogger(TestIntegration.class);
     protected static long AT_LEAST_ONE_MONTH_MS =  31L * 24L * 3600L * 1000L;
 
-    protected static final long DELAY = 10000;
+    protected static final long DELAY = 5000;
 
     @Inject
     protected IDBI dbi;
diff --git a/beatrix/src/test/resources/resource.properties b/beatrix/src/test/resources/resource.properties
index d63334b..a2c4ec1 100644
--- a/beatrix/src/test/resources/resource.properties
+++ b/beatrix/src/test/resources/resource.properties
@@ -1,7 +1,11 @@
 killbill.catalog.uri=file:src/test/resources/catalogSample.xml
 killbill.entitlement.dao.claim.time=60000
 killbill.entitlement.dao.ready.max=1
-killbill.entitlement.engine.notifications.sleep=500
+killbill.payment.engine.notifications.sleep=100
+killbill.invoice.engine.notifications.sleep=100
+killbill.entitlement.engine.notifications.sleep=100
+killbill.billing.util.persistent.bus.sleep=100
+killbill.billing.util.persistent.bus.nbThreads=1
 user.timezone=UTC
 
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index c8f82a2..749bf2c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -135,21 +135,15 @@ public class Engine implements EventListener, EntitlementService {
                 }
             },
             new NotificationConfig() {
+                
                 @Override
-                public boolean isNotificationProcessingOff() {
-                    return config.isNotificationProcessingOff();
-                }
-                @Override
-                public long getNotificationSleepTimeMs() {
-                    return config.getNotificationSleepTimeMs();
-                }
-                @Override
-                public int getDaoMaxReadyEvents() {
-                    return config.getDaoMaxReadyEvents();
+                public long getSleepTimeMs() {
+                    return config.getSleepTimeMs();
                 }
+                
                 @Override
-                public long getDaoClaimTimeMs() {
-                    return config.getDaoClaimTimeMs();
+                public boolean isNotificationProcessingOff() {
+                    return config.isNotificationProcessingOff();
                 }
             });
         } catch (NotificationQueueAlreadyExists e) {
diff --git a/entitlement/src/test/resources/entitlement.properties b/entitlement/src/test/resources/entitlement.properties
index af1c3fc..58f4f95 100644
--- a/entitlement/src/test/resources/entitlement.properties
+++ b/entitlement/src/test/resources/entitlement.properties
@@ -1,6 +1,8 @@
 killbill.catalog.uri=file:src/test/resources/testInput.xml
 killbill.entitlement.dao.claim.time=60000
 killbill.entitlement.dao.ready.max=1
-killbill.entitlement.engine.notifications.sleep=500
+killbill.entitlement.engine.notifications.sleep=100
+killbill.billing.util.persistent.bus.sleep=100
+killbill.billing.util.persistent.bus.nbThreads=1
 user.timezone=UTC
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 28d15a8..d61e206 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -86,21 +86,15 @@ public class DefaultNextBillingDateNotifier implements  NextBillingDateNotifier 
                 }
             },
             new NotificationConfig() {
+                
                 @Override
-                public boolean isNotificationProcessingOff() {
-                    return config.isNotificationProcessingOff();
-                }
-                @Override
-                public long getNotificationSleepTimeMs() {
-                    return config.getNotificationSleepTimeMs();
+                public long getSleepTimeMs() {
+                    return config.getSleepTimeMs();
                 }
+                
                 @Override
-                public int getDaoMaxReadyEvents() {
-                    return config.getDaoMaxReadyEvents();
-                }
-                @Override
-                public long getDaoClaimTimeMs() {
-                    return config.getDaoClaimTimeMs();
+                public boolean isNotificationProcessingOff() {
+                    return config.isNotificationProcessingOff();
                 }
             });
         } catch (NotificationQueueAlreadyExists e) {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 08354db..91dbf7d 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -55,11 +55,7 @@ public abstract class InvoiceDaoTestBase extends InvoicingTestBase {
 
     private final InvoiceConfig invoiceConfig = new InvoiceConfig() {
         @Override
-        public long getDaoClaimTimeMs() {throw new UnsupportedOperationException();}
-        @Override
-        public int getDaoMaxReadyEvents() {throw new UnsupportedOperationException();}
-        @Override
-        public long getNotificationSleepTimeMs() {throw new UnsupportedOperationException();}
+        public long getSleepTimeMs() {throw new UnsupportedOperationException();}
         @Override
         public boolean isNotificationProcessingOff() {throw new UnsupportedOperationException();}
         @Override
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
index 49fca25..960dfd0 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
@@ -70,17 +70,7 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         Clock clock = new DefaultClock();
         InvoiceConfig invoiceConfig = new InvoiceConfig() {
             @Override
-            public long getDaoClaimTimeMs() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public int getDaoMaxReadyEvents() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public long getNotificationSleepTimeMs() {
+            public long getSleepTimeMs() {
                 throw new UnsupportedOperationException();
             }
 
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 06acac9..5e3e8e3 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -75,16 +75,8 @@ public class DefaultOverdueCheckNotifier implements  OverdueCheckNotifier {
                     return config.isNotificationProcessingOff();
                 }
                 @Override
-                public long getNotificationSleepTimeMs() {
-                    return config.getNotificationSleepTimeMs();
-                }
-                @Override
-                public int getDaoMaxReadyEvents() {
-                    return config.getDaoMaxReadyEvents();
-                }
-                @Override
-                public long getDaoClaimTimeMs() {
-                    return config.getDaoClaimTimeMs();
+                public long getSleepTimeMs() {
+                    return config.getSleepTimeMs();
                 }
             });
         } catch (NotificationQueueAlreadyExists e) {
diff --git a/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java b/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
index 267eeb0..b4f4d0e 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
@@ -26,19 +26,9 @@ import com.ning.billing.config.NotificationConfig;
 public interface OverdueProperties extends NotificationConfig, KillbillConfig  {
 
     @Override
-    @Config("killbill.overdue.dao.claim.time")
-    @Default("60000")
-    public long getDaoClaimTimeMs();
-
-    @Override   
-    @Config("killbill.overdue.dao.ready.max")
-    @Default("10")
-    public int getDaoMaxReadyEvents();
-
-    @Override
     @Config("killbill.overdue.engine.notifications.sleep")
     @Default("500")
-    public long getNotificationSleepTimeMs();
+    public long getSleepTimeMs();
 
     @Override
     @Config("killbill.notifications.off")
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java b/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
index cf9eddd..ce664b3 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
@@ -491,7 +491,7 @@ public class TestJaxrsBase {
      * but until we have a strong need for it, this is in the TODO list...
      */
     protected void crappyWaitForLackOfProperSynchonization() throws Exception {
-        Thread.sleep(7000);
+        Thread.sleep(5000);
     }
 
 }
diff --git a/server/src/test/resources/killbill.properties b/server/src/test/resources/killbill.properties
index 3463266..9aa3e66 100644
--- a/server/src/test/resources/killbill.properties
+++ b/server/src/test/resources/killbill.properties
@@ -10,6 +10,11 @@ user.timezone=UTC
 
 com.ning.core.server.jetty.logPath=/var/tmp/.logs
 
+killbill.payment.engine.notifications.sleep=100
+killbill.invoice.engine.notifications.sleep=100
+killbill.entitlement.engine.notifications.sleep=100
+killbill.billing.util.persistent.bus.sleep=100
+killbill.billing.util.persistent.bus.nbThreads=1
 # Local DB 
 #com.ning.billing.dbi.test.useLocalDb=true
 
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
index b9c6faa..8e98211 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
@@ -17,19 +17,19 @@ package com.ning.billing.util.bus.dao;
 
 import org.joda.time.DateTime;
 
-import com.ning.billing.util.notificationq.NotificationLifecycle;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
 
-public class BusEventEntry implements NotificationLifecycle  {
+public class BusEventEntry implements PersistentQueueEntryLifecycle  {
     
     private final long id;
     private final String owner;
     private final String createdOwner;
     private final DateTime nextAvailable;
-    private final NotificationLifecycleState processingState;
+    private final PersistentQueueEntryLifecycleState processingState;
     private final String busEventClass;
     private final String busEventJson;
 
-    public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable, NotificationLifecycleState processingState, final String busEventClass, final String busEventJson) {
+    public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable, PersistentQueueEntryLifecycleState processingState, final String busEventClass, final String busEventJson) {
         this.id = id;
         this.createdOwner = createdOwner;
         this.owner = owner;
@@ -73,7 +73,7 @@ public class BusEventEntry implements NotificationLifecycle  {
     }
 
     @Override
-    public NotificationLifecycleState getProcessingState() {
+    public PersistentQueueEntryLifecycleState getProcessingState() {
         return processingState;
     }
 
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index f8007de..3c32f85 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -34,7 +34,7 @@ import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import com.ning.billing.util.dao.BinderBase;
 import com.ning.billing.util.dao.MapperBase;
-import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 @ExternalizedSqlViaStringTemplate3()
 public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>, CloseMe {
@@ -70,7 +70,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
             stmt.bind("creating_owner", evt.getCreatedOwner());
             stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
             stmt.bind("processing_owner", evt.getOwner());
-            stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
+            stmt.bind("processing_state", PersistentQueueEntryLifecycleState.AVAILABLE.toString());
         }
     }
     
@@ -86,7 +86,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
             final String eventJson = r.getString("event_json"); 
             final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
             final String processingOwner = r.getString("processing_owner");
-            final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
+            final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
             
             return new BusEventEntry(id, createdOwner, processingOwner, nextAvailableDate, processingState, className, eventJson);
         }
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index bfce245..92b35d7 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -33,19 +33,19 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.ning.billing.config.PersistentQueueConfig;
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.bus.dao.BusEventEntry;
 import com.ning.billing.util.bus.dao.PersistentBusSqlDao;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.glue.BusModule;
 import com.ning.billing.util.queue.PersistentQueueBase;
 
 
 public class PersistentBus extends PersistentQueueBase implements Bus {
 
-    private final static int NB_BUS_THREADS = 1;
-    private final static long TIMEOUT_MSEC = 15L * 1000L; // 15 sec
     private final static long DELTA_IN_PROCESSING_TIME_MS = 1000L * 60L * 5L; // 5 minutes
-    private final static long SLEEP_TIME_MS = 1000; // 1 sec
     private final static int MAX_BUS_EVENTS = 1;
     
     private static final Logger log = LoggerFactory.getLogger(PersistentBus.class);
@@ -80,15 +80,16 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
     }
     
     @Inject
-    public PersistentBus(final IDBI dbi, final Clock clock) {
-        super("Bus", Executors.newFixedThreadPool(NB_BUS_THREADS, new ThreadFactory() {
+    public PersistentBus(final IDBI dbi, final Clock clock, final PersistentBusConfig config) {
+        
+        super("Bus", Executors.newFixedThreadPool(config.getNbThreads(), new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(new ThreadGroup(DefaultBusService.EVENT_BUS_GROUP_NAME),
                         r,
                         DefaultBusService.EVENT_BUS_TH_NAME);
             }
-        }), NB_BUS_THREADS, TIMEOUT_MSEC, SLEEP_TIME_MS);
+        }), config.getNbThreads(), config);
         this.dao = dbi.onDemand(PersistentBusSqlDao.class);
         this.clock = clock;
         this.objectMapper = new ObjectMapper();
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
new file mode 100644
index 0000000..a2e9879
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
@@ -0,0 +1,33 @@
+/* 
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.util.bus;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+import com.ning.billing.config.PersistentQueueConfig;
+
+public interface PersistentBusConfig extends PersistentQueueConfig {
+
+    @Override
+    @Config("killbill.billing.util.persistent.bus.sleep")
+    @Default("500")
+    public long getSleepTimeMs();
+    
+    @Config("killbill.billing.util.persistent.bus.nbThreads")
+    @Default("3")
+    public int getNbThreads();
+}
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusModule.java b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
index 656253d..2793208 100644
--- a/util/src/main/java/com/ning/billing/util/glue/BusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
@@ -16,10 +16,16 @@
 
 package com.ning.billing.util.glue;
 
+import org.skife.config.ConfigurationObjectFactory;
+
 import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+import com.ning.billing.config.EntitlementConfig;
+import com.ning.billing.config.PersistentQueueConfig;
 import com.ning.billing.util.bus.DefaultBusService;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.bus.PersistentBusConfig;
 import com.ning.billing.util.bus.InMemoryBus;
 import com.ning.billing.util.bus.PersistentBus;
 
@@ -58,9 +64,14 @@ public class BusModule extends AbstractModule {
         
     }
 
+    protected void configurePersistentBusConfig() {
+        final PersistentBusConfig config = new ConfigurationObjectFactory(System.getProperties()).build(PersistentBusConfig.class);
+        bind(PersistentBusConfig.class).toInstance(config);
+    }
+    
     private void configurePersistentEventBus() {
+        configurePersistentBusConfig();        
         bind(Bus.class).to(PersistentBus.class).asEagerSingleton();
-        
     }
     
     private void configureInMemoryEventBus() {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index f943d0b..01a9363 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -39,7 +39,7 @@ import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import com.ning.billing.util.notificationq.DefaultNotification;
 import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 @ExternalizedSqlViaStringTemplate3()
 public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, CloseMe {
@@ -77,7 +77,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             stmt.bind("queue_name", evt.getQueueName());
             stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
             stmt.bind("processing_owner", evt.getOwner());
-            stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
+            stmt.bind("processing_state", PersistentQueueEntryLifecycleState.AVAILABLE.toString());
         }
     }
 
@@ -95,7 +95,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final DateTime effectiveDate = getDate(r, "effective_dt");
             final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
             final String processingOwner = r.getString("processing_owner");
-            final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
+            final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
 
             return new DefaultNotification(id, uuid, createdOwner, processingOwner, queueName, nextAvailableDate,
                     processingState, notificationKey, effectiveDate);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 9d2c601..4c42c73 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -28,13 +28,13 @@ public class DefaultNotification implements Notification {
     private final String createdOwner;
     private final String queueName;
     private final DateTime nextAvailableDate;
-    private final NotificationLifecycleState lifecycleState;
+    private final PersistentQueueEntryLifecycleState lifecycleState;
     private final String notificationKey;
     private final DateTime effectiveDate;
 
 
     public DefaultNotification(long id, UUID uuid, String createdOwner, String owner, String queueName, DateTime nextAvailableDate,
-            NotificationLifecycleState lifecycleState,
+            PersistentQueueEntryLifecycleState lifecycleState,
             String notificationKey, DateTime effectiveDate) {
         super();
         this.id = id;
@@ -54,7 +54,7 @@ public class DefaultNotification implements Notification {
     }
 
     public DefaultNotification(String queueName, String createdOwner, String notificationKey, DateTime effectiveDate) {
-        this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+        this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKey, effectiveDate);
     }
     @Override
     public UUID getUUID() {
@@ -72,7 +72,7 @@ public class DefaultNotification implements Notification {
     }
 
     @Override
-    public NotificationLifecycleState getProcessingState() {
+    public PersistentQueueEntryLifecycleState getProcessingState() {
         return lifecycleState;
     }
 
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3db7ce1..4554a43 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -88,9 +88,9 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     private List<Notification> getReadyNotifications() {
 
         final Date now = clock.getUTCNow().toDate();
-        final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
+        final Date nextAvailable = clock.getUTCNow().plus(CLAIM_TIME_MS).toDate();
 
-        List<Notification> input = dao.getReadyNotifications(now, hostname, config.getDaoMaxReadyEvents(), getFullQName());
+        List<Notification> input = dao.getReadyNotifications(now, hostname, CLAIM_TIME_MS, getFullQName());
 
         List<Notification> claimedNotifications = new ArrayList<Notification>();
         for (Notification cur : input) {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 0b60c2d..f4bbae0 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -20,8 +20,10 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
 
-public interface Notification extends NotificationLifecycle {
+
+public interface Notification extends PersistentQueueEntryLifecycle {
 
     public long getId();
 
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 7c82db6..cd3c0c2 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -35,11 +35,15 @@ public abstract class NotificationQueueBase extends PersistentQueueBase implemen
 
     protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
 
-    protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
-    protected static final long STOP_WAIT_TIMEOUT_MS = 60000;
-
-    protected final String svcName;
-    protected final String queueName;
+    public final static int CLAIM_TIME_MS = (5 * 60 * 1000); // 5 minutes
+    
+    private final static String NOTIFICATION_THREAD_PREFIX = "Notification-";
+    private final static int NB_THREADS = 1;
+
+    
+    private final String svcName;
+    private final String queueName;
+    
     protected final NotificationQueueHandler handler;
     protected final NotificationConfig config;
 
@@ -64,7 +68,7 @@ public abstract class NotificationQueueBase extends PersistentQueueBase implemen
                 });
                 return th;
             }
-        }), 1, STOP_WAIT_TIMEOUT_MS, config.getNotificationSleepTimeMs());
+        }), NB_THREADS, config);
 
         this.clock = clock;
         this.svcName = svcName;
@@ -75,6 +79,21 @@ public abstract class NotificationQueueBase extends PersistentQueueBase implemen
         this.nbProcessedEvents = new AtomicLong();
     }
 
+    @Override
+    public void startQueue() {
+        if (config.isNotificationProcessingOff()) {
+            return;
+        }
+        super.startQueue();
+    }
+    
+    @Override
+    public void stopQueue() {
+        if (config.isNotificationProcessingOff()) {
+            return;
+        }
+        super.stopQueue();
+    }
 
     @Override
     public int processReadyNotification() {
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index a6844ad..0ffe882 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -22,26 +22,28 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.config.PersistentQueueConfig;
+
 
 public abstract class PersistentQueueBase implements QueueLifecycle {
 
     private static final Logger log = LoggerFactory.getLogger(PersistentQueueBase.class);
-
+    
+    private static final long waitTimeoutMs = 15L * 1000L; // 15 seconds
+    
     private final int nbThreads;
     private final Executor executor;
     private final String svcName;
     private final long sleepTimeMs;
-    private final long waitTimeoutMs;
 
     private boolean isProcessingEvents;
     private int curActiveThreads;
     
-    public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final long waitTimeoutMs, final long sleepTimeMs) {
+    public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
         this.executor = executor;
         this.nbThreads = nbThreads;
         this.svcName = svcName;
-        this.waitTimeoutMs = waitTimeoutMs;
-        this.sleepTimeMs = sleepTimeMs;
+        this.sleepTimeMs = config.getSleepTimeMs();
         this.isProcessingEvents = false;
         this.curActiveThreads = 0;
     }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 4702409..74c09f1 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -39,8 +39,8 @@ import com.google.inject.Inject;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.util.notificationq.DefaultNotification;
 import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao.NotificationSqlMapper;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
@@ -117,7 +117,7 @@ public class TestNotificationSqlDao {
         assertEquals(notification.getNotificationKey(), notificationKey);
         validateDate(notification.getEffectiveDate(), effDt);
         assertEquals(notification.getOwner(), null);
-        assertEquals(notification.getProcessingState(), NotificationLifecycleState.AVAILABLE);
+        assertEquals(notification.getProcessingState(), PersistentQueueEntryLifecycleState.AVAILABLE);
         assertEquals(notification.getNextAvailableDate(), null);
 
         DateTime nextAvailable = now.plusMinutes(5);
@@ -129,7 +129,7 @@ public class TestNotificationSqlDao {
         assertEquals(notification.getNotificationKey(), notificationKey);
         validateDate(notification.getEffectiveDate(), effDt);
         assertEquals(notification.getOwner().toString(), ownerId);
-        assertEquals(notification.getProcessingState(), NotificationLifecycleState.IN_PROCESSING);
+        assertEquals(notification.getProcessingState(), PersistentQueueEntryLifecycleState.IN_PROCESSING);
         validateDate(notification.getNextAvailableDate(), nextAvailable);
 
         dao.clearNotification(notification.getId(), ownerId);
@@ -138,7 +138,7 @@ public class TestNotificationSqlDao {
         assertEquals(notification.getNotificationKey(), notificationKey);
         validateDate(notification.getEffectiveDate(), effDt);
         //assertEquals(notification.getOwner(), null);
-        assertEquals(notification.getProcessingState(), NotificationLifecycleState.PROCESSED);
+        assertEquals(notification.getProcessingState(), PersistentQueueEntryLifecycleState.PROCESSED);
         validateDate(notification.getNextAvailableDate(), nextAvailable);
 
     }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 86ec0ea..4052315 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -28,8 +28,8 @@ import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.ning.billing.config.NotificationConfig;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
     private final TreeSet<Notification> notifications;
@@ -67,7 +67,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
         List<Notification> result = new ArrayList<Notification>();
 
         for (Notification notification : notifications) {
-            if (notification.getProcessingState() == NotificationLifecycleState.AVAILABLE) {
+            if (notification.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
                 result.add(notification);
             }
         }
@@ -96,7 +96,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
         result = readyNotifications.size();
         for (Notification cur : readyNotifications) {
             handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
-            DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+            DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(CLAIM_TIME_MS), PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
             oldNotifications.add(cur);
             processedNotifications.add(processedNotification);
         }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 80f4a5c..50f0a68 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -288,17 +288,9 @@ public class TestNotificationQueue {
                 return false;
             }
             @Override
-            public long getNotificationSleepTimeMs() {
+            public long getSleepTimeMs() {
                 return 10;
             }
-            @Override
-            public int getDaoMaxReadyEvents() {
-                return 1;
-            }
-            @Override
-            public long getDaoClaimTimeMs() {
-                return 60000;
-            }
         };
 
 
@@ -397,17 +389,9 @@ public class TestNotificationQueue {
                 return off;
             }
             @Override
-            public long getNotificationSleepTimeMs() {
+            public long getSleepTimeMs() {
                 return sleepTime;
             }
-            @Override
-            public int getDaoMaxReadyEvents() {
-                return maxReadyEvents;
-            }
-            @Override
-            public long getDaoClaimTimeMs() {
-                return claimTimeMs;
-            }
         };
     }