killbill-aplcache
Changes
profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java 11(+9 -2)
profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationKey.java 98(+98 -0)
profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java 118(+117 -1)
Details
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/DefaultServerService.java b/profiles/killbill/src/main/java/org/killbill/billing/server/DefaultServerService.java
index a085bfa..cad84c9 100644
--- a/profiles/killbill/src/main/java/org/killbill/billing/server/DefaultServerService.java
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/DefaultServerService.java
@@ -25,8 +25,11 @@ import org.killbill.billing.lifecycle.glue.BusModule;
import org.killbill.billing.platform.api.LifecycleHandlerType;
import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.server.notifications.PushNotificationListener;
+import org.killbill.billing.server.notifications.PushNotificationRetryService;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,15 +37,19 @@ public class DefaultServerService implements ServerService {
private static final Logger log = LoggerFactory.getLogger(DefaultServerService.class);
- private static final String SERVER_SERVICE = "server-service";
+ public static final String SERVER_SERVICE = "server-service";
private final PersistentBus bus;
private final PushNotificationListener pushNotificationListener;
+ private final PushNotificationRetryService pushNotificationRetryService;
@Inject
- public DefaultServerService(@Named(BusModule.EXTERNAL_BUS_NAMED) final PersistentBus bus, final PushNotificationListener pushNotificationListener) {
+ public DefaultServerService(@Named(BusModule.EXTERNAL_BUS_NAMED) final PersistentBus bus,
+ final PushNotificationListener pushNotificationListener,
+ final PushNotificationRetryService pushNotificationRetryService) {
this.bus = bus;
this.pushNotificationListener = pushNotificationListener;
+ this.pushNotificationRetryService = pushNotificationRetryService;
}
@Override
@@ -51,20 +58,27 @@ public class DefaultServerService implements ServerService {
}
@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
- public void registerForNotifications() {
+ public void registerForNotifications() throws NotificationQueueAlreadyExists {
try {
bus.register(pushNotificationListener);
} catch (final EventBusException e) {
log.warn("Failed to register PushNotificationListener", e);
}
+ pushNotificationRetryService.initialize();
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ pushNotificationRetryService.start();
}
@LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
- public void unregisterForNotifications() {
+ public void unregisterForNotifications() throws NoSuchNotificationQueue {
try {
bus.unregister(pushNotificationListener);
} catch (final EventBusException e) {
log.warn("Failed to unregister PushNotificationListener", e);
}
+ pushNotificationRetryService.stop();
}
}
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java b/profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java
index 8c32108..cd01301 100644
--- a/profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java
@@ -60,9 +60,11 @@ import org.killbill.billing.server.ServerService;
import org.killbill.billing.server.config.KillbillServerConfig;
import org.killbill.billing.server.filters.ResponseCorsFilter;
import org.killbill.billing.server.notifications.PushNotificationListener;
+import org.killbill.billing.server.notifications.PushNotificationRetryService;
import org.killbill.billing.subscription.glue.DefaultSubscriptionModule;
import org.killbill.billing.tenant.glue.DefaultTenantModule;
import org.killbill.billing.usage.glue.UsageModule;
+import org.killbill.billing.util.config.definition.NotificationConfig;
import org.killbill.billing.util.dao.AuditLogModelDaoMapper;
import org.killbill.billing.util.dao.RecordIdIdMappingsMapper;
import org.killbill.billing.util.email.EmailModule;
@@ -76,9 +78,9 @@ import org.killbill.billing.util.glue.ConfigModule;
import org.killbill.billing.util.glue.CustomFieldModule;
import org.killbill.billing.util.glue.ExportModule;
import org.killbill.billing.util.glue.GlobalLockerModule;
-import org.killbill.billing.util.glue.NodesModule;
import org.killbill.billing.util.glue.KillBillShiroAopModule;
import org.killbill.billing.util.glue.KillbillApiAopModule;
+import org.killbill.billing.util.glue.NodesModule;
import org.killbill.billing.util.glue.NonEntityDaoModule;
import org.killbill.billing.util.glue.RecordIdModule;
import org.killbill.billing.util.glue.SecurityModule;
@@ -88,6 +90,7 @@ import org.killbill.clock.Clock;
import org.killbill.clock.ClockMock;
import org.killbill.commons.embeddeddb.EmbeddedDB;
import org.killbill.commons.jdbi.mapper.LowerToCamelBeanMapperFactory;
+import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.ResultSetMapperFactory;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
@@ -207,7 +210,11 @@ public class KillbillServerModule extends KillbillPlatformModule {
}
protected void configurePushNotification() {
- bind(ServerService.class).to(DefaultServerService.class).asEagerSingleton();
+ final ConfigurationObjectFactory factory = new ConfigurationObjectFactory(skifeConfigSource);
+ final NotificationConfig notificationConfig = factory.build(NotificationConfig.class);
+ bind(NotificationConfig.class).toInstance(notificationConfig);
bind(PushNotificationListener.class).asEagerSingleton();
+ bind(PushNotificationRetryService.class).asEagerSingleton();
+ bind(ServerService.class).to(DefaultServerService.class).asEagerSingleton();
}
}
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationKey.java b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationKey.java
new file mode 100644
index 0000000..f77799e
--- /dev/null
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationKey.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.server.notifications;
+
+import java.util.UUID;
+
+import org.killbill.notificationq.api.NotificationEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class PushNotificationKey implements NotificationEvent {
+
+ private final UUID tenantId;
+ private final UUID accountId;
+ private final String eventType;
+ private final String objectType;
+ private final UUID objectId;
+ private Integer attemptNumber;
+ private final String url;
+
+ @JsonCreator
+ public PushNotificationKey(@JsonProperty("tenantId") final UUID tenantId,
+ @JsonProperty("accountId") final UUID accountId,
+ @JsonProperty("eventType") final String eventType,
+ @JsonProperty("objectType") final String objectType,
+ @JsonProperty("objectId") final UUID objectId,
+ @JsonProperty("attemptNumber") final Integer attemptNumber,
+ @JsonProperty("url") final String url) {
+ this.tenantId = tenantId;
+ this.accountId = accountId;
+ this.eventType = eventType;
+ this.objectType = objectType;
+ this.objectId = objectId;
+ this.attemptNumber = attemptNumber;
+ this.url = url;
+ }
+
+ public void increaseAttemptNumber() {
+ this.attemptNumber = ++attemptNumber;
+ }
+
+ public UUID getTenantId() {
+ return tenantId;
+ }
+
+ public UUID getAccountId() {
+ return accountId;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getObjectType() {
+ return objectType;
+ }
+
+ public UUID getObjectId() {
+ return objectId;
+ }
+
+ public Integer getAttemptNumber() {
+ return attemptNumber == null ? 0 : attemptNumber;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ @Override
+ public String toString() {
+ return "PushNotificationKey{" +
+ "tenantId=" + tenantId +
+ ", accountId=" + accountId +
+ ", eventType='" + eventType + '\'' +
+ ", objectType='" + objectType + '\'' +
+ ", objectId=" + objectId +
+ ", attemptNumber=" + attemptNumber +
+ ", url='" + url + '\'' +
+ '}';
+ }
+}
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
index 7e3a993..b6e968c 100644
--- a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
@@ -22,16 +22,29 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
+import org.joda.time.DateTime;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.jaxrs.json.NotificationJson;
import org.killbill.billing.notification.plugin.api.ExtBusEvent;
+import org.killbill.billing.server.DefaultServerService;
import org.killbill.billing.tenant.api.TenantApiException;
import org.killbill.billing.tenant.api.TenantKV.TenantKey;
import org.killbill.billing.tenant.api.TenantUserApi;
import org.killbill.billing.util.callcontext.CallContextFactory;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.TenantContext;
+import org.killbill.billing.util.config.definition.NotificationConfig;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.skife.config.TimeSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +55,10 @@ import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.ListenableFuture;
import com.ning.http.client.Response;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
@@ -62,13 +77,23 @@ public class PushNotificationListener {
private final CallContextFactory contextFactory;
private final AsyncHttpClient httpClient;
private final ObjectMapper mapper;
+ private final NotificationQueueService notificationQueueService;
+ private final InternalCallContextFactory internalCallContextFactory;
+ private final Clock clock;
+ private final NotificationConfig notificationConfig;
@Inject
- public PushNotificationListener(final ObjectMapper mapper, final TenantUserApi tenantApi, final CallContextFactory contextFactory) {
+ public PushNotificationListener(final ObjectMapper mapper, final TenantUserApi tenantApi, final CallContextFactory contextFactory,
+ final NotificationQueueService notificationQueueService, final InternalCallContextFactory internalCallContextFactory,
+ final Clock clock, final NotificationConfig notificationConfig) {
this.httpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder().setRequestTimeout(TIMEOUT_NOTIFICATION * 1000).build());
this.tenantApi = tenantApi;
this.contextFactory = contextFactory;
this.mapper = mapper;
+ this.notificationQueueService = notificationQueueService;
+ this.internalCallContextFactory = internalCallContextFactory;
+ this.clock = clock;
+ this.notificationConfig = notificationConfig;
}
@AllowConcurrentEvents
@@ -112,13 +137,104 @@ public class PushNotificationListener {
}
});
response = futureStatus.get(timeoutSec, TimeUnit.SECONDS);
+ } catch (final TimeoutException toe) {
+ saveRetryPushNotificationInQueue(tenantId, url, body);
+ return false;
} catch (final Exception e) {
log.warn("Failed to push notification url='{}', tenantId='{}'", url, tenantId, e);
return false;
}
+
+ if ((response.getStatusCode() == 301) || (response.getStatusCode() == 500)) {
+ saveRetryPushNotificationInQueue(tenantId, url, body);
+ return false;
+ }
+
return response.getStatusCode() >= 200 && response.getStatusCode() < 300;
}
+ public void resendPushNotification(final PushNotificationKey key, final InternalCallContext callContext) throws JsonProcessingException {
+
+ final NotificationJson notification = new NotificationJson(key.getEventType() != null ? key.getEventType().toString() : null,
+ key.getAccountId() != null ? key.getAccountId().toString() : null,
+ key.getObjectType() != null ? key.getObjectType().toString() : null,
+ key.getObjectId() != null ? key.getObjectId().toString() : null);
+ final String body = mapper.writeValueAsString(notification);
+
+ final BoundRequestBuilder builder = httpClient.preparePost(key.getUrl());
+ builder.setBody(body == null ? "{}" : body);
+ builder.addHeader(HTTP_HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON);
+
+ final Response response;
+ try {
+ final ListenableFuture<Response> futureStatus =
+ builder.execute(new AsyncCompletionHandler<Response>() {
+ @Override
+ public Response onCompleted(final Response response) throws Exception {
+ return response;
+ }
+ });
+ response = futureStatus.get(TIMEOUT_NOTIFICATION, TimeUnit.SECONDS);
+ } catch (final TimeoutException toe) {
+ saveRetryPushNotificationInQueue(key);
+ return;
+ } catch (final Exception e) {
+ log.warn("Failed to push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId(), e);
+ return;
+ }
+
+ if ((response.getStatusCode() == 301) || (response.getStatusCode() == 500)) {
+ saveRetryPushNotificationInQueue(key);
+ }
+ }
+
+ private void saveRetryPushNotificationInQueue(final UUID tenantId, final String url, final String body) {
+ try {
+ final NotificationJson notificationJson = mapper.readValue(body, NotificationJson.class);
+ final PushNotificationKey key = new PushNotificationKey(tenantId,
+ notificationJson.getAccountId() != null ? UUID.fromString(notificationJson.getAccountId()) : null,
+ notificationJson.getEventType(),
+ notificationJson.getObjectType(),
+ notificationJson.getObjectId() != null ? UUID.fromString(notificationJson.getObjectId()) : null,
+ 0, url);
+ saveRetryPushNotificationInQueue(key);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void saveRetryPushNotificationInQueue(final PushNotificationKey key) {
+ final TenantContext tenantContext = contextFactory.createTenantContext(key.getTenantId());
+ key.increaseAttemptNumber();
+ final DateTime nextNotificationTime = getNextNotificationTime(key.getAttemptNumber(), internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(tenantContext));
+
+ if (nextNotificationTime == null) {
+ log.warn("Max attempt number reached for push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId());
+ return;
+ }
+ log.debug("Push notification is schedule to send at {} for url='{}', tenantId='{}'", nextNotificationTime, key.getUrl(), key.getTenantId());
+ final Long accountRecordId = internalCallContextFactory.getRecordIdFromObject(key.getAccountId(), ObjectType.ACCOUNT, tenantContext);
+ final Long tenantRecordId = internalCallContextFactory.getRecordIdFromObject(key.getTenantId(), ObjectType.TENANT, tenantContext);
+ try {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultServerService.SERVER_SERVICE, PushNotificationRetryService.QUEUE_NAME);
+ notificationQueue.recordFutureNotification(nextNotificationTime, key, null, MoreObjects.firstNonNull(accountRecordId, new Long(0)), tenantRecordId);
+ } catch (NoSuchNotificationQueue noSuchNotificationQueue) {
+ log.error("Failed to push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId(), noSuchNotificationQueue);
+ } catch (IOException e) {
+ log.error("Failed to push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId(), e);
+ }
+ }
+
+ private DateTime getNextNotificationTime(final int attemptNumber, final InternalTenantContext tenantContext) {
+
+ final List<TimeSpan> retries = notificationConfig.getPushNotificationsRetries(tenantContext);
+ if (attemptNumber > retries.size()) {
+ return null;
+ }
+ final TimeSpan nextDelay = retries.get(attemptNumber - 1);
+ return clock.getUTCNow().plusMillis((int) nextDelay.getMillis());
+ }
+
private List<String> getCallbacksForTenant(final TenantContext context) throws TenantApiException {
return tenantApi.getTenantValuesForKey(TenantKey.PUSH_NOTIFICATION_CB.toString(), context);
}
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationRetryService.java b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationRetryService.java
new file mode 100644
index 0000000..94e805d
--- /dev/null
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationRetryService.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.server.notifications;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.server.DefaultServerService;
+import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.inject.Inject;
+
+public class PushNotificationRetryService {
+
+ private static final Logger log = LoggerFactory.getLogger(PushNotificationRetryService.class);
+ public static final String QUEUE_NAME = "push-notification-queue";
+ private static final String retryService = DefaultServerService.SERVER_SERVICE + "-" + QUEUE_NAME;
+
+ private final NotificationQueueService notificationQueueService;
+ private final InternalCallContextFactory internalCallContextFactory;
+ private final PushNotificationListener pushNotificationListener;
+
+ private NotificationQueue retryQueue;
+
+ @Inject
+ public PushNotificationRetryService(final NotificationQueueService notificationQueueService,
+ final InternalCallContextFactory internalCallContextFactory,
+ final PushNotificationListener pushNotificationListener) {
+ this.notificationQueueService = notificationQueueService;
+ this.internalCallContextFactory = internalCallContextFactory;
+ this.pushNotificationListener = pushNotificationListener;
+ }
+
+ public void initialize() throws NotificationQueueAlreadyExists {
+ retryQueue = notificationQueueService.createNotificationQueue(DefaultServerService.SERVER_SERVICE,
+ QUEUE_NAME,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ if (!(notificationKey instanceof PushNotificationKey)) {
+ log.error("Push Notification service got an unexpected notification type {}", notificationKey.getClass().getName());
+ return;
+ }
+ final PushNotificationKey key = (PushNotificationKey) notificationKey;
+ final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, retryService, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+ try {
+ pushNotificationListener.resendPushNotification(key, callContext);
+ } catch (JsonProcessingException e) {
+ log.error("Failed to push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId(), e);
+ }
+ }
+ }
+ );
+ }
+
+ public void start() {
+ retryQueue.startQueue();
+ }
+
+ public void stop() throws NoSuchNotificationQueue {
+ if (retryQueue != null) {
+ retryQueue.stopQueue();
+ notificationQueueService.deleteNotificationQueue(retryQueue.getServiceName(), retryQueue.getQueueName());
+ }
+ }
+
+}
diff --git a/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPushNotification.java b/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPushNotification.java
index 390e4ed..1c6b542 100644
--- a/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPushNotification.java
+++ b/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPushNotification.java
@@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.killbill.billing.client.KillBillClientException;
import org.killbill.billing.client.model.TenantKey;
import org.killbill.billing.jaxrs.json.NotificationJson;
import org.killbill.billing.server.notifications.PushNotificationListener;
@@ -54,15 +55,18 @@ public class TestPushNotification extends TestJaxrsBase {
private volatile boolean callbackCompleted;
private volatile boolean callbackCompletedWithError;
+ private volatile int expectedNbCalls = 1;
+ private volatile boolean forceToFail = false;
+ private volatile int failedResponseStatus = HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
@Override
@BeforeMethod(groups = "slow")
public void beforeMethod() throws Exception {
super.beforeMethod();
callbackServer = new CallbackServer(this, SERVER_PORT, CALLBACK_ENDPOINT);
- callbackCompleted = false;
- callbackCompletedWithError = false;
+ resetCallbackStatusProperties();
callbackServer.startServer();
+ this.expectedNbCalls = 1;
}
@AfterMethod(groups = "slow")
@@ -85,7 +89,7 @@ public class TestPushNotification extends TestJaxrsBase {
public void retrieveAccountWithAsserts(final String accountId) {
try {
// Just check we can retrieve the account with the id from the callback
- killBillClient.getAccount(UUID.fromString(accountId));
+ killBillClient.getAccount(UUID.fromString(accountId), requestOptions);
} catch (final Exception e) {
Assert.fail(e.getMessage());
}
@@ -93,12 +97,12 @@ public class TestPushNotification extends TestJaxrsBase {
@Test(groups = "slow")
public void testPushNotification() throws Exception {
- // Register tenant for callback
- final String callback = "http://127.0.0.1:" + SERVER_PORT + CALLBACK_ENDPOINT;
- final TenantKey result0 = killBillClient.registerCallbackNotificationForTenant(callback, createdBy, reason, comment);
- Assert.assertEquals(result0.getKey(), TenantKV.TenantKey.PUSH_NOTIFICATION_CB.toString());
- Assert.assertEquals(result0.getValues().size(), 1);
- Assert.assertEquals(result0.getValues().get(0), callback);
+ final String callback = registerTenantForCallback();
+
+ // set expected number of calls
+ // 1st: was "eventType":"TENANT_CONFIG_CHANGE"
+ // 2nd: is "eventType":"ACCOUNT_CREATION"
+ this.expectedNbCalls = 2;
// Create account to trigger a push notification
createAccount();
@@ -112,17 +116,161 @@ public class TestPushNotification extends TestJaxrsBase {
Assert.fail("Assertion during callback failed...");
}
- final TenantKey result = killBillClient.getCallbackNotificationForTenant();
+ unregisterTenantForCallback(callback);
+ }
+
+ private void unregisterTenantForCallback(final String callback) throws KillBillClientException {
+ final TenantKey result = killBillClient.getCallbackNotificationForTenant(requestOptions);
Assert.assertEquals(result.getKey(), TenantKV.TenantKey.PUSH_NOTIFICATION_CB.toString());
Assert.assertEquals(result.getValues().size(), 1);
Assert.assertEquals(result.getValues().get(0), callback);
- killBillClient.unregisterCallbackNotificationForTenant(createdBy, reason, comment);
- final TenantKey result2 = killBillClient.getCallbackNotificationForTenant();
+ killBillClient.unregisterCallbackNotificationForTenant(requestOptions);
+ final TenantKey result2 = killBillClient.getCallbackNotificationForTenant(requestOptions);
Assert.assertEquals(result2.getKey(), TenantKV.TenantKey.PUSH_NOTIFICATION_CB.toString());
Assert.assertEquals(result2.getValues().size(), 0);
}
+ private String registerTenantForCallback() throws KillBillClientException, InterruptedException {// Register tenant for callback
+ final String callback = "http://127.0.0.1:" + SERVER_PORT + CALLBACK_ENDPOINT;
+ final TenantKey result0 = killBillClient.registerCallbackNotificationForTenant(callback, requestOptions);
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError); // expected true because is not an ACCOUNT_CREATION event
+
+ Assert.assertEquals(result0.getKey(), TenantKV.TenantKey.PUSH_NOTIFICATION_CB.toString());
+ Assert.assertEquals(result0.getValues().size(), 1);
+ Assert.assertEquals(result0.getValues().get(0), callback);
+
+ // reset values
+ resetCallbackStatusProperties();
+ return callback;
+ }
+
+ @Test(groups = "slow")
+ public void testPushNotificationRetries() throws Exception {
+ final String callback = registerTenantForCallback();
+
+ // force server to fail
+ // Notifications retries are set to:
+ // org.killbill.billing.server.notifications.retries=15m,1h,1d,2d
+ this.forceToFail = true;
+
+ // set expected number of calls
+ // 1st: was "eventType":"TENANT_CONFIG_CHANGE"
+ // 2nd: is original "eventType":"ACCOUNT_CREATION" call [force error]
+ // 3rd: is 1st notification retry (+ 15m) [force error]
+ // 4th: is 1st notification retry (+ 1h) [force error]
+ // 5th: is 1st notification retry (+ 1d) [success]
+ this.expectedNbCalls = 5;
+
+ // Create account to trigger a push notification
+ createAccount();
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError);
+
+ resetCallbackStatusProperties();
+
+ // move clock 15 minutes and get 1st retry
+ clock.addDeltaFromReality(900000);
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError);
+
+ resetCallbackStatusProperties();
+
+ // move clock an hour and get 2nd retry
+ clock.addDeltaFromReality(3600000);
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError);
+
+ resetCallbackStatusProperties();
+
+ // make call success
+ this.forceToFail = false;
+
+ // move clock a day, get 3rd retry and wait for a success push notification
+ clock.addDays(1);
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertFalse(callbackCompletedWithError);
+
+ unregisterTenantForCallback(callback);
+ }
+
+ @Test(groups = "slow")
+ public void testPushNotificationRetriesMaxAttemptNumber() throws Exception {
+ final String callback = registerTenantForCallback();
+
+ // force server to fail
+ // Notifications retries are set to:
+ // org.killbill.billing.server.notifications.retries=15m,1h,1d,2d
+ this.forceToFail = true;
+
+ // set expected number of calls
+ // 1st: was "eventType":"TENANT_CONFIG_CHANGE"
+ // 2nd: is original "eventType":"ACCOUNT_CREATION" call [force error]
+ // 3rd: is 1st notification retry (+ 15m) [force error]
+ // 4th: is 2nd notification retry (+ 1h) [force error]
+ // 5th: is 3rd notification retry (+ 1d) [force error]
+ // 6th: is 4th notification retry (+ 2d) [force error]
+ this.expectedNbCalls = 6;
+
+ // Create account to trigger a push notification
+ createAccount();
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError);
+
+ resetCallbackStatusProperties();
+
+ // move clock 15 minutes and get 1st retry
+ clock.addDeltaFromReality(900000);
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError);
+
+ resetCallbackStatusProperties();
+
+ // move clock an hour and get 2nd retry
+ clock.addDeltaFromReality(3600000);
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError);
+
+ resetCallbackStatusProperties();
+
+ // move clock a day and get 3rd retry
+ clock.addDays(1);
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError);
+
+ resetCallbackStatusProperties();
+
+ // move clock a day and get 4rd retry
+ clock.addDays(2);
+
+ Assert.assertTrue(waitForCallbacksToComplete());
+ Assert.assertTrue(callbackCompletedWithError);
+ resetCallbackStatusProperties();
+
+ clock.addDays(4);
+
+ Assert.assertFalse(waitForCallbacksToComplete());
+ Assert.assertFalse(callbackCompletedWithError);
+
+ unregisterTenantForCallback(callback);
+ }
+
+ private void resetCallbackStatusProperties() {
+ // reset values
+ this.callbackCompleted = false;
+ this.callbackCompletedWithError = false;
+ }
+
public void setCompleted(final boolean withError) {
callbackCompleted = true;
callbackCompletedWithError = withError;
@@ -144,7 +292,7 @@ public class TestPushNotification extends TestJaxrsBase {
final ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
server.setHandler(context);
- context.addServlet(new ServletHolder(new CallmebackServlet(test, 1)), callbackEndpoint);
+ context.addServlet(new ServletHolder(new CallmebackServlet(test)), callbackEndpoint);
server.start();
}
@@ -159,15 +307,13 @@ public class TestPushNotification extends TestJaxrsBase {
private static final Logger log = LoggerFactory.getLogger(CallmebackServlet.class);
- private final int expectedNbCalls;
private final AtomicInteger receivedCalls;
private final TestPushNotification test;
private final ObjectMapper objectMapper = new ObjectMapper();
private boolean withError;
- public CallmebackServlet(final TestPushNotification test, final int expectedNbCalls) {
- this.expectedNbCalls = expectedNbCalls;
+ public CallmebackServlet(final TestPushNotification test) {
this.test = test;
this.receivedCalls = new AtomicInteger(0);
this.withError = false;
@@ -176,8 +322,18 @@ public class TestPushNotification extends TestJaxrsBase {
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final int current = receivedCalls.incrementAndGet();
-
final String body = CharStreams.toString(new InputStreamReader(request.getInputStream(), "UTF-8"));
+ withError = false;
+
+ log.info("CallmebackServlet received {} calls , current = {} at {}", current, body, getClock().getUTCNow());
+
+ if (test.forceToFail) {
+ response.setStatus(test.failedResponseStatus);
+ log.info("CallmebackServlet is force to fail for testing purposes");
+ test.setCompleted(true);
+ return;
+ }
+
response.setStatus(HttpServletResponse.SC_OK);
log.info("Got body {}", body);
@@ -197,15 +353,15 @@ public class TestPushNotification extends TestJaxrsBase {
withError = true;
}
- log.info("CallmebackServlet received {} calls , current = {}", current, body);
stopServerWhenComplete(current, withError);
}
private void stopServerWhenComplete(final int current, final boolean withError) {
- if (current == expectedNbCalls) {
+ if (current == test.expectedNbCalls) {
log.info("Excellent, we are done!");
test.setCompleted(withError);
}
}
+
}
}
diff --git a/profiles/killbill/src/test/resources/killbill.properties b/profiles/killbill/src/test/resources/killbill.properties
index c6def31..776243b 100644
--- a/profiles/killbill/src/test/resources/killbill.properties
+++ b/profiles/killbill/src/test/resources/killbill.properties
@@ -33,3 +33,5 @@ org.killbill.security.shiroNbHashIterations=10
org.killbill.tenant.broadcast.rate=1s
+# exponential delay retries for push notifications
+org.killbill.billing.server.notifications.retries=15m,1h,1d,2d
\ No newline at end of file
diff --git a/util/src/main/java/org/killbill/billing/util/config/definition/NotificationConfig.java b/util/src/main/java/org/killbill/billing/util/config/definition/NotificationConfig.java
new file mode 100644
index 0000000..b85670e
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/config/definition/NotificationConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.config.definition;
+
+import java.util.List;
+
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.skife.config.Config;
+import org.skife.config.Default;
+import org.skife.config.Description;
+import org.skife.config.Param;
+import org.skife.config.TimeSpan;
+
+public interface NotificationConfig extends KillbillConfig {
+
+ @Config("org.killbill.billing.server.notifications.retries")
+ @Default("15m,30m,2h,12h,1d")
+ @Description("Delay before which unresolved push notifications should be retried")
+ List<TimeSpan> getPushNotificationsRetries();
+
+ @Config("org.killbill.billing.server.notifications.retries")
+ @Default("15m,30m,2h,12h,1d")
+ @Description("Delay before which unresolved push notifications should be retried")
+ List<TimeSpan> getPushNotificationsRetries(@Param("dummy") final InternalTenantContext tenantContext);
+
+}
\ No newline at end of file