killbill-aplcache

Details

pom.xml 18(+17 -1)

diff --git a/pom.xml b/pom.xml
index d6f3cbf..9ca0abc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,8 @@
 	OR CONDITIONS OF ANY KIND, either express or implied. See the ~ License for 
 	the specific language governing permissions and limitations ~ under the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.sonatype.oss</groupId>
         <artifactId>oss-parent</artifactId>
@@ -365,6 +366,21 @@
                 <artifactId>jul-to-slf4j</artifactId>
                 <version>${slf4j.version}</version>
             </dependency>
+            <dependency><!-- Needed by jmxutils -->
+                <groupId>com.google.inject.extensions</groupId>
+                <artifactId>guice-multibindings</artifactId>
+                <version>3.0</version>
+            </dependency>
+            <dependency>
+                <groupId>org.weakref</groupId>
+                <artifactId>jmxutils</artifactId>
+                <version>1.12</version>
+            </dependency>
+            <dependency>
+                <groupId>com.yammer.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>2.2.0</version>
+            </dependency>
             <dependency>
                 <groupId>org.testng</groupId>
                 <artifactId>testng</artifactId>
diff --git a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
index 862e9d0..15c9b9a 100644
--- a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
+++ b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
@@ -21,14 +21,16 @@ import javax.servlet.ServletContextEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.beatrix.bus.api.ExternalBus;
 import com.ning.billing.beatrix.lifecycle.DefaultLifecycle;
 import com.ning.billing.jaxrs.util.KillbillEventHandler;
 import com.ning.billing.server.config.KillbillServerConfig;
 import com.ning.billing.server.healthchecks.KillbillHealthcheck;
 import com.ning.billing.server.modules.KillbillServerModule;
 import com.ning.billing.server.security.TenantFilter;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.svcsapi.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.jetty.base.modules.ServerModuleBuilder;
 import com.ning.jetty.core.listeners.SetupServer;
 
@@ -56,6 +58,9 @@ public class KillbillGuiceListener extends SetupServer {
                 .addConfig(KillbillServerConfig.class)
                 .addHealthCheck(KillbillHealthcheck.class)
                 .addJMXExport(KillbillHealthcheck.class)
+                .addJMXExport(NotificationQueueService.class)
+                .addJMXExport(InternalBus.class)
+                .addJMXExport(ExternalBus.class)
                 .addModule(getModule())
                 .addJerseyResource("com.ning.billing.jaxrs.mappers")
                 .addJerseyResource("com.ning.billing.jaxrs.resources");

util/pom.xml 12(+12 -0)

diff --git a/util/pom.xml b/util/pom.xml
index 4d15aca..c6b82af 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -92,6 +92,18 @@
             <artifactId>stringtemplate</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.google.inject.extensions</groupId>
+            <artifactId>guice-multibindings</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.weakref</groupId>
+            <artifactId>jmxutils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.yammer.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.jayway.awaitility</groupId>
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 0a295ae..4e59096 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -58,6 +58,10 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
                                                     @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
+    public int getPendingCountNotifications(@Bind("now") Date now,
+                                            @InternalTenantContextBinder final InternalTenantContext context);
+
+    @SqlQuery
     @Mapper(NotificationSqlMapper.class)
     public List<Notification> getNotificationForAccountAndDate(@Bind("accountRecordId") final long accountRecordId,
                                                                @Bind("effectiveDate") final Date effectiveDate,
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
index 0a972e0..6899ea7 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
@@ -19,18 +19,22 @@ package com.ning.billing.util.notificationq;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
+import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.weakref.jmx.Managed;
 
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.callcontext.CallOrigin;
@@ -42,6 +46,11 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 import com.ning.billing.util.queue.PersistentQueueBase;
 
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+
 public class NotificationQueueDispatcher extends PersistentQueueBase {
 
     protected static final Logger log = LoggerFactory.getLogger(NotificationQueueDispatcher.class);
@@ -60,6 +69,14 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
     protected final Clock clock;
     protected final Map<String, NotificationQueue> queues;
 
+
+    //
+    // Metrics
+    //
+    private final Gauge pendingNotifications;
+    private final Counter processedNotificationsSinceStart;
+    private final Map<String, Histogram> perQueueProcessingTime;
+
     // Package visibility on purpose
     NotificationQueueDispatcher(final Clock clock, final NotificationQueueConfig config, final IDBI dbi, final InternalCallContextFactory internalCallContextFactory) {
         super("NotificationQ", Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -85,8 +102,17 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
         this.nbProcessedEvents = new AtomicLong();
 
         this.queues = new TreeMap<String, NotificationQueue>();
-    }
 
+        this.pendingNotifications = Metrics.newGauge(NotificationQueueDispatcher.class, "pending-notifications", new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return dao != null ? dao.getPendingCountNotifications(clock.getUTCNow().toDate(), createCallContext(null, null)) : 0;
+            }
+        });
+
+        this.processedNotificationsSinceStart = Metrics.newCounter(NotificationQueueDispatcher.class, "processed-notifications-since-start");
+        this.perQueueProcessingTime = new HashMap<String, Histogram>();
+    }
     @Override
     public void stopQueue() {
         if (config.isProcessingOff() || !isStarted()) {
@@ -122,6 +148,7 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
         return clock;
     }
 
+
     protected NotificationQueueHandler getHandlerForActiveQueue(final String compositeName) {
         synchronized (queues) {
             final NotificationQueue queue = queues.get(compositeName);
@@ -138,6 +165,7 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
     }
 
     protected int doProcessEventsWithLimit(int limit) {
+
         logDebug("ENTER doProcessEvents");
         // Finding and claiming notifications is not done per tenant (yet?)
         final List<Notification> notifications = getReadyNotifications(createCallContext(null, null));
@@ -164,16 +192,41 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
             if (handler == null) {
                 continue;
             }
-            handler.handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
 
+            handleNotificationWithMetrics(handler, cur, key);
             result++;
             clearNotification(cur, createCallContext(cur.getTenantRecordId(), cur.getAccountRecordId()));
             logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
-
         return result;
     }
 
+    private void handleNotificationWithMetrics(final NotificationQueueHandler handler, final Notification notification, final NotificationKey key) {
+
+        // Create specific metric name because:
+        // - ':' is not allowed for metric name
+        // - name would be too long (e.g entitlement-service:subscription-events-process-time -> ent-subscription-events-process-time)
+        //
+        final String [] parts = notification.getQueueName().split(":");
+        final String metricName = new StringBuilder(parts[0].substring(0, 3))
+                .append("-")
+                .append(parts[1])
+                .append("-process-time").toString();
+
+        final Histogram perQueueHistogramProcessingTime;
+        synchronized(perQueueProcessingTime) {
+            if (!perQueueProcessingTime.containsKey(notification.getQueueName())) {
+                perQueueProcessingTime.put(notification.getQueueName(), Metrics.newHistogram(NotificationQueueDispatcher.class, metricName));
+            }
+            perQueueHistogramProcessingTime = perQueueProcessingTime.get(notification.getQueueName());
+        }
+        final DateTime beforeProcessing =  clock.getUTCNow();
+        handler.handleReadyNotification(key, notification.getEffectiveDate(), notification.getFutureUserToken(), notification.getAccountRecordId(), notification.getTenantRecordId());
+        final DateTime afterProcessing =  clock.getUTCNow();
+        perQueueHistogramProcessingTime.update(afterProcessing.getMillis() - beforeProcessing.getMillis());
+        processedNotificationsSinceStart.inc();
+    }
+
     private void clearNotification(final Notification cleared, final InternalCallContext context) {
         dao.clearNotification(cleared.getId().toString(), getHostname(), context);
     }
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index bff0441..8e1f7bf 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.weakref.jmx.Managed;
 
 import com.ning.billing.util.config.PersistentQueueConfig;
 import com.ning.billing.util.jackson.ObjectMapper;
@@ -44,6 +45,10 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
 
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
+    // Allow to disable/re-enable notitifcation processing through JMX
+    private final AtomicBoolean isProcessingSuspended;
+
+
     public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
         this.executor = executor;
         this.nbThreads = nbThreads;
@@ -52,6 +57,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         this.objectMapper = new ObjectMapper();        
         this.isProcessingEvents = false;
         this.curActiveThreads = 0;
+        this.isProcessingSuspended = new AtomicBoolean(false);
     }
 
 
@@ -97,7 +103,9 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
                             }
 
                             try {
-                                doProcessEvents();
+                                if (!isProcessingSuspended.get()) {
+                                    doProcessEvents();
+                                }
                             } catch (Exception e) {
                                 log.warn(String.format("%s: Thread  %s  [%d] got an exception, catching and moving on...",
                                                        svcQName,
@@ -169,6 +177,22 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         }
     }
 
+    @Managed(description="suspend processing for all notifications")
+    public void suspendNotificationProcessing() {
+        isProcessingSuspended.set(true);
+    }
+
+    @Managed(description="resume processing for all notifications")
+    public void resumeNotificationProcessing() {
+        isProcessingSuspended.set(false);
+    }
+
+    @Managed(description="check whether notification processing is suspended")
+    public boolean isNotificationProcessingSuspended() {
+        return isProcessingSuspended.get();
+    }
+
+
     protected <T> T deserializeEvent(final String className, final String json) {
         try {
             final Class<?> claz = Class.forName(className);
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 895ea5d..d2266b9 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -34,6 +34,16 @@ getReadyNotifications() ::= <<
     ;
 >>
 
+getPendingCountNotifications() ::= <<
+    select
+      count(*)
+    from notifications
+    where
+      effective_date \<= :now
+      and processing_state = 'AVAILABLE'
+    ;
+>>
+
 getNotificationForAccountAndDate() ::= <<
    select
        record_id
diff --git a/util/src/test/java/com/ning/billing/TestJMX.java b/util/src/test/java/com/ning/billing/TestJMX.java
new file mode 100644
index 0000000..aea18fa
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/TestJMX.java
@@ -0,0 +1,4 @@
+package com.ning.billing;
+
+public class TestJMX {
+}