killbill-memoizeit
Fix notification queue (so it can be disabled) Rework NotificationQ/Persistent …
5/11/2012 3:26:36 PM
Changes
invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java 18(+6 -12)
Details
diff --git a/api/src/main/java/com/ning/billing/config/EntitlementConfig.java b/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
index c73530f..12418c6 100644
--- a/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
+++ b/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
@@ -24,19 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
public interface EntitlementConfig extends NotificationConfig, KillbillConfig {
@Override
- @Config("killbill.entitlement.dao.claim.time")
- @Default("60000")
- public long getDaoClaimTimeMs();
-
- @Override
- @Config("killbill.entitlement.dao.ready.max")
- @Default("10")
- public int getDaoMaxReadyEvents();
-
- @Override
@Config("killbill.entitlement.engine.notifications.sleep")
@Default("500")
- public long getNotificationSleepTimeMs();
+ public long getSleepTimeMs();
@Override
@Config("killbill.notifications.off")
diff --git a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
index 18972eb..407f4d3 100644
--- a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
+++ b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
@@ -21,22 +21,12 @@ import org.skife.config.Default;
public interface InvoiceConfig extends NotificationConfig, KillbillConfig {
- @Override
- @Config("killbill.invoice.dao.claim.time")
- @Default("60000")
- public long getDaoClaimTimeMs();
-
- @Override
- @Config("killbill.invoice.dao.ready.max")
- @Default("10")
- public int getDaoMaxReadyEvents();
-
- @Override
+ @Override
@Config("killbill.invoice.engine.notifications.sleep")
@Default("500")
- public long getNotificationSleepTimeMs();
+ public long getSleepTimeMs();
- @Override
+ @Override
@Config("killbill.notifications.off")
@Default("false")
public boolean isNotificationProcessingOff();
diff --git a/api/src/main/java/com/ning/billing/config/NotificationConfig.java b/api/src/main/java/com/ning/billing/config/NotificationConfig.java
index 82f68b6..630a2d6 100644
--- a/api/src/main/java/com/ning/billing/config/NotificationConfig.java
+++ b/api/src/main/java/com/ning/billing/config/NotificationConfig.java
@@ -17,13 +17,6 @@
package com.ning.billing.config;
-public interface NotificationConfig {
-
- public long getDaoClaimTimeMs();
-
- public int getDaoMaxReadyEvents();
-
- public long getNotificationSleepTimeMs();
-
+public interface NotificationConfig extends PersistentQueueConfig {
public boolean isNotificationProcessingOff();
}
diff --git a/api/src/main/java/com/ning/billing/config/PaymentConfig.java b/api/src/main/java/com/ning/billing/config/PaymentConfig.java
index 599232c..f606039 100644
--- a/api/src/main/java/com/ning/billing/config/PaymentConfig.java
+++ b/api/src/main/java/com/ning/billing/config/PaymentConfig.java
@@ -35,19 +35,9 @@ public interface PaymentConfig extends NotificationConfig, KillbillConfig {
public List<Integer> getPaymentRetryDays();
@Override
- @Config("killbill.payment.dao.claim.time")
- @Default("60000")
- public long getDaoClaimTimeMs();
-
- @Override
- @Config("killbill.payment.dao.ready.max")
- @Default("10")
- public int getDaoMaxReadyEvents();
-
- @Override
@Config("killbill.payment.engine.notifications.sleep")
@Default("500")
- public long getNotificationSleepTimeMs();
+ public long getSleepTimeMs();
@Override
@Config("killbill.payment.engine.events.off")
diff --git a/api/src/main/java/com/ning/billing/config/PersistentQueueConfig.java b/api/src/main/java/com/ning/billing/config/PersistentQueueConfig.java
new file mode 100644
index 0000000..c7e1515
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/config/PersistentQueueConfig.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.config;
+
+public interface PersistentQueueConfig {
+ public long getSleepTimeMs();
+}
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
index 579f47c..44e7297 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
@@ -85,7 +85,7 @@ public class TestIntegrationBase implements TestListenerStatus {
protected static final Logger log = LoggerFactory.getLogger(TestIntegration.class);
protected static long AT_LEAST_ONE_MONTH_MS = 31L * 24L * 3600L * 1000L;
- protected static final long DELAY = 10000;
+ protected static final long DELAY = 5000;
@Inject
protected IDBI dbi;
diff --git a/beatrix/src/test/resources/resource.properties b/beatrix/src/test/resources/resource.properties
index d63334b..a2c4ec1 100644
--- a/beatrix/src/test/resources/resource.properties
+++ b/beatrix/src/test/resources/resource.properties
@@ -1,7 +1,11 @@
killbill.catalog.uri=file:src/test/resources/catalogSample.xml
killbill.entitlement.dao.claim.time=60000
killbill.entitlement.dao.ready.max=1
-killbill.entitlement.engine.notifications.sleep=500
+killbill.payment.engine.notifications.sleep=100
+killbill.invoice.engine.notifications.sleep=100
+killbill.entitlement.engine.notifications.sleep=100
+killbill.billing.util.persistent.bus.sleep=100
+killbill.billing.util.persistent.bus.nbThreads=1
user.timezone=UTC
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index c8f82a2..749bf2c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -135,21 +135,15 @@ public class Engine implements EventListener, EntitlementService {
}
},
new NotificationConfig() {
+
@Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
- @Override
- public long getNotificationSleepTimeMs() {
- return config.getNotificationSleepTimeMs();
- }
- @Override
- public int getDaoMaxReadyEvents() {
- return config.getDaoMaxReadyEvents();
+ public long getSleepTimeMs() {
+ return config.getSleepTimeMs();
}
+
@Override
- public long getDaoClaimTimeMs() {
- return config.getDaoClaimTimeMs();
+ public boolean isNotificationProcessingOff() {
+ return config.isNotificationProcessingOff();
}
});
} catch (NotificationQueueAlreadyExists e) {
diff --git a/entitlement/src/test/resources/entitlement.properties b/entitlement/src/test/resources/entitlement.properties
index af1c3fc..58f4f95 100644
--- a/entitlement/src/test/resources/entitlement.properties
+++ b/entitlement/src/test/resources/entitlement.properties
@@ -1,6 +1,8 @@
killbill.catalog.uri=file:src/test/resources/testInput.xml
killbill.entitlement.dao.claim.time=60000
killbill.entitlement.dao.ready.max=1
-killbill.entitlement.engine.notifications.sleep=500
+killbill.entitlement.engine.notifications.sleep=100
+killbill.billing.util.persistent.bus.sleep=100
+killbill.billing.util.persistent.bus.nbThreads=1
user.timezone=UTC
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 28d15a8..d61e206 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -86,21 +86,15 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier
}
},
new NotificationConfig() {
+
@Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
- @Override
- public long getNotificationSleepTimeMs() {
- return config.getNotificationSleepTimeMs();
+ public long getSleepTimeMs() {
+ return config.getSleepTimeMs();
}
+
@Override
- public int getDaoMaxReadyEvents() {
- return config.getDaoMaxReadyEvents();
- }
- @Override
- public long getDaoClaimTimeMs() {
- return config.getDaoClaimTimeMs();
+ public boolean isNotificationProcessingOff() {
+ return config.isNotificationProcessingOff();
}
});
} catch (NotificationQueueAlreadyExists e) {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 08354db..91dbf7d 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -55,11 +55,7 @@ public abstract class InvoiceDaoTestBase extends InvoicingTestBase {
private final InvoiceConfig invoiceConfig = new InvoiceConfig() {
@Override
- public long getDaoClaimTimeMs() {throw new UnsupportedOperationException();}
- @Override
- public int getDaoMaxReadyEvents() {throw new UnsupportedOperationException();}
- @Override
- public long getNotificationSleepTimeMs() {throw new UnsupportedOperationException();}
+ public long getSleepTimeMs() {throw new UnsupportedOperationException();}
@Override
public boolean isNotificationProcessingOff() {throw new UnsupportedOperationException();}
@Override
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
index 49fca25..960dfd0 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
@@ -70,17 +70,7 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
Clock clock = new DefaultClock();
InvoiceConfig invoiceConfig = new InvoiceConfig() {
@Override
- public long getDaoClaimTimeMs() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getDaoMaxReadyEvents() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getNotificationSleepTimeMs() {
+ public long getSleepTimeMs() {
throw new UnsupportedOperationException();
}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 06acac9..5e3e8e3 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -75,16 +75,8 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
return config.isNotificationProcessingOff();
}
@Override
- public long getNotificationSleepTimeMs() {
- return config.getNotificationSleepTimeMs();
- }
- @Override
- public int getDaoMaxReadyEvents() {
- return config.getDaoMaxReadyEvents();
- }
- @Override
- public long getDaoClaimTimeMs() {
- return config.getDaoClaimTimeMs();
+ public long getSleepTimeMs() {
+ return config.getSleepTimeMs();
}
});
} catch (NotificationQueueAlreadyExists e) {
diff --git a/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java b/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
index 267eeb0..b4f4d0e 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
@@ -26,19 +26,9 @@ import com.ning.billing.config.NotificationConfig;
public interface OverdueProperties extends NotificationConfig, KillbillConfig {
@Override
- @Config("killbill.overdue.dao.claim.time")
- @Default("60000")
- public long getDaoClaimTimeMs();
-
- @Override
- @Config("killbill.overdue.dao.ready.max")
- @Default("10")
- public int getDaoMaxReadyEvents();
-
- @Override
@Config("killbill.overdue.engine.notifications.sleep")
@Default("500")
- public long getNotificationSleepTimeMs();
+ public long getSleepTimeMs();
@Override
@Config("killbill.notifications.off")
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java b/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
index cf9eddd..ce664b3 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestJaxrsBase.java
@@ -491,7 +491,7 @@ public class TestJaxrsBase {
* but until we have a strong need for it, this is in the TODO list...
*/
protected void crappyWaitForLackOfProperSynchonization() throws Exception {
- Thread.sleep(7000);
+ Thread.sleep(5000);
}
}
diff --git a/server/src/test/resources/killbill.properties b/server/src/test/resources/killbill.properties
index 3463266..9aa3e66 100644
--- a/server/src/test/resources/killbill.properties
+++ b/server/src/test/resources/killbill.properties
@@ -10,6 +10,11 @@ user.timezone=UTC
com.ning.core.server.jetty.logPath=/var/tmp/.logs
+killbill.payment.engine.notifications.sleep=100
+killbill.invoice.engine.notifications.sleep=100
+killbill.entitlement.engine.notifications.sleep=100
+killbill.billing.util.persistent.bus.sleep=100
+killbill.billing.util.persistent.bus.nbThreads=1
# Local DB
#com.ning.billing.dbi.test.useLocalDb=true
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
index b9c6faa..8e98211 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
@@ -17,19 +17,19 @@ package com.ning.billing.util.bus.dao;
import org.joda.time.DateTime;
-import com.ning.billing.util.notificationq.NotificationLifecycle;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
-public class BusEventEntry implements NotificationLifecycle {
+public class BusEventEntry implements PersistentQueueEntryLifecycle {
private final long id;
private final String owner;
private final String createdOwner;
private final DateTime nextAvailable;
- private final NotificationLifecycleState processingState;
+ private final PersistentQueueEntryLifecycleState processingState;
private final String busEventClass;
private final String busEventJson;
- public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable, NotificationLifecycleState processingState, final String busEventClass, final String busEventJson) {
+ public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable, PersistentQueueEntryLifecycleState processingState, final String busEventClass, final String busEventJson) {
this.id = id;
this.createdOwner = createdOwner;
this.owner = owner;
@@ -73,7 +73,7 @@ public class BusEventEntry implements NotificationLifecycle {
}
@Override
- public NotificationLifecycleState getProcessingState() {
+ public PersistentQueueEntryLifecycleState getProcessingState() {
return processingState;
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index f8007de..3c32f85 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -34,7 +34,7 @@ import org.skife.jdbi.v2.tweak.ResultSetMapper;
import com.ning.billing.util.dao.BinderBase;
import com.ning.billing.util.dao.MapperBase;
-import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
@ExternalizedSqlViaStringTemplate3()
public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>, CloseMe {
@@ -70,7 +70,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
stmt.bind("creating_owner", evt.getCreatedOwner());
stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
stmt.bind("processing_owner", evt.getOwner());
- stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
+ stmt.bind("processing_state", PersistentQueueEntryLifecycleState.AVAILABLE.toString());
}
}
@@ -86,7 +86,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
final String eventJson = r.getString("event_json");
final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
final String processingOwner = r.getString("processing_owner");
- final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
+ final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
return new BusEventEntry(id, createdOwner, processingOwner, nextAvailableDate, processingState, className, eventJson);
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index bfce245..92b35d7 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -33,19 +33,19 @@ import org.slf4j.LoggerFactory;
import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.ning.billing.config.PersistentQueueConfig;
import com.ning.billing.util.Hostname;
import com.ning.billing.util.bus.dao.BusEventEntry;
import com.ning.billing.util.bus.dao.PersistentBusSqlDao;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.glue.BusModule;
import com.ning.billing.util.queue.PersistentQueueBase;
public class PersistentBus extends PersistentQueueBase implements Bus {
- private final static int NB_BUS_THREADS = 1;
- private final static long TIMEOUT_MSEC = 15L * 1000L; // 15 sec
private final static long DELTA_IN_PROCESSING_TIME_MS = 1000L * 60L * 5L; // 5 minutes
- private final static long SLEEP_TIME_MS = 1000; // 1 sec
private final static int MAX_BUS_EVENTS = 1;
private static final Logger log = LoggerFactory.getLogger(PersistentBus.class);
@@ -80,15 +80,16 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
}
@Inject
- public PersistentBus(final IDBI dbi, final Clock clock) {
- super("Bus", Executors.newFixedThreadPool(NB_BUS_THREADS, new ThreadFactory() {
+ public PersistentBus(final IDBI dbi, final Clock clock, final PersistentBusConfig config) {
+
+ super("Bus", Executors.newFixedThreadPool(config.getNbThreads(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(new ThreadGroup(DefaultBusService.EVENT_BUS_GROUP_NAME),
r,
DefaultBusService.EVENT_BUS_TH_NAME);
}
- }), NB_BUS_THREADS, TIMEOUT_MSEC, SLEEP_TIME_MS);
+ }), config.getNbThreads(), config);
this.dao = dbi.onDemand(PersistentBusSqlDao.class);
this.clock = clock;
this.objectMapper = new ObjectMapper();
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
new file mode 100644
index 0000000..a2e9879
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.util.bus;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+import com.ning.billing.config.PersistentQueueConfig;
+
+public interface PersistentBusConfig extends PersistentQueueConfig {
+
+ @Override
+ @Config("killbill.billing.util.persistent.bus.sleep")
+ @Default("500")
+ public long getSleepTimeMs();
+
+ @Config("killbill.billing.util.persistent.bus.nbThreads")
+ @Default("3")
+ public int getNbThreads();
+}
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusModule.java b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
index 656253d..2793208 100644
--- a/util/src/main/java/com/ning/billing/util/glue/BusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
@@ -16,10 +16,16 @@
package com.ning.billing.util.glue;
+import org.skife.config.ConfigurationObjectFactory;
+
import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+import com.ning.billing.config.EntitlementConfig;
+import com.ning.billing.config.PersistentQueueConfig;
import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.bus.PersistentBusConfig;
import com.ning.billing.util.bus.InMemoryBus;
import com.ning.billing.util.bus.PersistentBus;
@@ -58,9 +64,14 @@ public class BusModule extends AbstractModule {
}
+ protected void configurePersistentBusConfig() {
+ final PersistentBusConfig config = new ConfigurationObjectFactory(System.getProperties()).build(PersistentBusConfig.class);
+ bind(PersistentBusConfig.class).toInstance(config);
+ }
+
private void configurePersistentEventBus() {
+ configurePersistentBusConfig();
bind(Bus.class).to(PersistentBus.class).asEagerSingleton();
-
}
private void configureInMemoryEventBus() {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index f943d0b..01a9363 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -39,7 +39,7 @@ import org.skife.jdbi.v2.tweak.ResultSetMapper;
import com.ning.billing.util.notificationq.DefaultNotification;
import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
@ExternalizedSqlViaStringTemplate3()
public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, CloseMe {
@@ -77,7 +77,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
stmt.bind("queue_name", evt.getQueueName());
stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
stmt.bind("processing_owner", evt.getOwner());
- stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
+ stmt.bind("processing_state", PersistentQueueEntryLifecycleState.AVAILABLE.toString());
}
}
@@ -95,7 +95,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final DateTime effectiveDate = getDate(r, "effective_dt");
final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
final String processingOwner = r.getString("processing_owner");
- final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
+ final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
return new DefaultNotification(id, uuid, createdOwner, processingOwner, queueName, nextAvailableDate,
processingState, notificationKey, effectiveDate);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 9d2c601..4c42c73 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -28,13 +28,13 @@ public class DefaultNotification implements Notification {
private final String createdOwner;
private final String queueName;
private final DateTime nextAvailableDate;
- private final NotificationLifecycleState lifecycleState;
+ private final PersistentQueueEntryLifecycleState lifecycleState;
private final String notificationKey;
private final DateTime effectiveDate;
public DefaultNotification(long id, UUID uuid, String createdOwner, String owner, String queueName, DateTime nextAvailableDate,
- NotificationLifecycleState lifecycleState,
+ PersistentQueueEntryLifecycleState lifecycleState,
String notificationKey, DateTime effectiveDate) {
super();
this.id = id;
@@ -54,7 +54,7 @@ public class DefaultNotification implements Notification {
}
public DefaultNotification(String queueName, String createdOwner, String notificationKey, DateTime effectiveDate) {
- this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+ this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKey, effectiveDate);
}
@Override
public UUID getUUID() {
@@ -72,7 +72,7 @@ public class DefaultNotification implements Notification {
}
@Override
- public NotificationLifecycleState getProcessingState() {
+ public PersistentQueueEntryLifecycleState getProcessingState() {
return lifecycleState;
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3db7ce1..4554a43 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -88,9 +88,9 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
private List<Notification> getReadyNotifications() {
final Date now = clock.getUTCNow().toDate();
- final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
+ final Date nextAvailable = clock.getUTCNow().plus(CLAIM_TIME_MS).toDate();
- List<Notification> input = dao.getReadyNotifications(now, hostname, config.getDaoMaxReadyEvents(), getFullQName());
+ List<Notification> input = dao.getReadyNotifications(now, hostname, CLAIM_TIME_MS, getFullQName());
List<Notification> claimedNotifications = new ArrayList<Notification>();
for (Notification cur : input) {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 0b60c2d..f4bbae0 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -20,8 +20,10 @@ import java.util.UUID;
import org.joda.time.DateTime;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
-public interface Notification extends NotificationLifecycle {
+
+public interface Notification extends PersistentQueueEntryLifecycle {
public long getId();
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 7c82db6..cd3c0c2 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -35,11 +35,15 @@ public abstract class NotificationQueueBase extends PersistentQueueBase implemen
protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
- protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
- protected static final long STOP_WAIT_TIMEOUT_MS = 60000;
-
- protected final String svcName;
- protected final String queueName;
+ public final static int CLAIM_TIME_MS = (5 * 60 * 1000); // 5 minutes
+
+ private final static String NOTIFICATION_THREAD_PREFIX = "Notification-";
+ private final static int NB_THREADS = 1;
+
+
+ private final String svcName;
+ private final String queueName;
+
protected final NotificationQueueHandler handler;
protected final NotificationConfig config;
@@ -64,7 +68,7 @@ public abstract class NotificationQueueBase extends PersistentQueueBase implemen
});
return th;
}
- }), 1, STOP_WAIT_TIMEOUT_MS, config.getNotificationSleepTimeMs());
+ }), NB_THREADS, config);
this.clock = clock;
this.svcName = svcName;
@@ -75,6 +79,21 @@ public abstract class NotificationQueueBase extends PersistentQueueBase implemen
this.nbProcessedEvents = new AtomicLong();
}
+ @Override
+ public void startQueue() {
+ if (config.isNotificationProcessingOff()) {
+ return;
+ }
+ super.startQueue();
+ }
+
+ @Override
+ public void stopQueue() {
+ if (config.isNotificationProcessingOff()) {
+ return;
+ }
+ super.stopQueue();
+ }
@Override
public int processReadyNotification() {
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index a6844ad..0ffe882 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -22,26 +22,28 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.billing.config.PersistentQueueConfig;
+
public abstract class PersistentQueueBase implements QueueLifecycle {
private static final Logger log = LoggerFactory.getLogger(PersistentQueueBase.class);
-
+
+ private static final long waitTimeoutMs = 15L * 1000L; // 15 seconds
+
private final int nbThreads;
private final Executor executor;
private final String svcName;
private final long sleepTimeMs;
- private final long waitTimeoutMs;
private boolean isProcessingEvents;
private int curActiveThreads;
- public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final long waitTimeoutMs, final long sleepTimeMs) {
+ public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
this.executor = executor;
this.nbThreads = nbThreads;
this.svcName = svcName;
- this.waitTimeoutMs = waitTimeoutMs;
- this.sleepTimeMs = sleepTimeMs;
+ this.sleepTimeMs = config.getSleepTimeMs();
this.isProcessingEvents = false;
this.curActiveThreads = 0;
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 4702409..74c09f1 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -39,8 +39,8 @@ import com.google.inject.Inject;
import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.notificationq.DefaultNotification;
import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao.NotificationSqlMapper;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -117,7 +117,7 @@ public class TestNotificationSqlDao {
assertEquals(notification.getNotificationKey(), notificationKey);
validateDate(notification.getEffectiveDate(), effDt);
assertEquals(notification.getOwner(), null);
- assertEquals(notification.getProcessingState(), NotificationLifecycleState.AVAILABLE);
+ assertEquals(notification.getProcessingState(), PersistentQueueEntryLifecycleState.AVAILABLE);
assertEquals(notification.getNextAvailableDate(), null);
DateTime nextAvailable = now.plusMinutes(5);
@@ -129,7 +129,7 @@ public class TestNotificationSqlDao {
assertEquals(notification.getNotificationKey(), notificationKey);
validateDate(notification.getEffectiveDate(), effDt);
assertEquals(notification.getOwner().toString(), ownerId);
- assertEquals(notification.getProcessingState(), NotificationLifecycleState.IN_PROCESSING);
+ assertEquals(notification.getProcessingState(), PersistentQueueEntryLifecycleState.IN_PROCESSING);
validateDate(notification.getNextAvailableDate(), nextAvailable);
dao.clearNotification(notification.getId(), ownerId);
@@ -138,7 +138,7 @@ public class TestNotificationSqlDao {
assertEquals(notification.getNotificationKey(), notificationKey);
validateDate(notification.getEffectiveDate(), effDt);
//assertEquals(notification.getOwner(), null);
- assertEquals(notification.getProcessingState(), NotificationLifecycleState.PROCESSED);
+ assertEquals(notification.getProcessingState(), PersistentQueueEntryLifecycleState.PROCESSED);
validateDate(notification.getNextAvailableDate(), nextAvailable);
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 86ec0ea..4052315 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -28,8 +28,8 @@ import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import com.ning.billing.config.NotificationConfig;
import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
private final TreeSet<Notification> notifications;
@@ -67,7 +67,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
List<Notification> result = new ArrayList<Notification>();
for (Notification notification : notifications) {
- if (notification.getProcessingState() == NotificationLifecycleState.AVAILABLE) {
+ if (notification.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
result.add(notification);
}
}
@@ -96,7 +96,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
result = readyNotifications.size();
for (Notification cur : readyNotifications) {
handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
- DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+ DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(CLAIM_TIME_MS), PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
oldNotifications.add(cur);
processedNotifications.add(processedNotification);
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 80f4a5c..50f0a68 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -288,17 +288,9 @@ public class TestNotificationQueue {
return false;
}
@Override
- public long getNotificationSleepTimeMs() {
+ public long getSleepTimeMs() {
return 10;
}
- @Override
- public int getDaoMaxReadyEvents() {
- return 1;
- }
- @Override
- public long getDaoClaimTimeMs() {
- return 60000;
- }
};
@@ -397,17 +389,9 @@ public class TestNotificationQueue {
return off;
}
@Override
- public long getNotificationSleepTimeMs() {
+ public long getSleepTimeMs() {
return sleepTime;
}
- @Override
- public int getDaoMaxReadyEvents() {
- return maxReadyEvents;
- }
- @Override
- public long getDaoClaimTimeMs() {
- return claimTimeMs;
- }
};
}