killbill-aplcache

util: remove protected fields in NotificationQueueBase Signed-off-by:

6/28/2012 5:32:41 PM

Details

diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 19923b3..c89f908 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -24,26 +24,27 @@ import java.util.List;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.ning.billing.config.NotificationConfig;
-import com.ning.billing.util.bus.dao.BusEventEntry;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 public class DefaultNotificationQueue extends NotificationQueueBase {
+    private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
 
-    protected final NotificationSqlDao dao;
-
-    public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+    private final NotificationSqlDao dao;
 
+    public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName,
+                                    final NotificationQueueHandler handler, final NotificationConfig config) {
         super(clock, svcName, queueName, handler, config);
         this.dao = dbi.onDemand(NotificationSqlDao.class);
     }
 
     @Override
     public int doProcessEvents() {
-
         logDebug("ENTER doProcessEvents");
         final List<Notification> notifications = getReadyNotifications();
         if (notifications.size() == 0) {
@@ -51,24 +52,22 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
             return 0;
         }
 
-        logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
+        logDebug("START processing %d events at time %s", notifications.size(), getClock().getUTCNow().toDate());
 
         int result = 0;
         for (final Notification cur : notifications) {
-            nbProcessedEvents.incrementAndGet();
-            logDebug("handling notification %s, key = %s for time %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-            NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey()); 
-            handler.handleReadyNotification(key, cur.getEffectiveDate());
+            getNbProcessedEvents().incrementAndGet();
+            logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
+            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+            getHandler().handleReadyNotification(key, cur.getEffectiveDate());
             result++;
             clearNotification(cur);
-            logDebug("done handling notification %s, key = %s for time %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
+            logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
+
         return result;
     }
 
-
     @Override
     public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
         recordFutureNotificationInternal(futureNotificationTime, notificationKey, dao);
@@ -76,47 +75,50 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
 
     @Override
     public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
-                                                        final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
+                                                        final DateTime futureNotificationTime,
+                                                        final NotificationKey notificationKey) throws IOException {
         final NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
         recordFutureNotificationInternal(futureNotificationTime, notificationKey, transactionalNotificationDao);
     }
-    
-    private void recordFutureNotificationInternal(final DateTime futureNotificationTime, final NotificationKey notificationKey, final NotificationSqlDao thisDao) throws IOException {
+
+    private void recordFutureNotificationInternal(final DateTime futureNotificationTime,
+                                                  final NotificationKey notificationKey,
+                                                  final NotificationSqlDao thisDao) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json, futureNotificationTime);
+        final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json, futureNotificationTime);
         thisDao.insertNotification(notification);
     }
 
     private void clearNotification(final Notification cleared) {
-        dao.clearNotification(cleared.getId().toString(), hostname);
+        dao.clearNotification(cleared.getId().toString(), getHostname());
     }
 
     private List<Notification> getReadyNotifications() {
-
-        final Date now = clock.getUTCNow().toDate();
-        final Date nextAvailable = clock.getUTCNow().plus(CLAIM_TIME_MS).toDate();
-
-        final List<Notification> input = dao.getReadyNotifications(now, hostname, CLAIM_TIME_MS, getFullQName());
+        final Date now = getClock().getUTCNow().toDate();
+        final Date nextAvailable = getClock().getUTCNow().plus(CLAIM_TIME_MS).toDate();
+        final List<Notification> input = dao.getReadyNotifications(now, getHostname(), CLAIM_TIME_MS, getFullQName());
 
         final List<Notification> claimedNotifications = new ArrayList<Notification>();
         for (final Notification cur : input) {
             logDebug("about to claim notification %s,  key = %s for time %s",
                      cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-            final boolean claimed = (dao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+
+            final boolean claimed = (dao.claimNotification(getHostname(), nextAvailable, cur.getId().toString(), now) == 1);
             logDebug("claimed notification %s, key = %s for time %s result = %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), Boolean.valueOf(claimed));
+                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), claimed);
+
             if (claimed) {
                 claimedNotifications.add(cur);
-                dao.insertClaimedHistory(hostname, now, cur.getId().toString());
+                dao.insertClaimedHistory(getHostname(), now, cur.getId().toString());
             }
         }
 
         for (final Notification cur : claimedNotifications) {
-            if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
-                log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
-                                       getFullQName(), cur, cur.getOwner()));
+            if (cur.getOwner() != null && !cur.getOwner().equals(getHostname())) {
+                log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), cur, cur.getOwner()});
             }
         }
+
         return claimedNotifications;
     }
 
@@ -130,6 +132,5 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     @Override
     public void removeNotificationsByKey(final NotificationKey notificationKey) {
         dao.removeNotificationsByKey(notificationKey.toString());
-
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 38b6e3c..fcc8c08 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -30,32 +30,26 @@ import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.queue.PersistentQueueBase;
 
-
 public abstract class NotificationQueueBase extends PersistentQueueBase implements NotificationQueue {
-
-    protected static final Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
+    private static final Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
 
     public static final int CLAIM_TIME_MS = (5 * 60 * 1000); // 5 minutes
 
     private static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
     private static final int NB_THREADS = 1;
 
-
     private final String svcName;
     private final String queueName;
+    private final NotificationQueueHandler handler;
+    private final NotificationConfig config;
+    private final Clock clock;
+    private final String hostname;
 
-    protected final NotificationQueueHandler handler;
-    protected final NotificationConfig config;
-
-    protected final Clock clock;
-    protected final String hostname;
-
-    protected AtomicLong nbProcessedEvents;
+    private AtomicLong nbProcessedEvents;
 
     // Package visibility on purpose
     NotificationQueueBase(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         super(svcName, Executors.newFixedThreadPool(1, new ThreadFactory() {
-
             @Override
             public Thread newThread(final Runnable r) {
                 final Thread th = new Thread(r);
@@ -115,11 +109,26 @@ public abstract class NotificationQueueBase extends PersistentQueueBase implemen
         return queueName;
     }
 
-
     public String getFullQName() {
         return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
     }
 
+    public AtomicLong getNbProcessedEvents() {
+        return nbProcessedEvents;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public NotificationQueueHandler getHandler() {
+        return handler;
+    }
+
+    public Clock getClock() {
+        return clock;
+    }
+
     @Override
     public abstract int doProcessEvents();
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 5e7d607..edb4f2e 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -19,7 +19,6 @@ package com.ning.billing.util.notificationq;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
@@ -33,10 +32,10 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
 import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
     private final TreeSet<Notification> notifications;
 
-    ObjectMapper objectMapper = new ObjectMapper();
-    
     public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         super(clock, svcName, queueName, handler, config);
         notifications = new TreeSet<Notification>(new Comparator<Notification>() {
@@ -52,18 +51,16 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException  {
+    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, futureNotificationTime);
+        final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, futureNotificationTime);
         synchronized (notifications) {
             notifications.add(notification);
         }
     }
 
     @Override
-    public void recordFutureNotificationFromTransaction(
-            final Transmogrifier transactionalDao, final DateTime futureNotificationTime,
-            final NotificationKey notificationKey) throws IOException  {
+    public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
         recordFutureNotification(futureNotificationTime, notificationKey);
     }
 
@@ -75,23 +72,20 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                 result.add(notification);
             }
         }
+
         return result;
     }
 
     @Override
     public int doProcessEvents() {
-
-        int result = 0;
-
+        final int result;
         final List<Notification> processedNotifications = new ArrayList<Notification>();
         final List<Notification> oldNotifications = new ArrayList<Notification>();
 
         final List<Notification> readyNotifications = new ArrayList<Notification>();
         synchronized (notifications) {
-            final Iterator<Notification> it = notifications.iterator();
-            while (it.hasNext()) {
-                final Notification cur = it.next();
-                if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+            for (final Notification cur : notifications) {
+                if (cur.isAvailableForProcessing(getClock().getUTCNow())) {
                     readyNotifications.add(cur);
                 }
             }
@@ -99,15 +93,16 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
 
         result = readyNotifications.size();
         for (final Notification cur : readyNotifications) {
-            
-            
-            NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey()); 
-            handler.handleReadyNotification(key, cur.getEffectiveDate());
-            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(CLAIM_TIME_MS), PersistentQueueEntryLifecycleState.PROCESSED,
-                    cur.getNotificationKeyClass(), cur.getNotificationKey(), cur.getEffectiveDate());
+            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+            getHandler().handleReadyNotification(key, cur.getEffectiveDate());
+            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
+                                                                                      "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
+                                                                                      PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
+                                                                                      cur.getNotificationKey(), cur.getEffectiveDate());
             oldNotifications.add(cur);
             processedNotifications.add(processedNotification);
         }
+
         synchronized (notifications) {
             if (oldNotifications.size() > 0) {
                 notifications.removeAll(oldNotifications);
@@ -117,6 +112,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                 notifications.addAll(processedNotifications);
             }
         }
+
         return result;
     }
 
@@ -128,11 +124,11 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                 toClearNotifications.add(notification);
             }
         }
+
         synchronized (notifications) {
             if (toClearNotifications.size() > 0) {
                 notifications.removeAll(toClearNotifications);
             }
         }
-
     }
 }