killbill-aplcache

Merge pull request #589 from matias-aguero-hs/285-retries-push-notifications #285

8/3/2016 5:55:36 PM

Details

diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/config/MultiTenantNotificationConfig.java b/profiles/killbill/src/main/java/org/killbill/billing/server/config/MultiTenantNotificationConfig.java
new file mode 100644
index 0000000..cfea29b
--- /dev/null
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/config/MultiTenantNotificationConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.server.config;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.payment.glue.PaymentModule;
+import org.killbill.billing.util.config.definition.NotificationConfig;
+import org.killbill.billing.util.config.tenant.CacheConfig;
+import org.killbill.billing.util.config.tenant.MultiTenantConfigBase;
+import org.skife.config.Param;
+import org.skife.config.TimeSpan;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class MultiTenantNotificationConfig extends MultiTenantConfigBase implements NotificationConfig {
+
+    private final NotificationConfig staticConfig;
+
+    @Inject
+    public MultiTenantNotificationConfig(@Named(PaymentModule.STATIC_CONFIG) final NotificationConfig staticConfig, final CacheConfig cacheConfig) {
+        super(cacheConfig);
+        this.staticConfig = staticConfig;
+    }
+
+    @Override
+    protected Method getConfigStaticMethod(final String methodName) {
+        try {
+            return NotificationConfig.class.getMethod(methodName, InternalTenantContext.class);
+        } catch (final NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public List<TimeSpan> getPushNotificationsRetries() {
+        return staticConfig.getPushNotificationsRetries();
+    }
+
+    @Override
+    public List<TimeSpan> getPushNotificationsRetries(@Param("dummy") final InternalTenantContext tenantContext) {
+        final Method method = new Object() {}.getClass().getEnclosingMethod();
+
+        final String result = getStringTenantConfig(method.getName(), tenantContext);
+        if (result != null) {
+            return convertToListTimeSpan(result, method.getName());
+        }
+        return getPushNotificationsRetries();
+    }
+}
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/DefaultServerService.java b/profiles/killbill/src/main/java/org/killbill/billing/server/DefaultServerService.java
index a085bfa..cad84c9 100644
--- a/profiles/killbill/src/main/java/org/killbill/billing/server/DefaultServerService.java
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/DefaultServerService.java
@@ -25,8 +25,11 @@ import org.killbill.billing.lifecycle.glue.BusModule;
 import org.killbill.billing.platform.api.LifecycleHandlerType;
 import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
 import org.killbill.billing.server.notifications.PushNotificationListener;
+import org.killbill.billing.server.notifications.PushNotificationRetryService;
 import org.killbill.bus.api.PersistentBus;
 import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,15 +37,19 @@ public class DefaultServerService implements ServerService {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultServerService.class);
 
-    private static final String SERVER_SERVICE = "server-service";
+    public static final String SERVER_SERVICE = "server-service";
 
     private final PersistentBus bus;
     private final PushNotificationListener pushNotificationListener;
+    private final PushNotificationRetryService pushNotificationRetryService;
 
     @Inject
-    public DefaultServerService(@Named(BusModule.EXTERNAL_BUS_NAMED) final PersistentBus bus, final PushNotificationListener pushNotificationListener) {
+    public DefaultServerService(@Named(BusModule.EXTERNAL_BUS_NAMED) final PersistentBus bus,
+                                final PushNotificationListener pushNotificationListener,
+                                final PushNotificationRetryService pushNotificationRetryService) {
         this.bus = bus;
         this.pushNotificationListener = pushNotificationListener;
+        this.pushNotificationRetryService = pushNotificationRetryService;
     }
 
     @Override
@@ -51,20 +58,27 @@ public class DefaultServerService implements ServerService {
     }
 
     @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
-    public void registerForNotifications() {
+    public void registerForNotifications() throws NotificationQueueAlreadyExists {
         try {
             bus.register(pushNotificationListener);
         } catch (final EventBusException e) {
             log.warn("Failed to register PushNotificationListener", e);
         }
+        pushNotificationRetryService.initialize();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+        pushNotificationRetryService.start();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
-    public void unregisterForNotifications() {
+    public void unregisterForNotifications() throws NoSuchNotificationQueue {
         try {
             bus.unregister(pushNotificationListener);
         } catch (final EventBusException e) {
             log.warn("Failed to unregister PushNotificationListener", e);
         }
+        pushNotificationRetryService.stop();
     }
 }
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java b/profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java
index 8c32108..ff6de44 100644
--- a/profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/modules/KillbillServerModule.java
@@ -58,11 +58,14 @@ import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.killbill.billing.server.DefaultServerService;
 import org.killbill.billing.server.ServerService;
 import org.killbill.billing.server.config.KillbillServerConfig;
+import org.killbill.billing.server.config.MultiTenantNotificationConfig;
 import org.killbill.billing.server.filters.ResponseCorsFilter;
 import org.killbill.billing.server.notifications.PushNotificationListener;
+import org.killbill.billing.server.notifications.PushNotificationRetryService;
 import org.killbill.billing.subscription.glue.DefaultSubscriptionModule;
 import org.killbill.billing.tenant.glue.DefaultTenantModule;
 import org.killbill.billing.usage.glue.UsageModule;
+import org.killbill.billing.util.config.definition.NotificationConfig;
 import org.killbill.billing.util.dao.AuditLogModelDaoMapper;
 import org.killbill.billing.util.dao.RecordIdIdMappingsMapper;
 import org.killbill.billing.util.email.EmailModule;
@@ -76,9 +79,9 @@ import org.killbill.billing.util.glue.ConfigModule;
 import org.killbill.billing.util.glue.CustomFieldModule;
 import org.killbill.billing.util.glue.ExportModule;
 import org.killbill.billing.util.glue.GlobalLockerModule;
-import org.killbill.billing.util.glue.NodesModule;
 import org.killbill.billing.util.glue.KillBillShiroAopModule;
 import org.killbill.billing.util.glue.KillbillApiAopModule;
+import org.killbill.billing.util.glue.NodesModule;
 import org.killbill.billing.util.glue.NonEntityDaoModule;
 import org.killbill.billing.util.glue.RecordIdModule;
 import org.killbill.billing.util.glue.SecurityModule;
@@ -88,15 +91,18 @@ import org.killbill.clock.Clock;
 import org.killbill.clock.ClockMock;
 import org.killbill.commons.embeddeddb.EmbeddedDB;
 import org.killbill.commons.jdbi.mapper.LowerToCamelBeanMapperFactory;
+import org.skife.config.ConfigurationObjectFactory;
 import org.skife.jdbi.v2.ResultSetMapperFactory;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import ch.qos.logback.classic.helpers.MDCInsertingServletFilter;
 import com.google.inject.multibindings.Multibinder;
+import com.google.inject.name.Names;
 
 public class KillbillServerModule extends KillbillPlatformModule {
 
     public static final String SHIRO_DATA_SOURCE_ID = "shiro";
+    public static final String STATIC_CONFIG = "StaticConfig";
 
     public KillbillServerModule(final ServletContext servletContext, final KillbillServerConfig serverConfig, final KillbillConfigSource configSource) {
         super(servletContext, serverConfig, configSource);
@@ -207,7 +213,12 @@ public class KillbillServerModule extends KillbillPlatformModule {
     }
 
     protected void configurePushNotification() {
-        bind(ServerService.class).to(DefaultServerService.class).asEagerSingleton();
+        final ConfigurationObjectFactory factory = new ConfigurationObjectFactory(skifeConfigSource);
+        final NotificationConfig notificationConfig = factory.build(NotificationConfig.class);
+        bind(NotificationConfig.class).annotatedWith(Names.named(STATIC_CONFIG)).toInstance(notificationConfig);
+        bind(NotificationConfig.class).to(MultiTenantNotificationConfig.class).asEagerSingleton();
         bind(PushNotificationListener.class).asEagerSingleton();
+        bind(PushNotificationRetryService.class).asEagerSingleton();
+        bind(ServerService.class).to(DefaultServerService.class).asEagerSingleton();
     }
 }
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationKey.java b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationKey.java
new file mode 100644
index 0000000..e72dec3
--- /dev/null
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationKey.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.server.notifications;
+
+import java.util.UUID;
+
+import org.killbill.notificationq.api.NotificationEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class PushNotificationKey implements NotificationEvent {
+
+    private final UUID tenantId;
+    private final UUID accountId;
+    private final String eventType;
+    private final String objectType;
+    private final UUID objectId;
+    private final int attemptNumber;
+    private final String url;
+
+    @JsonCreator
+    public PushNotificationKey(@JsonProperty("tenantId") final UUID tenantId,
+                               @JsonProperty("accountId") final UUID accountId,
+                               @JsonProperty("eventType") final String eventType,
+                               @JsonProperty("objectType") final String objectType,
+                               @JsonProperty("objectId") final UUID objectId,
+                               @JsonProperty("attemptNumber")  final int attemptNumber,
+                               @JsonProperty("url") final String url) {
+        this.tenantId = tenantId;
+        this.accountId = accountId;
+        this.eventType = eventType;
+        this.objectType = objectType;
+        this.objectId = objectId;
+        this.attemptNumber = attemptNumber;
+        this.url = url;
+    }
+
+    public PushNotificationKey(final PushNotificationKey key, final int attemptNumber) {
+        this(key.getTenantId(), key.getAccountId(), key.getEventType(), key.getObjectType(), key.getObjectId(),
+             attemptNumber, key.getUrl());
+    }
+
+    public UUID getTenantId() {
+        return tenantId;
+    }
+
+    public UUID getAccountId() {
+        return accountId;
+    }
+
+    public String getEventType() {
+        return eventType;
+    }
+
+    public String getObjectType() {
+        return objectType;
+    }
+
+    public UUID getObjectId() {
+        return objectId;
+    }
+
+    public Integer getAttemptNumber() {
+        return attemptNumber;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    @Override
+    public String toString() {
+        return "PushNotificationKey{" +
+               "tenantId=" + tenantId +
+               ", accountId=" + accountId +
+               ", eventType='" + eventType + '\'' +
+               ", objectType='" + objectType + '\'' +
+               ", objectId=" + objectId +
+               ", attemptNumber=" + attemptNumber +
+               ", url='" + url + '\'' +
+               '}';
+    }
+}
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
index 7e3a993..afa18cf 100644
--- a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
@@ -22,16 +22,28 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.inject.Inject;
 
+import org.joda.time.DateTime;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.jaxrs.json.NotificationJson;
 import org.killbill.billing.notification.plugin.api.ExtBusEvent;
+import org.killbill.billing.server.DefaultServerService;
 import org.killbill.billing.tenant.api.TenantApiException;
 import org.killbill.billing.tenant.api.TenantKV.TenantKey;
 import org.killbill.billing.tenant.api.TenantUserApi;
 import org.killbill.billing.util.callcontext.CallContextFactory;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
+import org.killbill.billing.util.config.definition.NotificationConfig;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.skife.config.TimeSpan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,8 +54,10 @@ import com.ning.http.client.AsyncHttpClientConfig;
 import com.ning.http.client.ListenableFuture;
 import com.ning.http.client.Response;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 
@@ -62,13 +76,23 @@ public class PushNotificationListener {
     private final CallContextFactory contextFactory;
     private final AsyncHttpClient httpClient;
     private final ObjectMapper mapper;
+    private final NotificationQueueService notificationQueueService;
+    private final InternalCallContextFactory internalCallContextFactory;
+    private final Clock clock;
+    private final NotificationConfig notificationConfig;
 
     @Inject
-    public PushNotificationListener(final ObjectMapper mapper, final TenantUserApi tenantApi, final CallContextFactory contextFactory) {
+    public PushNotificationListener(final ObjectMapper mapper, final TenantUserApi tenantApi, final CallContextFactory contextFactory,
+                                    final NotificationQueueService notificationQueueService, final InternalCallContextFactory internalCallContextFactory,
+                                    final Clock clock, final NotificationConfig notificationConfig) {
         this.httpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder().setRequestTimeout(TIMEOUT_NOTIFICATION * 1000).build());
         this.tenantApi = tenantApi;
         this.contextFactory = contextFactory;
         this.mapper = mapper;
+        this.notificationQueueService = notificationQueueService;
+        this.internalCallContextFactory = internalCallContextFactory;
+        this.clock = clock;
+        this.notificationConfig = notificationConfig;
     }
 
     @AllowConcurrentEvents
@@ -93,11 +117,12 @@ public class PushNotificationListener {
         final NotificationJson notification = new NotificationJson(event);
         final String body = mapper.writeValueAsString(notification);
         for (final String cur : callbacks) {
-            doPost(tenantId, cur, body, TIMEOUT_NOTIFICATION);
+            doPost(tenantId, cur, body, notification, TIMEOUT_NOTIFICATION, 0);
         }
     }
 
-    private boolean doPost(final UUID tenantId, final String url, final String body, final int timeoutSec) {
+    private boolean doPost(final UUID tenantId, final String url, final String body, final NotificationJson notification,
+                           final int timeoutSec, final int attemptRetryNumber) {
         final BoundRequestBuilder builder = httpClient.preparePost(url);
         builder.setBody(body == null ? "{}" : body);
         builder.addHeader(HTTP_HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON);
@@ -112,11 +137,69 @@ public class PushNotificationListener {
                         }
                     });
             response = futureStatus.get(timeoutSec, TimeUnit.SECONDS);
+        } catch (final TimeoutException toe) {
+            saveRetryPushNotificationInQueue(tenantId, url, notification, attemptRetryNumber);
+            return false;
         } catch (final Exception e) {
             log.warn("Failed to push notification url='{}', tenantId='{}'", url, tenantId, e);
             return false;
         }
-        return response.getStatusCode() >= 200 && response.getStatusCode() < 300;
+
+        if (response.getStatusCode() >= 200 && response.getStatusCode() < 300) {
+            return true;
+        } else {
+            saveRetryPushNotificationInQueue(tenantId, url, notification, attemptRetryNumber);
+            return false;
+        }
+    }
+
+    public void resendPushNotification(final PushNotificationKey key) throws JsonProcessingException {
+
+        final NotificationJson notification = new NotificationJson(key.getEventType() != null ? key.getEventType().toString() : null,
+                                                                   key.getAccountId() != null ? key.getAccountId().toString() : null,
+                                                                   key.getObjectType() != null ? key.getObjectType().toString() : null,
+                                                                   key.getObjectId() != null ? key.getObjectId().toString() : null);
+        final String body = mapper.writeValueAsString(notification);
+        doPost(key.getTenantId(), key.getUrl(), body, notification, TIMEOUT_NOTIFICATION, key.getAttemptNumber());
+    }
+
+    private void saveRetryPushNotificationInQueue(final UUID tenantId, final String url, final NotificationJson notificationJson, final int attemptRetryNumber) {
+        final PushNotificationKey key = new PushNotificationKey(tenantId,
+                                                                notificationJson.getAccountId() != null ? UUID.fromString(notificationJson.getAccountId()) : null,
+                                                                notificationJson.getEventType(),
+                                                                notificationJson.getObjectType(),
+                                                                notificationJson.getObjectId() != null ? UUID.fromString(notificationJson.getObjectId()) : null,
+                                                                attemptRetryNumber + 1, url);
+
+        final TenantContext tenantContext = contextFactory.createTenantContext(tenantId);
+        final DateTime nextNotificationTime = getNextNotificationTime(key.getAttemptNumber(), internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(tenantContext));
+
+        if (nextNotificationTime == null) {
+            log.warn("Max attempt number reached for push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId());
+            return;
+        }
+        log.debug("Push notification is scheduled to send at {} for url='{}', tenantId='{}'", nextNotificationTime, key.getUrl(), key.getTenantId());
+
+        final Long accountRecordId = internalCallContextFactory.getRecordIdFromObject(key.getAccountId(), ObjectType.ACCOUNT, tenantContext);
+        final Long tenantRecordId = internalCallContextFactory.getRecordIdFromObject(key.getTenantId(), ObjectType.TENANT, tenantContext);
+        try {
+            final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultServerService.SERVER_SERVICE, PushNotificationRetryService.QUEUE_NAME);
+            notificationQueue.recordFutureNotification(nextNotificationTime, key, null, MoreObjects.firstNonNull(accountRecordId, new Long(0)), tenantRecordId);
+        } catch (NoSuchNotificationQueue noSuchNotificationQueue) {
+            log.error("Failed to push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId(), noSuchNotificationQueue);
+        } catch (IOException e) {
+            log.error("Failed to push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId(), e);
+        }
+    }
+
+    private DateTime getNextNotificationTime(final int attemptNumber, final InternalTenantContext tenantContext) {
+
+        final List<TimeSpan> retries = notificationConfig.getPushNotificationsRetries(tenantContext);
+        if (attemptNumber > retries.size()) {
+            return null;
+        }
+        final TimeSpan nextDelay = retries.get(attemptNumber - 1);
+        return clock.getUTCNow().plusMillis((int) nextDelay.getMillis());
     }
 
     private List<String> getCallbacksForTenant(final TenantContext context) throws TenantApiException {
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationRetryService.java b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationRetryService.java
new file mode 100644
index 0000000..5284a21
--- /dev/null
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationRetryService.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.server.notifications;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.server.DefaultServerService;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.inject.Inject;
+
+public class PushNotificationRetryService {
+
+    private static final Logger log = LoggerFactory.getLogger(PushNotificationRetryService.class);
+    public static final String QUEUE_NAME = "push-notification-queue";
+    private static final String retryService = DefaultServerService.SERVER_SERVICE + "-" + QUEUE_NAME;
+
+    private final NotificationQueueService notificationQueueService;
+    private final InternalCallContextFactory internalCallContextFactory;
+    private final PushNotificationListener pushNotificationListener;
+
+    private NotificationQueue retryQueue;
+
+    @Inject
+    public PushNotificationRetryService(final NotificationQueueService notificationQueueService,
+                                        final InternalCallContextFactory internalCallContextFactory,
+                                        final PushNotificationListener pushNotificationListener) {
+        this.notificationQueueService = notificationQueueService;
+        this.internalCallContextFactory = internalCallContextFactory;
+        this.pushNotificationListener = pushNotificationListener;
+    }
+
+    public void initialize() throws NotificationQueueAlreadyExists {
+        retryQueue = notificationQueueService.createNotificationQueue(DefaultServerService.SERVER_SERVICE,
+                                                                      QUEUE_NAME,
+                                                                      new NotificationQueueHandler() {
+                                                                          @Override
+                                                                          public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                              if (!(notificationKey instanceof PushNotificationKey)) {
+                                                                                  log.error("Push Notification service got an unexpected notification type {}", notificationKey.getClass().getName());
+                                                                                  return;
+                                                                              }
+                                                                              final PushNotificationKey key = (PushNotificationKey) notificationKey;
+                                                                              try {
+                                                                                  pushNotificationListener.resendPushNotification(key);
+                                                                              } catch (JsonProcessingException e) {
+                                                                                  log.error("Failed to push notification url='{}', tenantId='{}'", key.getUrl(), key.getTenantId(), e);
+                                                                              }
+                                                                          }
+                                                                      }
+                                                                     );
+    }
+
+    public void start() {
+        retryQueue.startQueue();
+    }
+
+    public void stop() throws NoSuchNotificationQueue {
+        if (retryQueue != null) {
+            retryQueue.stopQueue();
+            notificationQueueService.deleteNotificationQueue(retryQueue.getServiceName(), retryQueue.getQueueName());
+        }
+    }
+
+}
diff --git a/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPushNotification.java b/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPushNotification.java
index 390e4ed..1c6b542 100644
--- a/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPushNotification.java
+++ b/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPushNotification.java
@@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.killbill.billing.client.KillBillClientException;
 import org.killbill.billing.client.model.TenantKey;
 import org.killbill.billing.jaxrs.json.NotificationJson;
 import org.killbill.billing.server.notifications.PushNotificationListener;
@@ -54,15 +55,18 @@ public class TestPushNotification extends TestJaxrsBase {
 
     private volatile boolean callbackCompleted;
     private volatile boolean callbackCompletedWithError;
+    private volatile int expectedNbCalls = 1;
+    private volatile boolean forceToFail = false;
+    private volatile int failedResponseStatus = HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
 
     @Override
     @BeforeMethod(groups = "slow")
     public void beforeMethod() throws Exception {
         super.beforeMethod();
         callbackServer = new CallbackServer(this, SERVER_PORT, CALLBACK_ENDPOINT);
-        callbackCompleted = false;
-        callbackCompletedWithError = false;
+        resetCallbackStatusProperties();
         callbackServer.startServer();
+        this.expectedNbCalls = 1;
     }
 
     @AfterMethod(groups = "slow")
@@ -85,7 +89,7 @@ public class TestPushNotification extends TestJaxrsBase {
     public void retrieveAccountWithAsserts(final String accountId) {
         try {
             // Just check we can retrieve the account with the id from the callback
-            killBillClient.getAccount(UUID.fromString(accountId));
+            killBillClient.getAccount(UUID.fromString(accountId), requestOptions);
         } catch (final Exception e) {
             Assert.fail(e.getMessage());
         }
@@ -93,12 +97,12 @@ public class TestPushNotification extends TestJaxrsBase {
 
     @Test(groups = "slow")
     public void testPushNotification() throws Exception {
-        // Register tenant for callback
-        final String callback = "http://127.0.0.1:" + SERVER_PORT + CALLBACK_ENDPOINT;
-        final TenantKey result0 = killBillClient.registerCallbackNotificationForTenant(callback, createdBy, reason, comment);
-        Assert.assertEquals(result0.getKey(), TenantKV.TenantKey.PUSH_NOTIFICATION_CB.toString());
-        Assert.assertEquals(result0.getValues().size(), 1);
-        Assert.assertEquals(result0.getValues().get(0), callback);
+        final String callback = registerTenantForCallback();
+
+        // set expected number of calls
+        // 1st: was "eventType":"TENANT_CONFIG_CHANGE"
+        // 2nd: is "eventType":"ACCOUNT_CREATION"
+        this.expectedNbCalls = 2;
 
         // Create account to trigger a push notification
         createAccount();
@@ -112,17 +116,161 @@ public class TestPushNotification extends TestJaxrsBase {
             Assert.fail("Assertion during callback failed...");
         }
 
-        final TenantKey result = killBillClient.getCallbackNotificationForTenant();
+        unregisterTenantForCallback(callback);
+    }
+
+    private void unregisterTenantForCallback(final String callback) throws KillBillClientException {
+        final TenantKey result = killBillClient.getCallbackNotificationForTenant(requestOptions);
         Assert.assertEquals(result.getKey(), TenantKV.TenantKey.PUSH_NOTIFICATION_CB.toString());
         Assert.assertEquals(result.getValues().size(), 1);
         Assert.assertEquals(result.getValues().get(0), callback);
 
-        killBillClient.unregisterCallbackNotificationForTenant(createdBy, reason, comment);
-        final TenantKey result2 = killBillClient.getCallbackNotificationForTenant();
+        killBillClient.unregisterCallbackNotificationForTenant(requestOptions);
+        final TenantKey result2 = killBillClient.getCallbackNotificationForTenant(requestOptions);
         Assert.assertEquals(result2.getKey(), TenantKV.TenantKey.PUSH_NOTIFICATION_CB.toString());
         Assert.assertEquals(result2.getValues().size(), 0);
     }
 
+    private String registerTenantForCallback() throws KillBillClientException, InterruptedException {// Register tenant for callback
+        final String callback = "http://127.0.0.1:" + SERVER_PORT + CALLBACK_ENDPOINT;
+        final TenantKey result0 = killBillClient.registerCallbackNotificationForTenant(callback, requestOptions);
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError); // expected true because is not an ACCOUNT_CREATION event
+
+        Assert.assertEquals(result0.getKey(), TenantKV.TenantKey.PUSH_NOTIFICATION_CB.toString());
+        Assert.assertEquals(result0.getValues().size(), 1);
+        Assert.assertEquals(result0.getValues().get(0), callback);
+
+        // reset values
+        resetCallbackStatusProperties();
+        return callback;
+    }
+
+    @Test(groups = "slow")
+    public void testPushNotificationRetries() throws Exception {
+        final String callback = registerTenantForCallback();
+
+        // force server to fail
+        // Notifications retries are set to:
+        // org.killbill.billing.server.notifications.retries=15m,1h,1d,2d
+        this.forceToFail = true;
+
+        // set expected number of calls
+        // 1st: was "eventType":"TENANT_CONFIG_CHANGE"
+        // 2nd: is original "eventType":"ACCOUNT_CREATION" call [force error]
+        // 3rd: is 1st notification retry (+ 15m) [force error]
+        // 4th: is 1st notification retry (+ 1h) [force error]
+        // 5th: is 1st notification retry (+ 1d) [success]
+        this.expectedNbCalls = 5;
+
+        // Create account to trigger a push notification
+        createAccount();
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError);
+
+        resetCallbackStatusProperties();
+
+        // move clock 15 minutes and get 1st retry
+        clock.addDeltaFromReality(900000);
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError);
+
+        resetCallbackStatusProperties();
+
+        // move clock an hour and get 2nd retry
+        clock.addDeltaFromReality(3600000);
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError);
+
+        resetCallbackStatusProperties();
+
+        // make call success
+        this.forceToFail = false;
+
+        // move clock a day, get 3rd retry and wait for a success push notification
+        clock.addDays(1);
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertFalse(callbackCompletedWithError);
+
+        unregisterTenantForCallback(callback);
+    }
+
+    @Test(groups = "slow")
+    public void testPushNotificationRetriesMaxAttemptNumber() throws Exception {
+        final String callback = registerTenantForCallback();
+
+        // force server to fail
+        // Notifications retries are set to:
+        // org.killbill.billing.server.notifications.retries=15m,1h,1d,2d
+        this.forceToFail = true;
+
+        // set expected number of calls
+        // 1st: was "eventType":"TENANT_CONFIG_CHANGE"
+        // 2nd: is original "eventType":"ACCOUNT_CREATION" call [force error]
+        // 3rd: is 1st notification retry (+ 15m) [force error]
+        // 4th: is 2nd notification retry (+ 1h) [force error]
+        // 5th: is 3rd notification retry (+ 1d) [force error]
+        // 6th: is 4th notification retry (+ 2d) [force error]
+        this.expectedNbCalls = 6;
+
+        // Create account to trigger a push notification
+        createAccount();
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError);
+
+        resetCallbackStatusProperties();
+
+        // move clock 15 minutes and get 1st retry
+        clock.addDeltaFromReality(900000);
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError);
+
+        resetCallbackStatusProperties();
+
+        // move clock an hour and get 2nd retry
+        clock.addDeltaFromReality(3600000);
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError);
+
+        resetCallbackStatusProperties();
+
+        // move clock a day and get 3rd retry
+        clock.addDays(1);
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError);
+
+        resetCallbackStatusProperties();
+
+        // move clock a day and get 4rd retry
+        clock.addDays(2);
+
+        Assert.assertTrue(waitForCallbacksToComplete());
+        Assert.assertTrue(callbackCompletedWithError);
+        resetCallbackStatusProperties();
+
+        clock.addDays(4);
+
+        Assert.assertFalse(waitForCallbacksToComplete());
+        Assert.assertFalse(callbackCompletedWithError);
+
+        unregisterTenantForCallback(callback);
+    }
+
+    private void resetCallbackStatusProperties() {
+        // reset values
+        this.callbackCompleted = false;
+        this.callbackCompletedWithError = false;
+    }
+
     public void setCompleted(final boolean withError) {
         callbackCompleted = true;
         callbackCompletedWithError = withError;
@@ -144,7 +292,7 @@ public class TestPushNotification extends TestJaxrsBase {
             final ServletContextHandler context = new ServletContextHandler();
             context.setContextPath("/");
             server.setHandler(context);
-            context.addServlet(new ServletHolder(new CallmebackServlet(test, 1)), callbackEndpoint);
+            context.addServlet(new ServletHolder(new CallmebackServlet(test)), callbackEndpoint);
             server.start();
         }
 
@@ -159,15 +307,13 @@ public class TestPushNotification extends TestJaxrsBase {
 
         private static final Logger log = LoggerFactory.getLogger(CallmebackServlet.class);
 
-        private final int expectedNbCalls;
         private final AtomicInteger receivedCalls;
         private final TestPushNotification test;
         private final ObjectMapper objectMapper = new ObjectMapper();
 
         private boolean withError;
 
-        public CallmebackServlet(final TestPushNotification test, final int expectedNbCalls) {
-            this.expectedNbCalls = expectedNbCalls;
+        public CallmebackServlet(final TestPushNotification test) {
             this.test = test;
             this.receivedCalls = new AtomicInteger(0);
             this.withError = false;
@@ -176,8 +322,18 @@ public class TestPushNotification extends TestJaxrsBase {
         @Override
         protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
             final int current = receivedCalls.incrementAndGet();
-
             final String body = CharStreams.toString(new InputStreamReader(request.getInputStream(), "UTF-8"));
+            withError = false;
+
+            log.info("CallmebackServlet received {} calls , current = {} at {}", current, body, getClock().getUTCNow());
+
+            if (test.forceToFail) {
+                response.setStatus(test.failedResponseStatus);
+                log.info("CallmebackServlet is force to fail for testing purposes");
+                test.setCompleted(true);
+                return;
+            }
+
             response.setStatus(HttpServletResponse.SC_OK);
 
             log.info("Got body {}", body);
@@ -197,15 +353,15 @@ public class TestPushNotification extends TestJaxrsBase {
                 withError = true;
             }
 
-            log.info("CallmebackServlet received {} calls , current = {}", current, body);
             stopServerWhenComplete(current, withError);
         }
 
         private void stopServerWhenComplete(final int current, final boolean withError) {
-            if (current == expectedNbCalls) {
+            if (current == test.expectedNbCalls) {
                 log.info("Excellent, we are done!");
                 test.setCompleted(withError);
             }
         }
+
     }
 }
diff --git a/profiles/killbill/src/test/resources/killbill.properties b/profiles/killbill/src/test/resources/killbill.properties
index c6def31..776243b 100644
--- a/profiles/killbill/src/test/resources/killbill.properties
+++ b/profiles/killbill/src/test/resources/killbill.properties
@@ -33,3 +33,5 @@ org.killbill.security.shiroNbHashIterations=10
 
 org.killbill.tenant.broadcast.rate=1s
 
+# exponential delay retries for push notifications
+org.killbill.billing.server.notifications.retries=15m,1h,1d,2d
\ No newline at end of file
diff --git a/util/src/main/java/org/killbill/billing/util/config/definition/NotificationConfig.java b/util/src/main/java/org/killbill/billing/util/config/definition/NotificationConfig.java
new file mode 100644
index 0000000..b85670e
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/config/definition/NotificationConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.config.definition;
+
+import java.util.List;
+
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.skife.config.Config;
+import org.skife.config.Default;
+import org.skife.config.Description;
+import org.skife.config.Param;
+import org.skife.config.TimeSpan;
+
+public interface NotificationConfig extends KillbillConfig {
+
+    @Config("org.killbill.billing.server.notifications.retries")
+    @Default("15m,30m,2h,12h,1d")
+    @Description("Delay before which unresolved push notifications should be retried")
+    List<TimeSpan> getPushNotificationsRetries();
+
+    @Config("org.killbill.billing.server.notifications.retries")
+    @Default("15m,30m,2h,12h,1d")
+    @Description("Delay before which unresolved push notifications should be retried")
+    List<TimeSpan> getPushNotificationsRetries(@Param("dummy") final InternalTenantContext tenantContext);
+
+}
\ No newline at end of file