killbill-aplcache

util: add tests to verify retry behavior for bus events This

8/3/2017 2:14:47 PM

Details

diff --git a/util/src/main/java/org/killbill/billing/util/listener/RetryableService.java b/util/src/main/java/org/killbill/billing/util/listener/RetryableService.java
index 65e2626..307696c 100644
--- a/util/src/main/java/org/killbill/billing/util/listener/RetryableService.java
+++ b/util/src/main/java/org/killbill/billing/util/listener/RetryableService.java
@@ -146,11 +146,8 @@ public abstract class RetryableService {
 
     private DateTime computeRetryDate(final RetryException retryException, final DateTime initialEventDateTime, final int retryNb) {
         final List<Period> retrySchedule = retryException.getRetrySchedule();
-        if (retrySchedule == null) {
+        if (retrySchedule == null || retryNb > retrySchedule.size()) {
             return null;
-        }
-        if (retryNb > retrySchedule.size()) {
-            return initialEventDateTime.plusDays(retryNb - retrySchedule.size());
         } else {
             final Period nextDelay = retrySchedule.get(retryNb - 1);
             return initialEventDateTime.plus(nextDelay);
diff --git a/util/src/main/java/org/killbill/billing/util/listener/RetryableSubscriber.java b/util/src/main/java/org/killbill/billing/util/listener/RetryableSubscriber.java
index cf0df1a..2166acc 100644
--- a/util/src/main/java/org/killbill/billing/util/listener/RetryableSubscriber.java
+++ b/util/src/main/java/org/killbill/billing/util/listener/RetryableSubscriber.java
@@ -77,7 +77,7 @@ public class RetryableSubscriber extends RetryableHandler {
 
         public SubscriberQueueHandler() {}
 
-        public void subscribe(final Class<? extends BusEvent> busEventClass, final SubscriberAction<? extends BusEvent> action) {
+        public <B extends BusEvent> void subscribe(final Class<B> busEventClass, final SubscriberAction<B> action) {
             actions.put(busEventClass, action);
         }
 
diff --git a/util/src/main/java/org/killbill/billing/util/listener/RetryException.java b/util/src/main/java/org/killbill/billing/util/listener/RetryException.java
index 266f7c2..3cf4c81 100644
--- a/util/src/main/java/org/killbill/billing/util/listener/RetryException.java
+++ b/util/src/main/java/org/killbill/billing/util/listener/RetryException.java
@@ -42,6 +42,10 @@ public class RetryException extends RuntimeException {
         this(e, null);
     }
 
+    public RetryException(@Nullable final List<Period> retrySchedule) {
+        this(null, retrySchedule);
+    }
+
     public RetryException(final Exception e, @Nullable final List<Period> retrySchedule) {
         super(e);
         this.retrySchedule = retrySchedule != null ? retrySchedule : DEFAULT_RETRY_SCHEDULE;
diff --git a/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
new file mode 100644
index 0000000..c09312f
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.account.api.ImmutableAccountData;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.BusInternalEvent;
+import org.killbill.billing.events.ControlTagCreationInternalEvent;
+import org.killbill.billing.events.ControlTagDeletionInternalEvent;
+import org.killbill.billing.util.UtilTestSuiteWithEmbeddedDB;
+import org.killbill.billing.util.listener.RetryableSubscriber.SubscriberAction;
+import org.killbill.billing.util.listener.RetryableSubscriber.SubscriberQueueHandler;
+import org.killbill.billing.util.tag.DefaultTagDefinition;
+import org.killbill.billing.util.tag.api.user.DefaultControlTagCreationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestRetryableService extends UtilTestSuiteWithEmbeddedDB {
+
+    private static final String TEST_LISTENER = "TestListener";
+    private static final ImmutableList<Period> RETRY_SCHEDULE = ImmutableList.<Period>of(Period.hours(1), Period.days(1));
+
+    private ControlTagCreationInternalEvent event;
+    private TestListener testListener;
+
+    @BeforeClass(groups = "slow")
+    public void setUpClass() throws Exception {
+        final ImmutableAccountData immutableAccountData = Mockito.mock(ImmutableAccountData.class);
+        Mockito.when(immutableAccountInternalApi.getImmutableAccountDataByRecordId(Mockito.<Long>eq(internalCallContext.getAccountRecordId()), Mockito.<InternalTenantContext>any())).thenReturn(immutableAccountData);
+    }
+
+    @BeforeMethod(groups = "slow")
+    public void setUp() throws Exception {
+        event = new DefaultControlTagCreationEvent(UUID.randomUUID(),
+                                                   UUID.randomUUID(),
+                                                   ObjectType.ACCOUNT,
+                                                   new DefaultTagDefinition("name", "description", false),
+                                                   internalCallContext.getAccountRecordId(),
+                                                   internalCallContext.getTenantRecordId(),
+                                                   UUID.randomUUID());
+
+        testListener = new TestListener();
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @AfterMethod(groups = "slow")
+    public void tearDown() throws Exception {
+        testListener.stop();
+    }
+
+    @Test(groups = "slow")
+    public void testFixUp() throws Exception {
+        testListener.throwRetryableException = true;
+
+        final DateTime startTime = clock.getUTCNow();
+        testListener.handleEvent(event);
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+        testListener.throwRetryableException = false;
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 1);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @Test(groups = "slow")
+    public void testGiveUp() throws Exception {
+        testListener.throwRetryableException = true;
+
+        final DateTime startTime = clock.getUTCNow();
+        testListener.handleEvent(event);
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        // Give up
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @Test(groups = "slow")
+    public void testNotRetryableException() throws Exception {
+        testListener.throwOtherException = true;
+
+        try {
+            testListener.handleEvent(event);
+            Assert.fail("Expected exception");
+        } catch (final IllegalArgumentException e) {
+            Assert.assertTrue(true);
+        }
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    private List<NotificationEventWithMetadata> getFutureRetryableEvents() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = queueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, TEST_LISTENER);
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    private final class TestListener extends RetryableService {
+
+        private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
+        private final Collection<BusInternalEvent> eventsSeen = new LinkedList<BusInternalEvent>();
+
+        private final RetryableSubscriber retryableSubscriber;
+
+        private boolean throwRetryableException = false;
+        private boolean throwOtherException = false;
+
+        public TestListener() {
+            super(queueService, internalCallContextFactory);
+
+            subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class,
+                                             new SubscriberAction<ControlTagDeletionInternalEvent>() {
+                                                 @Override
+                                                 public void run(final ControlTagDeletionInternalEvent event) {
+                                                     Assert.fail("No handler registered");
+                                                 }
+                                             });
+            subscriberQueueHandler.subscribe(ControlTagCreationInternalEvent.class,
+                                             new SubscriberAction<ControlTagCreationInternalEvent>() {
+                                                 @Override
+                                                 public void run(final ControlTagCreationInternalEvent event) {
+                                                     if (throwRetryableException) {
+                                                         throw new RetryException(RETRY_SCHEDULE);
+                                                     } else if (throwOtherException) {
+                                                         throw new IllegalArgumentException("EXPECTED");
+                                                     } else {
+                                                         eventsSeen.add(event);
+                                                     }
+                                                 }
+                                             });
+            this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler, internalCallContextFactory);
+
+            initialize(TEST_LISTENER, subscriberQueueHandler);
+            start();
+        }
+
+        public void handleEvent(final ControlTagCreationInternalEvent event) {
+            retryableSubscriber.handleEvent(event);
+        }
+    }
+}