Details
diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index c80e303..d3bf41b 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -77,7 +77,7 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
private NotificationQueueConfig config = new NotificationQueueConfig() {
@Override
- public boolean isNotificationProcessingOff() {
+ public boolean isProcessingOff() {
return false;
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java b/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
index b993df0..b2743c4 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
@@ -67,12 +67,12 @@ public class DefaultBeatrixService implements BeatrixService {
@LifecycleHandlerType(LifecycleLevel.INIT_BUS)
public void startBus() {
- ((PersistentExternalBus) externalBus).start();
+ ((PersistentExternalBus) externalBus).startQueue();
}
@LifecycleHandlerType(LifecycleLevel.STOP_BUS)
public void stopBus() {
- ((PersistentExternalBus) externalBus).stop();
+ ((PersistentExternalBus) externalBus).stopQueue();
}
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index eddf4b5..697c88a 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -63,8 +63,6 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
private final InternalCallContextFactory internalCallContextFactory;
private final AccountInternalApi accountApi;
- private volatile boolean isStarted;
-
private static final class EventBusDelegate extends EventBus {
public EventBusDelegate(final String busName) {
@@ -90,16 +88,6 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
this.accountApi = accountApi;
}
- public void start() {
- startQueue();
- isStarted = true;
- }
-
- public void stop() {
- stopQueue();
- isStarted = false;
- }
-
@Override
public int doProcessEvents() {
@@ -123,11 +111,6 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
return result;
}
- @Override
- public boolean isStarted() {
- return isStarted;
- }
-
private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
try {
final Account account = accountApi.getAccountByRecordId(recordId, context);
diff --git a/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java b/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
index 71d3511..3e5b5dc 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
@@ -32,7 +32,7 @@ public interface OverdueProperties extends NotificationConfig, KillbillConfig {
@Override
@Config("killbill.overdue.engine.notifications.off")
@Default("false")
- public boolean isNotificationProcessingOff();
+ public boolean isProcessingOff();
@Config("killbill.overdue.maxNumberOfMonthsInFuture")
@Default("36")
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
index adc7a61..cf27ca6 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
@@ -27,6 +27,11 @@ public interface PersistentBusConfig extends PersistentQueueConfig {
@Default("500")
public long getSleepTimeMs();
+ @Config("killbill.billing.util.persistent.bus.off")
+ @Default("false")
+ @Override
+ public boolean isProcessingOff();
+
@Config("killbill.billing.util.persistent.bus.nbThreads")
@Default("3")
public int getNbThreads();
diff --git a/util/src/main/java/com/ning/billing/util/config/NotificationConfig.java b/util/src/main/java/com/ning/billing/util/config/NotificationConfig.java
index 78181f4..5c0d151 100644
--- a/util/src/main/java/com/ning/billing/util/config/NotificationConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/NotificationConfig.java
@@ -16,7 +16,5 @@
package com.ning.billing.util.config;
-
public interface NotificationConfig extends PersistentQueueConfig {
- public boolean isNotificationProcessingOff();
}
diff --git a/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java b/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
index 366dc18..a52ee01 100644
--- a/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
@@ -13,8 +13,12 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+
package com.ning.billing.util.config;
public interface PersistentQueueConfig {
+
public long getSleepTimeMs();
+
+ public boolean isProcessingOff();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
index 512b19d..26ac76a 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
@@ -31,6 +31,6 @@ public interface NotificationQueueConfig extends NotificationConfig {
@Config("killbill.billing.util.notificationq.notification.off")
@Default("false")
@Override
- public boolean isNotificationProcessingOff();
+ public boolean isProcessingOff();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
index cd23563..50619e8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -62,8 +61,6 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
protected final Clock clock;
protected final Map<String, NotificationQueue> queues;
- private AtomicBoolean isStarted;
-
// Package visibility on purpose
NotificationQueueDispatcher(final Clock clock, final NotificationQueueConfig config, final IDBI dbi, final InternalCallContextFactory internalCallContextFactory) {
super("NotificationQ", Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -89,25 +86,11 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
this.nbProcessedEvents = new AtomicLong();
this.queues = new TreeMap<String, NotificationQueue>();
-
- this.isStarted = new AtomicBoolean(false);
-
- }
-
- @Override
- public void startQueue() {
- if (config.isNotificationProcessingOff()) {
- return;
- }
- // The first startQueue from any queue will trigger starting the processing for all queues
- if (isStarted.compareAndSet(false, true)) {
- super.startQueue();
- }
}
@Override
public void stopQueue() {
- if (config.isNotificationProcessingOff()) {
+ if (config.isProcessingOff() || !isStarted()) {
return;
}
@@ -115,24 +98,17 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
// (This is not intended to be robust against a system that would stop and start queues at the same time,
// for a a normal shutdown sequence)
//
- if (isStarted.get()) {
- int nbQueueStarted = 0;
- synchronized (queues) {
- for (NotificationQueue cur : queues.values()) {
- if (cur.isStarted()) {
- nbQueueStarted++;
- }
+ int nbQueueStarted = 0;
+ synchronized (queues) {
+ for (final NotificationQueue cur : queues.values()) {
+ if (cur.isStarted()) {
+ nbQueueStarted++;
}
}
- if (nbQueueStarted == 0 && (isStarted.compareAndSet(true, false))) {
- super.stopQueue();
- }
}
- }
-
- @Override
- public boolean isStarted() {
- return isStarted.get();
+ if (nbQueueStarted == 0) {
+ super.stopQueue();
+ }
}
public AtomicLong getNbProcessedEvents() {
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 2fa9baa..bff0441 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -18,6 +18,7 @@ package com.ning.billing.util.queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,18 +36,19 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
private final int nbThreads;
private final Executor executor;
private final String svcQName;
- private final long sleepTimeMs;
+ private final PersistentQueueConfig config;
private boolean isProcessingEvents;
private int curActiveThreads;
protected final ObjectMapper objectMapper;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
this.executor = executor;
this.nbThreads = nbThreads;
this.svcQName = svcQName;
- this.sleepTimeMs = config.getSleepTimeMs();
+ this.config = config;
this.objectMapper = new ObjectMapper();
this.isProcessingEvents = false;
this.curActiveThreads = 0;
@@ -55,6 +57,9 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
@Override
public void startQueue() {
+ if (config.isProcessingOff() || !isStarted.compareAndSet(false, true)) {
+ return;
+ }
isProcessingEvents = true;
curActiveThreads = 0;
@@ -115,7 +120,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
private void sleepALittle() throws InterruptedException {
- Thread.sleep(sleepTimeMs);
+ Thread.sleep(config.getSleepTimeMs());
}
});
}
@@ -135,6 +140,10 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
@Override
public void stopQueue() {
+ if (config.isProcessingOff() || !isStarted.compareAndSet(true, false)) {
+ return;
+ }
+
int remaining = 0;
try {
synchronized (this) {
@@ -172,6 +181,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
public abstract int doProcessEvents();
- public abstract boolean isStarted();
-
+ public boolean isStarted() {
+ return isStarted.get();
+ }
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index d12cccf..2ad3a3b 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -415,7 +415,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
static NotificationQueueConfig getNotificationConfig(final boolean off, final long sleepTime) {
return new NotificationQueueConfig() {
@Override
- public boolean isNotificationProcessingOff() {
+ public boolean isProcessingOff() {
return off;
}