diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
index 4ca6487..39d0435 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
@@ -20,13 +20,19 @@ package org.killbill.billing.jaxrs.resources;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.UUID;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
@@ -67,13 +73,22 @@ import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.entity.Pagination;
import org.killbill.billing.util.tag.Tag;
import org.killbill.billing.util.tag.dao.SystemTags;
+import org.killbill.bus.api.BusEvent;
+import org.killbill.bus.api.BusEventWithMetadata;
+import org.killbill.bus.api.PersistentBus;
import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
import com.google.inject.Singleton;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -96,15 +111,84 @@ public class AdminResource extends JaxRsResourceBase {
private final TenantUserApi tenantApi;
private final CacheManager cacheManager;
private final RecordIdApi recordIdApi;
+ private final PersistentBus persistentBus;
+ private final NotificationQueueService notificationQueueService;
@Inject
- public AdminResource(final JaxrsUriBuilder uriBuilder, final TagUserApi tagUserApi, final CustomFieldUserApi customFieldUserApi, final AuditUserApi auditUserApi, final AccountUserApi accountUserApi, final PaymentApi paymentApi, final AdminPaymentApi adminPaymentApi, final InvoiceUserApi invoiceUserApi, final CacheManager cacheManager, final TenantUserApi tenantApi, final RecordIdApi recordIdApi, final Clock clock, final Context context) {
+ public AdminResource(final JaxrsUriBuilder uriBuilder,
+ final TagUserApi tagUserApi,
+ final CustomFieldUserApi customFieldUserApi,
+ final AuditUserApi auditUserApi,
+ final AccountUserApi accountUserApi,
+ final PaymentApi paymentApi,
+ final AdminPaymentApi adminPaymentApi,
+ final InvoiceUserApi invoiceUserApi,
+ final CacheManager cacheManager,
+ final TenantUserApi tenantApi,
+ final RecordIdApi recordIdApi,
+ final PersistentBus persistentBus,
+ final NotificationQueueService notificationQueueService,
+ final Clock clock,
+ final Context context) {
super(uriBuilder, tagUserApi, customFieldUserApi, auditUserApi, accountUserApi, paymentApi, null, clock, context);
this.adminPaymentApi = adminPaymentApi;
this.invoiceUserApi = invoiceUserApi;
this.tenantApi = tenantApi;
this.recordIdApi = recordIdApi;
this.cacheManager = cacheManager;
+ this.persistentBus = persistentBus;
+ this.notificationQueueService = notificationQueueService;
+ }
+
+ @GET
+ @Path("/queues")
+ @Produces(APPLICATION_JSON)
+ @ApiOperation(value = "Get queues entries", response = Response.class)
+ @ApiResponses(value = {})
+ public Response getCurrentTime(@QueryParam("accountId") final String accountIdStr,
+ @QueryParam("queueName") final String queueName,
+ @QueryParam("serviceName") final String serviceName,
+ @QueryParam("withHistory") @DefaultValue("true") final Boolean withHistory,
+ @QueryParam("withInProcessing") @DefaultValue("true") final Boolean withInProcessing,
+ @QueryParam("withBusEvents") @DefaultValue("true") final Boolean withBusEvents,
+ @QueryParam("withNotifications") @DefaultValue("true") final Boolean withNotifications,
+ @javax.ws.rs.core.Context final HttpServletRequest request) {
+ final TenantContext tenantContext = context.createContext(request);
+ final Long tenantRecordId = recordIdApi.getRecordId(tenantContext.getTenantId(), ObjectType.TENANT, tenantContext);
+ final Long accountRecordId = Strings.isNullOrEmpty(accountIdStr) ? null : recordIdApi.getRecordId(UUID.fromString(accountIdStr), ObjectType.ACCOUNT, tenantContext);
+
+ final StreamingOutput json = new StreamingOutput() {
+ @Override
+ public void write(final OutputStream output) throws IOException, WebApplicationException {
+ final JsonGenerator generator = mapper.getFactory().createGenerator(output);
+ generator.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
+
+ generator.writeStartObject();
+
+ if (withBusEvents) {
+ generator.writeFieldName("busEvents");
+ generator.writeStartArray();
+ for (final BusEventWithMetadata<BusEvent> busEvent : getBusEvents(withInProcessing, withHistory, accountRecordId, tenantRecordId)) {
+ generator.writeObject(new BusEventWithRichMetadata(busEvent));
+ }
+ generator.writeEndArray();
+ }
+
+ if (withNotifications) {
+ generator.writeFieldName("notifications");
+ generator.writeStartArray();
+ for (final NotificationEventWithMetadata<NotificationEvent> notification : getNotifications(queueName, serviceName, withInProcessing, withHistory, accountRecordId, tenantRecordId)) {
+ generator.writeObject(notification);
+ }
+ generator.writeEndArray();
+ }
+
+ generator.writeEndObject();
+ generator.close();
+ }
+ };
+
+ return Response.status(Status.OK).entity(json).build();
}
@PUT
@@ -296,4 +380,85 @@ public class AdminResource extends JaxRsResourceBase {
}
}
+ private Iterable<NotificationEventWithMetadata<NotificationEvent>> getNotifications(@Nullable final String queueName,
+ @Nullable final String serviceName,
+ final boolean includeInProcessing,
+ final boolean includeHistory, // TODO
+ @Nullable final Long accountRecordId,
+ final Long tenantRecordId) {
+ final Collection<NotificationEventWithMetadata<NotificationEvent>> notifications = new LinkedList<NotificationEventWithMetadata<NotificationEvent>>();
+ for (final NotificationQueue notificationQueue : notificationQueueService.getNotificationQueues()) {
+ if (queueName != null && !queueName.equals(notificationQueue.getQueueName())) {
+ continue;
+ } else if (serviceName != null && !serviceName.equals(notificationQueue.getServiceName())) {
+ continue;
+ }
+
+ final List<NotificationEventWithMetadata<NotificationEvent>> notificationsForQueue;
+ if (includeInProcessing) {
+ if (accountRecordId != null) {
+ notificationsForQueue = notificationQueue.getFutureOrInProcessingNotificationForSearchKeys(accountRecordId, tenantRecordId);
+ } else {
+ notificationsForQueue = notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(tenantRecordId);
+ }
+ } else {
+ if (accountRecordId != null) {
+ notificationsForQueue = notificationQueue.getFutureNotificationForSearchKeys(accountRecordId, tenantRecordId);
+ } else {
+ notificationsForQueue = notificationQueue.getFutureNotificationForSearchKey2(tenantRecordId);
+ }
+ }
+
+ notifications.addAll(notificationsForQueue);
+ }
+
+ return Ordering.<NotificationEventWithMetadata<NotificationEvent>>from(new Comparator<NotificationEventWithMetadata<NotificationEvent>>() {
+ @Override
+ public int compare(final NotificationEventWithMetadata<NotificationEvent> o1, final NotificationEventWithMetadata<NotificationEvent> o2) {
+ final int effectiveDateComparison = o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
+ return effectiveDateComparison == 0 ? o1.getRecordId().compareTo(o2.getRecordId()) : effectiveDateComparison;
+ }
+ }).sortedCopy(notifications);
+ }
+
+ private Iterable<BusEventWithMetadata<BusEvent>> getBusEvents(final boolean includeInProcessing,
+ final boolean includeHistory, // TODO
+ @Nullable final Long accountRecordId,
+ final Long tenantRecordId) {
+ final List<BusEventWithMetadata<BusEvent>> busEvents;
+ if (includeInProcessing) {
+ if (accountRecordId != null) {
+ busEvents = persistentBus.getAvailableOrInProcessingBusEventsForSearchKeys(accountRecordId, tenantRecordId);
+ } else {
+ busEvents = persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(tenantRecordId);
+ }
+ } else {
+ if (accountRecordId != null) {
+ busEvents = persistentBus.getAvailableBusEventsForSearchKeys(accountRecordId, tenantRecordId);
+ } else {
+ busEvents = persistentBus.getAvailableBusEventsForSearchKey2(tenantRecordId);
+ }
+ }
+
+ return Ordering.<BusEventWithMetadata<BusEvent>>from(new Comparator<BusEventWithMetadata<BusEvent>>() {
+ @Override
+ public int compare(final BusEventWithMetadata<BusEvent> o1, final BusEventWithMetadata<BusEvent> o2) {
+ return o1.getRecordId().compareTo(o2.getRecordId());
+ }
+ }).sortedCopy(busEvents);
+ }
+
+ private class BusEventWithRichMetadata extends BusEventWithMetadata<BusEvent> {
+
+ private final String className;
+
+ public BusEventWithRichMetadata(final BusEventWithMetadata<BusEvent> event) {
+ super(event.getRecordId(), event.getUserToken(), event.getCreatedDate(), event.getSearchKey1(), event.getSearchKey2(), event.getEvent());
+ this.className = event.getEvent().getClass().getName();
+ }
+
+ public String getClassName() {
+ return className;
+ }
+ }
}