killbill-memoizeit

util: share start/stop logic between notification and bus Signed-off-by:

12/5/2012 10:44:34 PM

Details

diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index c80e303..d3bf41b 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -77,7 +77,7 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
 
     private NotificationQueueConfig config = new NotificationQueueConfig() {
         @Override
-        public boolean isNotificationProcessingOff() {
+        public boolean isProcessingOff() {
             return false;
         }
 
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java b/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
index b993df0..b2743c4 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
@@ -67,12 +67,12 @@ public class DefaultBeatrixService implements BeatrixService {
 
     @LifecycleHandlerType(LifecycleLevel.INIT_BUS)
     public void startBus() {
-        ((PersistentExternalBus) externalBus).start();
+        ((PersistentExternalBus) externalBus).startQueue();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_BUS)
     public void stopBus() {
-        ((PersistentExternalBus) externalBus).stop();
+        ((PersistentExternalBus) externalBus).stopQueue();
     }
 
 }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index eddf4b5..697c88a 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -63,8 +63,6 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
     private final InternalCallContextFactory internalCallContextFactory;
     private final AccountInternalApi accountApi;
 
-    private volatile boolean isStarted;
-
     private static final class EventBusDelegate extends EventBus {
 
         public EventBusDelegate(final String busName) {
@@ -90,16 +88,6 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         this.accountApi = accountApi;
     }
 
-    public void start() {
-        startQueue();
-        isStarted = true;
-    }
-
-    public void stop() {
-        stopQueue();
-        isStarted = false;
-    }
-
     @Override
     public int doProcessEvents() {
 
@@ -123,11 +111,6 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         return result;
     }
 
-    @Override
-    public boolean isStarted() {
-        return isStarted;
-    }
-
     private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
         try {
             final Account account = accountApi.getAccountByRecordId(recordId, context);
diff --git a/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java b/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
index 71d3511..3e5b5dc 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/OverdueProperties.java
@@ -32,7 +32,7 @@ public interface OverdueProperties extends NotificationConfig, KillbillConfig {
     @Override
     @Config("killbill.overdue.engine.notifications.off")
     @Default("false")
-    public boolean isNotificationProcessingOff();
+    public boolean isProcessingOff();
 
     @Config("killbill.overdue.maxNumberOfMonthsInFuture")
     @Default("36")
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
index adc7a61..cf27ca6 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java
@@ -27,6 +27,11 @@ public interface PersistentBusConfig extends PersistentQueueConfig {
     @Default("500")
     public long getSleepTimeMs();
 
+    @Config("killbill.billing.util.persistent.bus.off")
+    @Default("false")
+    @Override
+    public boolean isProcessingOff();
+
     @Config("killbill.billing.util.persistent.bus.nbThreads")
     @Default("3")
     public int getNbThreads();
diff --git a/util/src/main/java/com/ning/billing/util/config/NotificationConfig.java b/util/src/main/java/com/ning/billing/util/config/NotificationConfig.java
index 78181f4..5c0d151 100644
--- a/util/src/main/java/com/ning/billing/util/config/NotificationConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/NotificationConfig.java
@@ -16,7 +16,5 @@
 
 package com.ning.billing.util.config;
 
-
 public interface NotificationConfig extends PersistentQueueConfig {
-    public boolean isNotificationProcessingOff();
 }
diff --git a/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java b/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
index 366dc18..a52ee01 100644
--- a/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java
@@ -13,8 +13,12 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+
 package com.ning.billing.util.config;
 
 public interface PersistentQueueConfig {
+
     public long getSleepTimeMs();
+
+    public boolean isProcessingOff();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
index 512b19d..26ac76a 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java
@@ -31,6 +31,6 @@ public interface NotificationQueueConfig extends NotificationConfig {
     @Config("killbill.billing.util.notificationq.notification.off")
     @Default("false")
     @Override
-    public boolean isNotificationProcessingOff();
+    public boolean isProcessingOff();
 
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
index cd23563..50619e8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
@@ -62,8 +61,6 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
     protected final Clock clock;
     protected final Map<String, NotificationQueue> queues;
 
-    private AtomicBoolean isStarted;
-
     // Package visibility on purpose
     NotificationQueueDispatcher(final Clock clock, final NotificationQueueConfig config, final IDBI dbi, final InternalCallContextFactory internalCallContextFactory) {
         super("NotificationQ", Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -89,25 +86,11 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
         this.nbProcessedEvents = new AtomicLong();
 
         this.queues = new TreeMap<String, NotificationQueue>();
-
-        this.isStarted = new AtomicBoolean(false);
-
-    }
-
-    @Override
-    public void startQueue() {
-        if (config.isNotificationProcessingOff()) {
-            return;
-        }
-        // The first startQueue from any queue will trigger starting the processing for all queues
-        if (isStarted.compareAndSet(false, true)) {
-            super.startQueue();
-        }
     }
 
     @Override
     public void stopQueue() {
-        if (config.isNotificationProcessingOff()) {
+        if (config.isProcessingOff() || !isStarted()) {
             return;
         }
 
@@ -115,24 +98,17 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
         // (This is not intended to be robust against a system that would stop and start queues at the same time,
         // for a a normal shutdown sequence)
         //
-        if (isStarted.get()) {
-            int nbQueueStarted = 0;
-            synchronized (queues) {
-                for (NotificationQueue cur : queues.values()) {
-                    if (cur.isStarted()) {
-                        nbQueueStarted++;
-                    }
+        int nbQueueStarted = 0;
+        synchronized (queues) {
+            for (final NotificationQueue cur : queues.values()) {
+                if (cur.isStarted()) {
+                    nbQueueStarted++;
                 }
             }
-            if (nbQueueStarted == 0 && (isStarted.compareAndSet(true, false))) {
-                super.stopQueue();
-            }
         }
-    }
-
-    @Override
-    public boolean isStarted() {
-        return isStarted.get();
+        if (nbQueueStarted == 0) {
+            super.stopQueue();
+        }
     }
 
     public AtomicLong getNbProcessedEvents() {
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 2fa9baa..bff0441 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -18,6 +18,7 @@ package com.ning.billing.util.queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,18 +36,19 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
     private final int nbThreads;
     private final Executor executor;
     private final String svcQName;
-    private final long sleepTimeMs;
+    private final PersistentQueueConfig config;
     private boolean isProcessingEvents;
     private int curActiveThreads;
 
     protected final ObjectMapper objectMapper;
 
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
     public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
         this.executor = executor;
         this.nbThreads = nbThreads;
         this.svcQName = svcQName;
-        this.sleepTimeMs = config.getSleepTimeMs();
+        this.config = config;
         this.objectMapper = new ObjectMapper();        
         this.isProcessingEvents = false;
         this.curActiveThreads = 0;
@@ -55,6 +57,9 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
 
     @Override
     public void startQueue() {
+        if (config.isProcessingOff() || !isStarted.compareAndSet(false, true)) {
+            return;
+        }
 
         isProcessingEvents = true;
         curActiveThreads = 0;
@@ -115,7 +120,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
                 }
 
                 private void sleepALittle() throws InterruptedException {
-                    Thread.sleep(sleepTimeMs);
+                    Thread.sleep(config.getSleepTimeMs());
                 }
             });
         }
@@ -135,6 +140,10 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
 
     @Override
     public void stopQueue() {
+        if (config.isProcessingOff() || !isStarted.compareAndSet(true, false)) {
+            return;
+        }
+
         int remaining = 0;
         try {
             synchronized (this) {
@@ -172,6 +181,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
 
     public abstract int doProcessEvents();
 
-    public abstract boolean isStarted();
-
+    public boolean isStarted() {
+        return isStarted.get();
+    }
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index d12cccf..2ad3a3b 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -415,7 +415,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
     static NotificationQueueConfig getNotificationConfig(final boolean off, final long sleepTime) {
         return new NotificationQueueConfig() {
             @Override
-            public boolean isNotificationProcessingOff() {
+            public boolean isProcessingOff() {
                 return off;
             }