killbill-aplcache
Changes
pom.xml 18(+17 -1)
util/pom.xml 12(+12 -0)
Details
pom.xml 18(+17 -1)
diff --git a/pom.xml b/pom.xml
index d6f3cbf..9ca0abc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,8 @@
OR CONDITIONS OF ANY KIND, either express or implied. See the ~ License for
the specific language governing permissions and limitations ~ under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.sonatype.oss</groupId>
<artifactId>oss-parent</artifactId>
@@ -365,6 +366,21 @@
<artifactId>jul-to-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
+ <dependency><!-- Needed by jmxutils -->
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-multibindings</artifactId>
+ <version>3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.weakref</groupId>
+ <artifactId>jmxutils</artifactId>
+ <version>1.12</version>
+ </dependency>
+ <dependency>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>2.2.0</version>
+ </dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
diff --git a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
index 862e9d0..15c9b9a 100644
--- a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
+++ b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
@@ -21,14 +21,16 @@ import javax.servlet.ServletContextEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.billing.beatrix.bus.api.ExternalBus;
import com.ning.billing.beatrix.lifecycle.DefaultLifecycle;
import com.ning.billing.jaxrs.util.KillbillEventHandler;
import com.ning.billing.server.config.KillbillServerConfig;
import com.ning.billing.server.healthchecks.KillbillHealthcheck;
import com.ning.billing.server.modules.KillbillServerModule;
import com.ning.billing.server.security.TenantFilter;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
+import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.svcsapi.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.InternalBus;
import com.ning.jetty.base.modules.ServerModuleBuilder;
import com.ning.jetty.core.listeners.SetupServer;
@@ -56,6 +58,9 @@ public class KillbillGuiceListener extends SetupServer {
.addConfig(KillbillServerConfig.class)
.addHealthCheck(KillbillHealthcheck.class)
.addJMXExport(KillbillHealthcheck.class)
+ .addJMXExport(NotificationQueueService.class)
+ .addJMXExport(InternalBus.class)
+ .addJMXExport(ExternalBus.class)
.addModule(getModule())
.addJerseyResource("com.ning.billing.jaxrs.mappers")
.addJerseyResource("com.ning.billing.jaxrs.resources");
util/pom.xml 12(+12 -0)
diff --git a/util/pom.xml b/util/pom.xml
index 4d15aca..c6b82af 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -92,6 +92,18 @@
<artifactId>stringtemplate</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-multibindings</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.weakref</groupId>
+ <artifactId>jmxutils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 0a295ae..4e59096 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -58,6 +58,10 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
@InternalTenantContextBinder final InternalTenantContext context);
@SqlQuery
+ public int getPendingCountNotifications(@Bind("now") Date now,
+ @InternalTenantContextBinder final InternalTenantContext context);
+
+ @SqlQuery
@Mapper(NotificationSqlMapper.class)
public List<Notification> getNotificationForAccountAndDate(@Bind("accountRecordId") final long accountRecordId,
@Bind("effectiveDate") final Date effectiveDate,
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
index 0a972e0..6899ea7 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java
@@ -19,18 +19,22 @@ package com.ning.billing.util.notificationq;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.weakref.jmx.Managed;
import com.ning.billing.util.Hostname;
import com.ning.billing.util.callcontext.CallOrigin;
@@ -42,6 +46,11 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
import com.ning.billing.util.queue.PersistentQueueBase;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+
public class NotificationQueueDispatcher extends PersistentQueueBase {
protected static final Logger log = LoggerFactory.getLogger(NotificationQueueDispatcher.class);
@@ -60,6 +69,14 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
protected final Clock clock;
protected final Map<String, NotificationQueue> queues;
+
+ //
+ // Metrics
+ //
+ private final Gauge pendingNotifications;
+ private final Counter processedNotificationsSinceStart;
+ private final Map<String, Histogram> perQueueProcessingTime;
+
// Package visibility on purpose
NotificationQueueDispatcher(final Clock clock, final NotificationQueueConfig config, final IDBI dbi, final InternalCallContextFactory internalCallContextFactory) {
super("NotificationQ", Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -85,8 +102,17 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
this.nbProcessedEvents = new AtomicLong();
this.queues = new TreeMap<String, NotificationQueue>();
- }
+ this.pendingNotifications = Metrics.newGauge(NotificationQueueDispatcher.class, "pending-notifications", new Gauge<Integer>() {
+ @Override
+ public Integer value() {
+ return dao != null ? dao.getPendingCountNotifications(clock.getUTCNow().toDate(), createCallContext(null, null)) : 0;
+ }
+ });
+
+ this.processedNotificationsSinceStart = Metrics.newCounter(NotificationQueueDispatcher.class, "processed-notifications-since-start");
+ this.perQueueProcessingTime = new HashMap<String, Histogram>();
+ }
@Override
public void stopQueue() {
if (config.isProcessingOff() || !isStarted()) {
@@ -122,6 +148,7 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
return clock;
}
+
protected NotificationQueueHandler getHandlerForActiveQueue(final String compositeName) {
synchronized (queues) {
final NotificationQueue queue = queues.get(compositeName);
@@ -138,6 +165,7 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
}
protected int doProcessEventsWithLimit(int limit) {
+
logDebug("ENTER doProcessEvents");
// Finding and claiming notifications is not done per tenant (yet?)
final List<Notification> notifications = getReadyNotifications(createCallContext(null, null));
@@ -164,16 +192,41 @@ public class NotificationQueueDispatcher extends PersistentQueueBase {
if (handler == null) {
continue;
}
- handler.handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
+ handleNotificationWithMetrics(handler, cur, key);
result++;
clearNotification(cur, createCallContext(cur.getTenantRecordId(), cur.getAccountRecordId()));
logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
}
-
return result;
}
+ private void handleNotificationWithMetrics(final NotificationQueueHandler handler, final Notification notification, final NotificationKey key) {
+
+ // Create specific metric name because:
+ // - ':' is not allowed for metric name
+ // - name would be too long (e.g entitlement-service:subscription-events-process-time -> ent-subscription-events-process-time)
+ //
+ final String [] parts = notification.getQueueName().split(":");
+ final String metricName = new StringBuilder(parts[0].substring(0, 3))
+ .append("-")
+ .append(parts[1])
+ .append("-process-time").toString();
+
+ final Histogram perQueueHistogramProcessingTime;
+ synchronized(perQueueProcessingTime) {
+ if (!perQueueProcessingTime.containsKey(notification.getQueueName())) {
+ perQueueProcessingTime.put(notification.getQueueName(), Metrics.newHistogram(NotificationQueueDispatcher.class, metricName));
+ }
+ perQueueHistogramProcessingTime = perQueueProcessingTime.get(notification.getQueueName());
+ }
+ final DateTime beforeProcessing = clock.getUTCNow();
+ handler.handleReadyNotification(key, notification.getEffectiveDate(), notification.getFutureUserToken(), notification.getAccountRecordId(), notification.getTenantRecordId());
+ final DateTime afterProcessing = clock.getUTCNow();
+ perQueueHistogramProcessingTime.update(afterProcessing.getMillis() - beforeProcessing.getMillis());
+ processedNotificationsSinceStart.inc();
+ }
+
private void clearNotification(final Notification cleared, final InternalCallContext context) {
dao.clearNotification(cleared.getId().toString(), getHostname(), context);
}
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index bff0441..8e1f7bf 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.weakref.jmx.Managed;
import com.ning.billing.util.config.PersistentQueueConfig;
import com.ning.billing.util.jackson.ObjectMapper;
@@ -44,6 +45,10 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ // Allow to disable/re-enable notitifcation processing through JMX
+ private final AtomicBoolean isProcessingSuspended;
+
+
public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
this.executor = executor;
this.nbThreads = nbThreads;
@@ -52,6 +57,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
this.objectMapper = new ObjectMapper();
this.isProcessingEvents = false;
this.curActiveThreads = 0;
+ this.isProcessingSuspended = new AtomicBoolean(false);
}
@@ -97,7 +103,9 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
try {
- doProcessEvents();
+ if (!isProcessingSuspended.get()) {
+ doProcessEvents();
+ }
} catch (Exception e) {
log.warn(String.format("%s: Thread %s [%d] got an exception, catching and moving on...",
svcQName,
@@ -169,6 +177,22 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
}
+ @Managed(description="suspend processing for all notifications")
+ public void suspendNotificationProcessing() {
+ isProcessingSuspended.set(true);
+ }
+
+ @Managed(description="resume processing for all notifications")
+ public void resumeNotificationProcessing() {
+ isProcessingSuspended.set(false);
+ }
+
+ @Managed(description="check whether notification processing is suspended")
+ public boolean isNotificationProcessingSuspended() {
+ return isProcessingSuspended.get();
+ }
+
+
protected <T> T deserializeEvent(final String className, final String json) {
try {
final Class<?> claz = Class.forName(className);
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 895ea5d..d2266b9 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -34,6 +34,16 @@ getReadyNotifications() ::= <<
;
>>
+getPendingCountNotifications() ::= <<
+ select
+ count(*)
+ from notifications
+ where
+ effective_date \<= :now
+ and processing_state = 'AVAILABLE'
+ ;
+>>
+
getNotificationForAccountAndDate() ::= <<
select
record_id
diff --git a/util/src/test/java/com/ning/billing/TestJMX.java b/util/src/test/java/com/ning/billing/TestJMX.java
new file mode 100644
index 0000000..aea18fa
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/TestJMX.java
@@ -0,0 +1,4 @@
+package com.ning.billing;
+
+public class TestJMX {
+}