killbill-aplcache

Major rework around entitlement unit tests Refactored BusEvent

5/7/2012 12:17:16 AM

Changes

Details

diff --git a/api/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java b/api/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
new file mode 100644
index 0000000..8db4ec7
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
@@ -0,0 +1,34 @@
+/* 
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.util.queue;
+
+public interface QueueLifecycle {
+    /**
+     * Starts the queue
+     */
+    public void startQueue();
+
+    /**
+     * Stop the queue
+     *
+     */
+    public void stopQueue();
+    
+    /**
+     *  Processes event from queue
+     */
+    public int doProcessEvents();
+}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/migration/TestMigration.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/migration/TestMigration.java
index 7380ec9..882b2cb 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/migration/TestMigration.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/migration/TestMigration.java
@@ -29,13 +29,13 @@ import java.util.UUID;
 import org.joda.time.DateTime;
 import org.testng.Assert;
 
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi.EntitlementAccountMigration;
 import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi.EntitlementBundleMigration;
@@ -55,9 +55,9 @@ public abstract class TestMigration extends TestApiBase {
             EntitlementAccountMigration toBeMigrated = createAccountWithRegularBasePlan(startDate);
             DateTime afterMigration = clock.getUTCNow();
 
-            testListener.pushNextApiExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
+            testListener.pushExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
             migrationApi.migrate(toBeMigrated, context);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             List<SubscriptionBundle> bundles = entitlementApi.getBundlesForAccount(toBeMigrated.getAccountKey());
             assertEquals(bundles.size(), 1);
@@ -73,6 +73,8 @@ public abstract class TestMigration extends TestApiBase {
             assertEquals(subscription.getState(), SubscriptionState.ACTIVE);
             assertEquals(subscription.getCurrentPlan().getName(), "assault-rifle-annual");
             assertEquals(subscription.getChargedThroughDate(), startDate.plusYears(1));
+            
+            assertListenerStatus();
         } catch (EntitlementMigrationApiException e) {
             Assert.fail("", e);
         }
@@ -86,10 +88,10 @@ public abstract class TestMigration extends TestApiBase {
             EntitlementAccountMigration toBeMigrated = createAccountWithRegularBasePlanAndAddons(initalBPStart, initalAddonStart);
             DateTime afterMigration = clock.getUTCNow();
 
-            testListener.pushNextApiExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
-            testListener.pushNextApiExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
+            testListener.pushExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
+            testListener.pushExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
             migrationApi.migrate(toBeMigrated, context);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             List<SubscriptionBundle> bundles = entitlementApi.getBundlesForAccount(toBeMigrated.getAccountKey());
             assertEquals(bundles.size(), 1);
@@ -120,6 +122,7 @@ public abstract class TestMigration extends TestApiBase {
             assertEquals(aoSubscription.getCurrentPlan().getName(), "telescopic-scope-monthly");
             assertEquals(aoSubscription.getChargedThroughDate(), initalAddonStart.plusMonths(1));
 
+            assertListenerStatus();
         } catch (EntitlementMigrationApiException e) {
             Assert.fail("", e);
         }
@@ -134,9 +137,9 @@ public abstract class TestMigration extends TestApiBase {
             EntitlementAccountMigration toBeMigrated = createAccountWithRegularBasePlanFutreCancelled(startDate);
             DateTime afterMigration = clock.getUTCNow();
 
-            testListener.pushNextApiExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
+            testListener.pushExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
             migrationApi.migrate(toBeMigrated, context);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             List<SubscriptionBundle> bundles = entitlementApi.getBundlesForAccount(toBeMigrated.getAccountKey());
             assertEquals(bundles.size(), 1);
@@ -153,11 +156,11 @@ public abstract class TestMigration extends TestApiBase {
             assertEquals(subscription.getCurrentPlan().getName(), "assault-rifle-annual");
             assertEquals(subscription.getChargedThroughDate(), startDate.plusYears(1));
 
-            testListener.pushNextApiExpectedEvent(NextEvent.MIGRATE_BILLING);
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.MIGRATE_BILLING);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
             Duration oneYear = getDurationYear(1);
             clock.setDeltaFromReality(oneYear, 0);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             assertDateWithin(subscription.getStartDate(), beforeMigration, afterMigration);
             assertNotNull(subscription.getEndDate());
@@ -167,6 +170,8 @@ public abstract class TestMigration extends TestApiBase {
             assertEquals(subscription.getState(), SubscriptionState.CANCELLED);
             assertNull(subscription.getCurrentPlan());
 
+            assertListenerStatus();
+            
         } catch (EntitlementMigrationApiException e) {
             Assert.fail("", e);
         }
@@ -178,9 +183,9 @@ public abstract class TestMigration extends TestApiBase {
             final DateTime trialDate = clock.getUTCNow().minusDays(10);
             EntitlementAccountMigration toBeMigrated = createAccountFuturePendingPhase(trialDate);
 
-            testListener.pushNextApiExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
+            testListener.pushExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
             migrationApi.migrate(toBeMigrated, context);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             List<SubscriptionBundle> bundles = entitlementApi.getBundlesForAccount(toBeMigrated.getAccountKey());
             assertEquals(bundles.size(), 1);
@@ -198,11 +203,11 @@ public abstract class TestMigration extends TestApiBase {
             assertEquals(subscription.getCurrentPlan().getName(), "assault-rifle-monthly");
             assertEquals(subscription.getChargedThroughDate(), trialDate.plusDays(30));
 
-            testListener.pushNextApiExpectedEvent(NextEvent.MIGRATE_BILLING);
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.MIGRATE_BILLING);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             Duration thirtyDays = getDurationDay(30);
             clock.setDeltaFromReality(thirtyDays, 0);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             assertEquals(subscription.getStartDate(), trialDate);
             assertEquals(subscription.getEndDate(), null);
@@ -212,6 +217,8 @@ public abstract class TestMigration extends TestApiBase {
             assertEquals(subscription.getCurrentPlan().getName(), "assault-rifle-monthly");
             assertEquals(subscription.getCurrentPhase().getName(), "assault-rifle-monthly-evergreen");
 
+            assertListenerStatus();
+            
         } catch (EntitlementMigrationApiException e) {
             Assert.fail("", e);
         }
@@ -224,9 +231,9 @@ public abstract class TestMigration extends TestApiBase {
             EntitlementAccountMigration toBeMigrated = createAccountFuturePendingChange();
             DateTime afterMigration = clock.getUTCNow();
 
-            testListener.pushNextApiExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
+            testListener.pushExpectedEvent(NextEvent.MIGRATE_ENTITLEMENT);
             migrationApi.migrate(toBeMigrated, context);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             List<SubscriptionBundle> bundles = entitlementApi.getBundlesForAccount(toBeMigrated.getAccountKey());
             assertEquals(bundles.size(), 1);
@@ -242,10 +249,10 @@ public abstract class TestMigration extends TestApiBase {
             assertEquals(subscription.getState(), SubscriptionState.ACTIVE);
             assertEquals(subscription.getCurrentPlan().getName(), "assault-rifle-monthly");
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             Duration oneMonth = getDurationMonth(1);
             clock.setDeltaFromReality(oneMonth, 0);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             assertDateWithin(subscription.getStartDate(), beforeMigration, afterMigration);
             assertEquals(subscription.getEndDate(), null);
@@ -255,6 +262,8 @@ public abstract class TestMigration extends TestApiBase {
             assertEquals(subscription.getState(), SubscriptionState.ACTIVE);
             assertEquals(subscription.getCurrentPlan().getName(), "shotgun-annual");
 
+            assertListenerStatus();
+            
         } catch (EntitlementMigrationApiException e) {
             Assert.fail("", e);
         }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index ef82964..42f85ea 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -34,6 +34,7 @@ import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -42,6 +43,9 @@ import org.testng.annotations.BeforeMethod;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.ning.billing.account.api.AccountData;
+import com.ning.billing.api.TestApiListener;
+import com.ning.billing.api.TestApiListener.NextEvent;
+import com.ning.billing.api.TestListenerStatus;
 import com.ning.billing.catalog.DefaultCatalogService;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Catalog;
@@ -54,7 +58,6 @@ import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.catalog.api.TimeUnit;
 import com.ning.billing.config.EntitlementConfig;
 import com.ning.billing.dbi.MysqlTestingHelper;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.billing.ChargeThruApi;
 import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi;
 import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
@@ -80,7 +83,8 @@ import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.glue.RealImplementation;
 
 
-public abstract class TestApiBase {
+public abstract class TestApiBase implements TestListenerStatus {
+    
     protected static final Logger log = LoggerFactory.getLogger(TestApiBase.class);
 
     protected static final long DAY_IN_MS = (24 * 3600 * 1000);
@@ -100,12 +104,16 @@ public abstract class TestApiBase {
 
     protected AccountData accountData;
     protected Catalog catalog;
-    protected ApiTestListener testListener;
+    protected TestApiListener testListener;
     protected SubscriptionBundle bundle;
 
     private MysqlTestingHelper helper;
     protected CallContext context = new TestCallContext("Api Test");
 
+    private boolean isListenerFailed;
+    private String listenerFailedMsg;    
+
+    
     public static void loadSystemPropertiesFromClasspath(final String resource) {
         final URL url = TestApiBase.class.getResource(resource);
         assertNotNull(url);
@@ -131,6 +139,19 @@ public abstract class TestApiBase {
         }
     }
 
+    
+    @Override
+    public void failed(final String msg) {
+        this.isListenerFailed = true;
+        this.listenerFailedMsg = msg;
+    }
+
+    @Override
+    public void resetTestListenerStatus() {
+        this.isListenerFailed = false;
+        this.listenerFailedMsg = null;
+    }
+    
     @BeforeClass(alwaysRun = true)
     public void setup() throws Exception {
 
@@ -149,64 +170,94 @@ public abstract class TestApiBase {
         dao = g.getInstance(EntitlementDao.class);
         clock = (ClockMock) g.getInstance(Clock.class);
         helper = (isSqlTest(dao)) ? g.getInstance(MysqlTestingHelper.class) : null;
+        init();
+    }
+
+    private void init() throws Exception {
+
+        setupDao();
 
         ((DefaultCatalogService) catalogService).loadCatalog();
         ((Engine) entitlementService).initialize();
-        init(g);
-        ((DefaultBusService) busService).startBus();
-    }
 
-    private static boolean isSqlTest(EntitlementDao theDao) {
-        return (! (theDao instanceof MockEntitlementDaoMemory));
+        accountData = getAccountData();
+        assertNotNull(accountData);
+        catalog = catalogService.getFullCatalog();
+        assertNotNull(catalog);
+        testListener = new TestApiListener(this);
     }
 
-    private void setupMySQL() throws IOException {
+    private void setupDao() throws IOException {
         if (helper != null) {
             final String entitlementDdl = IOUtils.toString(TestApiBase.class.getResourceAsStream("/com/ning/billing/entitlement/ddl.sql"));
             final String utilDdl = IOUtils.toString(TestApiBase.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
             helper.startMysql();
-            helper.cleanupAllTables();
             helper.initDb(entitlementDdl);
             helper.initDb(utilDdl);
         }
     }
 
-    private void init(Injector g) throws Exception {
-
-        setupMySQL();
-        accountData = getAccountData();
-        assertNotNull(accountData);
-        catalog = catalogService.getFullCatalog();
-        assertNotNull(catalog);
-        testListener = new ApiTestListener(busService.getBus());
+    private static boolean isSqlTest(EntitlementDao theDao) {
+        return (! (theDao instanceof MockEntitlementDaoMemory));
     }
-
+   
     @BeforeMethod(alwaysRun = true)
     public void setupTest() throws Exception {
 
         log.warn("RESET TEST FRAMEWORK\n\n");
 
+        // CLEANUP ALL DB TABLES OR IN MEMORY STRUCTURES
+        cleanupDao();
+        
+        // RESET LIST OF EXPECTED EVENTS
         if (testListener != null) {
             testListener.reset();
+            resetTestListenerStatus();
         }
-
+        
+        // RESET CLOCK
         clock.resetDeltaFromReality();
-        ((MockEntitlementDao) dao).reset();
 
+        // START BUS AND REGISTER LISTENER
+        busService.getBus().start();
         busService.getBus().register(testListener);
+        
+        // START NOTIFICATION QUEUE FOR ENTITLEMENT
+        ((Engine)entitlementService).start();
+        
+        // CREATE NEW BUNDLE FOR TEST
         UUID accountId = UUID.randomUUID();
         bundle = entitlementApi.createBundleForAccount(accountId, "myDefaultBundle", context);
         assertNotNull(bundle);
-
-        ((Engine)entitlementService).start();
     }
 
     @AfterMethod(alwaysRun = true)
     public void cleanupTest() throws Exception {
+        
+        // UNREGISTER TEST LISTENER AND STOP BUS
         busService.getBus().unregister(testListener);
+        busService.getBus().stop();
+        
+        // STOP NOTIFICATION QUEUE
         ((Engine)entitlementService).stop();
+
         log.warn("DONE WITH TEST\n");
     }
+    
+    protected void assertListenerStatus() {
+        if (isListenerFailed) {
+            log.error(listenerFailedMsg);
+            Assert.fail(listenerFailedMsg);
+        }
+    }
+    
+    private void cleanupDao() {
+        if (helper != null) {
+            helper.cleanupAllTables();
+        } else {
+            ((MockEntitlementDao) dao).reset();
+        }
+    }
 
     protected SubscriptionData createSubscription(final String productName, final BillingPeriod term, final String planSet, final DateTime requestedDate)
         throws EntitlementUserApiException {
@@ -219,13 +270,13 @@ public abstract class TestApiBase {
 
     protected SubscriptionData createSubscriptionWithBundle(final UUID bundleId, final String productName, final BillingPeriod term, final String planSet, final DateTime requestedDate)
         throws EntitlementUserApiException {
-        testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+        testListener.pushExpectedEvent(NextEvent.CREATE);
 
         SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundleId,
                 new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSet, null),
                 requestedDate == null ? clock.getUTCNow() : requestedDate, context);
         assertNotNull(subscription);
-        assertTrue(testListener.isApiCompleted(5000));
+        assertTrue(testListener.isCompleted(5000));
         return subscription;
     }
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
index e988805..76d7e77 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
@@ -34,6 +34,7 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
 import com.ning.billing.ErrorCode;
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PhaseType;
@@ -43,7 +44,6 @@ import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.timeline.BundleTimeline;
 import com.ning.billing.entitlement.api.timeline.EntitlementRepairException;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline;
@@ -67,6 +67,8 @@ public class TestRepairBP extends TestApiBaseRepair {
     @Test(groups={"slow"})
     public void testFetchBundleRepair() throws Exception  {
 
+        log.info("Starting testFetchBundleRepair");
+        
         String baseProduct = "Shotgun";
         BillingPeriod baseTerm = BillingPeriod.MONTHLY;
         String basePriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
@@ -125,11 +127,14 @@ public class TestRepairBP extends TestApiBaseRepair {
                 assertEquals(events.get(1).getPlanPhaseSpecifier().getBillingPeriod(), aoTerm);                    
             }
         }
+        assertListenerStatus();
     }
     
     @Test(groups={"slow"})
     public void testBPRepairWithCancellationOnstart() throws Exception {
 
+        log.info("Starting testBPRepairWithCancellationOnstart");
+        
         String baseProduct = "Shotgun";
         DateTime startDate = clock.getUTCNow();
         
@@ -190,9 +195,9 @@ public class TestRepairBP extends TestApiBaseRepair {
         
        // SECOND RE-ISSUE CALL-- NON DRY RUN
         dryRun = false;
-        testListener.expectRepairCompletion();
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bRepair, dryRun, context);
-        assertTrue(testListener.isRepairCompleted(5000));
+        assertTrue(testListener.isCompleted(5000));
         
         subscriptionRepair = realRunBundleRepair.getSubscriptions();
         assertEquals(subscriptionRepair.size(), 1);
@@ -211,10 +216,15 @@ public class TestRepairBP extends TestApiBaseRepair {
         assertEquals(realRunBaseSubscription.getStartDate(), startDate);
 
         assertEquals(realRunBaseSubscription.getState(), SubscriptionState.CANCELLED);
+        
+        assertListenerStatus();
     }
     
     @Test(groups={"slow"})
     public void testBPRepairReplaceCreateBeforeTrial() throws Exception {
+        
+        log.info("Starting testBPRepairReplaceCreateBeforeTrial");
+        
         String baseProduct = "Shotgun";
         String newBaseProduct = "Assault-Rifle";
         
@@ -229,10 +239,14 @@ public class TestRepairBP extends TestApiBaseRepair {
                     ProductCategory.BASE, PriceListSet.DEFAULT_PRICELIST_NAME, BillingPeriod.MONTHLY, restartDate.plusDays(30)));
 
         testBPRepairCreate(true, startDate, clockShift, baseProduct, newBaseProduct, expected);
+        assertListenerStatus();
     }
 
     @Test(groups={"slow"}, enabled=true)
     public void testBPRepairReplaceCreateInTrial() throws Exception {
+        
+        log.info("Starting testBPRepairReplaceCreateInTrial");
+        
         String baseProduct = "Shotgun";
         String newBaseProduct = "Assault-Rifle";
         
@@ -248,9 +262,9 @@ public class TestRepairBP extends TestApiBaseRepair {
 
         UUID baseSubscriptionId = testBPRepairCreate(true, startDate, clockShift, baseProduct, newBaseProduct, expected);
         
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
         clock.addDeltaFromReality(getDurationDay(32));
-        assertTrue(testListener.isApiCompleted(5000));
+        assertTrue(testListener.isCompleted(5000));
         
         // CHECK WHAT"S GOING ON AFTER WE MOVE CLOCK-- FUTURE MOTIFICATION SHOULD KICK IN
         SubscriptionData subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(baseSubscriptionId);
@@ -269,11 +283,16 @@ public class TestRepairBP extends TestApiBaseRepair {
         PlanPhase currentPhase = subscription.getCurrentPhase();
         assertNotNull(currentPhase);
         assertEquals(currentPhase.getPhaseType(), PhaseType.EVERGREEN);
+        
+        assertListenerStatus();
     }
 
     
     @Test(groups={"slow"})
     public void testBPRepairReplaceCreateAfterTrial() throws Exception {
+        
+        log.info("Starting testBPRepairReplaceCreateAfterTrial");
+        
         String baseProduct = "Shotgun";
         String newBaseProduct = "Assault-Rifle";
         
@@ -288,25 +307,27 @@ public class TestRepairBP extends TestApiBaseRepair {
                     ProductCategory.BASE, PriceListSet.DEFAULT_PRICELIST_NAME, BillingPeriod.MONTHLY, restartDate.plusDays(30)));
 
         testBPRepairCreate(false, startDate, clockShift, baseProduct, newBaseProduct, expected);
-        
+        assertListenerStatus();
     }
     
     
     private UUID testBPRepairCreate(boolean inTrial, DateTime startDate, int clockShift, 
             String baseProduct, String newBaseProduct, List<ExistingEvent> expectedEvents) throws Exception {
 
+        log.info("Starting testBPRepairCreate");
+        
         // CREATE BP
         Subscription baseSubscription = createSubscription(baseProduct, BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, startDate);
 
         // MOVE CLOCK
         if (clockShift > 0) {
             if (!inTrial) {
-                testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+                testListener.pushExpectedEvent(NextEvent.PHASE);
             }               
             Duration durationShift = getDurationDay(clockShift);
             clock.setDeltaFromReality(durationShift, 0);
             if (!inTrial) {
-                assertTrue(testListener.isApiCompleted(5000));
+                assertTrue(testListener.isCompleted(5000));
             }
         }
 
@@ -327,7 +348,7 @@ public class TestRepairBP extends TestApiBaseRepair {
         // FIRST ISSUE DRY RUN
         BundleTimeline bRepair =  createBundleRepair(bundle.getId(), bundleRepair.getViewId(), Collections.singletonList(sRepair));
         
-        boolean dryRun = true;
+        boolean dryRun = true;        
         BundleTimeline dryRunBundleRepair = repairApi.repairBundle(bRepair, dryRun, context);
         List<SubscriptionTimeline> subscriptionRepair = dryRunBundleRepair.getSubscriptions();
         assertEquals(subscriptionRepair.size(), 1);
@@ -362,8 +383,9 @@ public class TestRepairBP extends TestApiBaseRepair {
         
        // SECOND RE-ISSUE CALL-- NON DRY RUN
         dryRun = false;
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bRepair, dryRun, context);
-
+        assertTrue(testListener.isCompleted(5000));
         subscriptionRepair = realRunBundleRepair.getSubscriptions();
         assertEquals(subscriptionRepair.size(), 1);
         cur = subscriptionRepair.get(0);
@@ -402,6 +424,8 @@ public class TestRepairBP extends TestApiBaseRepair {
     @Test(groups={"slow"})
     public void testBPRepairAddChangeInTrial() throws Exception {
         
+        log.info("Starting testBPRepairAddChangeInTrial");
+        
         String baseProduct = "Shotgun";
         String newBaseProduct = "Assault-Rifle";
         
@@ -420,9 +444,9 @@ public class TestRepairBP extends TestApiBaseRepair {
         UUID baseSubscriptionId = testBPRepairAddChange(true, startDate, clockShift, baseProduct, newBaseProduct, expected, 3);
         
         // CHECK WHAT"S GOING ON AFTER WE MOVE CLOCK-- FUTURE MOTIFICATION SHOULD KICK IN
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
         clock.addDeltaFromReality(getDurationDay(32));
-        assertTrue(testListener.isApiCompleted(5000));
+        assertTrue(testListener.isCompleted(5000));
         SubscriptionData subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(baseSubscriptionId);
         
         assertEquals(subscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION + 1);
@@ -439,11 +463,15 @@ public class TestRepairBP extends TestApiBaseRepair {
         PlanPhase currentPhase = subscription.getCurrentPhase();
         assertNotNull(currentPhase);
         assertEquals(currentPhase.getPhaseType(), PhaseType.EVERGREEN);
+        
+        assertListenerStatus();
     }
 
     @Test(groups={"slow"})
     public void testBPRepairAddChangeAfterTrial() throws Exception {
         
+        log.info("Starting testBPRepairAddChangeAfterTrial");
+        
         String baseProduct = "Shotgun";
         String newBaseProduct = "Assault-Rifle";
         
@@ -459,24 +487,27 @@ public class TestRepairBP extends TestApiBaseRepair {
         expected.add(createExistingEventForAssertion(SubscriptionTransitionType.CHANGE, newBaseProduct, PhaseType.EVERGREEN,
                 ProductCategory.BASE, PriceListSet.DEFAULT_PRICELIST_NAME, BillingPeriod.MONTHLY, changeDate));
         testBPRepairAddChange(false, startDate, clockShift, baseProduct, newBaseProduct, expected, 3);
+    
+        assertListenerStatus();
     }
     
 
     private UUID testBPRepairAddChange(boolean inTrial, DateTime startDate, int clockShift, 
             String baseProduct, String newBaseProduct, List<ExistingEvent> expectedEvents, int expectedTransitions) throws Exception {
 
+        
         // CREATE BP
         Subscription baseSubscription = createSubscription(baseProduct, BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, startDate);
 
         // MOVE CLOCK
         if (!inTrial) {
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
         }               
 
         Duration durationShift = getDurationDay(clockShift);
         clock.setDeltaFromReality(durationShift, 0);
         if (!inTrial) {
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
         }
 
         BundleTimeline bundleRepair = repairApi.getBundleRepair(bundle.getId());
@@ -498,6 +529,7 @@ public class TestRepairBP extends TestApiBaseRepair {
         
         boolean dryRun = true;
         BundleTimeline dryRunBundleRepair = repairApi.repairBundle(bRepair, dryRun, context);
+        
         List<SubscriptionTimeline> subscriptionRepair = dryRunBundleRepair.getSubscriptions();
         assertEquals(subscriptionRepair.size(), 1);
         SubscriptionTimeline cur = subscriptionRepair.get(0);
@@ -532,7 +564,9 @@ public class TestRepairBP extends TestApiBaseRepair {
         
        // SECOND RE-ISSUE CALL-- NON DRY RUN
         dryRun = false;
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bRepair, dryRun, context);
+        assertTrue(testListener.isCompleted(5000));
 
         subscriptionRepair = realRunBundleRepair.getSubscriptions();
         assertEquals(subscriptionRepair.size(), 1);
@@ -572,15 +606,17 @@ public class TestRepairBP extends TestApiBaseRepair {
     @Test(groups={"slow"})
     public void testRepairWithFurureCancelEvent() throws Exception {
       
+        log.info("Starting testRepairWithFurureCancelEvent");
+        
         DateTime startDate = clock.getUTCNow();
         
         // CREATE BP
         Subscription baseSubscription = createSubscription("Shotgun", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, startDate);
 
         // MOVE CLOCK -- OUT OF TRIAL
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);                
+        testListener.pushExpectedEvent(NextEvent.PHASE);                
         clock.setDeltaFromReality(getDurationDay(35), 0);
-        assertTrue(testListener.isApiCompleted(5000));
+        assertTrue(testListener.isCompleted(5000));
         
         // SET CTD to BASE SUBSCRIPTION SP CANCEL OCCURS EOT
         DateTime newChargedThroughDate = baseSubscription.getStartDate().plusDays(30).plusMonths(1);
@@ -616,7 +652,9 @@ public class TestRepairBP extends TestApiBaseRepair {
         BundleTimeline bRepair =  createBundleRepair(bundle.getId(), bundleRepair.getViewId(), Collections.singletonList(sRepair));
         
         boolean dryRun = false;
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         repairApi.repairBundle(bRepair, dryRun, context);
+        assertTrue(testListener.isCompleted(5000));
      
         baseSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(baseSubscription.getId());
         
@@ -633,6 +671,8 @@ public class TestRepairBP extends TestApiBaseRepair {
         PlanPhase currentPhase = baseSubscription.getCurrentPhase();
         assertNotNull(currentPhase);
         assertEquals(currentPhase.getPhaseType(), PhaseType.EVERGREEN);
+        
+        assertListenerStatus();
     }
     
     
@@ -640,6 +680,8 @@ public class TestRepairBP extends TestApiBaseRepair {
     @Test(groups={"slow"})
     public void testENT_REPAIR_VIEW_CHANGED_newEvent() throws Exception {
        
+        log.info("Starting testENT_REPAIR_VIEW_CHANGED_newEvent");
+        
         TestWithException test = new TestWithException();
         DateTime startDate = clock.getUTCNow();
         
@@ -663,12 +705,13 @@ public class TestRepairBP extends TestApiBaseRepair {
 
                 BundleTimeline bRepair =  createBundleRepair(bundle.getId(), bundleRepair.getViewId(), Collections.singletonList(sRepair));
 
-                testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+                testListener.pushExpectedEvent(NextEvent.CHANGE);
                 DateTime changeTime = clock.getUTCNow();
                 baseSubscription.changePlan("Assault-Rifle", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, changeTime, context);
-                assertTrue(testListener.isApiCompleted(5000));
-                                
+                assertTrue(testListener.isCompleted(5000));
+                
                 repairApi.repairBundle(bRepair, true, context);
+                assertListenerStatus();
             }
         }, ErrorCode.ENT_REPAIR_VIEW_CHANGED);
     }
@@ -676,6 +719,8 @@ public class TestRepairBP extends TestApiBaseRepair {
     @Test(groups={"slow"}, enabled=false)
     public void testENT_REPAIR_VIEW_CHANGED_ctd() throws Exception {
        
+        log.info("Starting testENT_REPAIR_VIEW_CHANGED_ctd");
+        
         TestWithException test = new TestWithException();
         DateTime startDate = clock.getUTCNow();
         
@@ -704,6 +749,8 @@ public class TestRepairBP extends TestApiBaseRepair {
                 entitlementApi.getSubscriptionFromId(baseSubscription.getId());
 
                 repairApi.repairBundle(bRepair, true, context);
+                
+                assertListenerStatus();
             }
         }, ErrorCode.ENT_REPAIR_VIEW_CHANGED);
     }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairWithAO.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairWithAO.java
index 57a78e6..b6a8052 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairWithAO.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairWithAO.java
@@ -29,6 +29,7 @@ import org.testng.annotations.Test;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PhaseType;
@@ -38,7 +39,6 @@ import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.timeline.BundleTimeline;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.DeletedEvent;
@@ -59,6 +59,8 @@ public class TestRepairWithAO extends TestApiBaseRepair {
     @Test(groups={"slow"})
     public void testRepairChangeBPWithAddonIncluded() throws Exception {
         
+        log.info("Starting testRepairChangeBPWithAddonIncluded");
+        
         String baseProduct = "Shotgun";
         BillingPeriod baseTerm = BillingPeriod.MONTHLY;
         String basePriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
@@ -165,8 +167,11 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         assertEquals(newBaseSubscription.getAllTransitions().size(), 2);
         assertEquals(newBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION);
         
-        dryRun = false;
+        dryRun = false;        
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bundleRepair, dryRun, context);
+        assertTrue(testListener.isCompleted(5000));
+
         
         
         aoRepair = getSubscriptionRepair(aoSubscription.getId(), realRunBundleRepair);
@@ -210,6 +215,8 @@ public class TestRepairWithAO extends TestApiBaseRepair {
     @Test(groups={"slow"})
     public void testRepairChangeBPWithAddonNonAvailable() throws Exception {
         
+        log.info("Starting testRepairChangeBPWithAddonNonAvailable");
+        
         String baseProduct = "Shotgun";
         BillingPeriod baseTerm = BillingPeriod.MONTHLY;
         String basePriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
@@ -224,11 +231,11 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         SubscriptionData aoSubscription = createSubscription("Telescopic-Scope", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME);
         
         // MOVE CLOCK A LITTLE BIT MORE -- AFTER TRIAL
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
         someTimeLater = getDurationDay(32);
         clock.addDeltaFromReality(someTimeLater);
-        assertTrue(testListener.isApiCompleted(7000));        
+        assertTrue(testListener.isCompleted(7000));        
 
         BundleTimeline bundleRepair = repairApi.getBundleRepair(bundle.getId());
         sortEventsOnBundle(bundleRepair);
@@ -295,8 +302,10 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         assertEquals(newBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION);
         
         dryRun = false;
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bundleRepair, dryRun, context);
-        
+        assertTrue(testListener.isCompleted(5000));
+
         aoRepair = getSubscriptionRepair(aoSubscription.getId(), realRunBundleRepair);
         assertEquals(aoRepair.getExistingEvents().size(), 3);
 
@@ -327,6 +336,8 @@ public class TestRepairWithAO extends TestApiBaseRepair {
     @Test(groups={"slow"})
     public void testRepairCancelBP_EOT_WithAddons() throws Exception {
         
+        log.info("Starting testRepairCancelBP_EOT_WithAddons");
+        
         String baseProduct = "Shotgun";
         BillingPeriod baseTerm = BillingPeriod.MONTHLY;
         String basePriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
@@ -341,11 +352,11 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         SubscriptionData aoSubscription = createSubscription("Telescopic-Scope", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME);
         
         // MOVE CLOCK A LITTLE BIT MORE -- AFTER TRIAL
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
         someTimeLater = getDurationDay(40);
         clock.addDeltaFromReality(someTimeLater);
-        assertTrue(testListener.isApiCompleted(7000));
+        assertTrue(testListener.isCompleted(7000));
         
         // SET CTD to BASE SUBSCRIPTION SP CANCEL OCCURS EOT
         DateTime newChargedThroughDate = baseSubscription.getStartDate().plusDays(30).plusMonths(1);
@@ -414,7 +425,9 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         assertEquals(newBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION);
         
         dryRun = false;
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bundleRepair, dryRun, context);
+        assertTrue(testListener.isCompleted(5000));
         
         aoRepair = getSubscriptionRepair(aoSubscription.getId(), realRunBundleRepair);
         assertEquals(aoRepair.getExistingEvents().size(), 3);
@@ -443,11 +456,11 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         assertEquals(newBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION + 1);
         
         // MOVE CLOCK AFTER CANCEL DATE
-        testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
-        testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+        testListener.pushExpectedEvent(NextEvent.CANCEL);
+        testListener.pushExpectedEvent(NextEvent.CANCEL);
         someTimeLater = getDurationDay(32);
         clock.addDeltaFromReality(someTimeLater);
-        assertTrue(testListener.isApiCompleted(7000));
+        assertTrue(testListener.isCompleted(7000));
 
         newAoSubscription = (SubscriptionData)  entitlementApi.getSubscriptionFromId(aoSubscription.getId());
         assertEquals(newAoSubscription.getState(), SubscriptionState.CANCELLED);
@@ -464,6 +477,9 @@ public class TestRepairWithAO extends TestApiBaseRepair {
     
     @Test(groups={"slow"})
     public void testRepairCancelAO() throws Exception {
+        
+        log.info("Starting testRepairCancelAO");
+        
         String baseProduct = "Shotgun";
         BillingPeriod baseTerm = BillingPeriod.MONTHLY;
         String basePriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
@@ -530,7 +546,10 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         assertEquals(newBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION);
         
         dryRun = false;
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bRepair, dryRun, context);
+        assertTrue(testListener.isCompleted(5000));
+
         
         aoRepair = getSubscriptionRepair(aoSubscription.getId(), realRunBundleRepair);
         assertEquals(aoRepair.getExistingEvents().size(), 2);
@@ -553,6 +572,9 @@ public class TestRepairWithAO extends TestApiBaseRepair {
     
     @Test(groups={"slow"})
     public void testRepairRecreateAO() throws Exception {
+        
+        log.info("Starting testRepairRecreateAO");
+        
         String baseProduct = "Shotgun";
         BillingPeriod baseTerm = BillingPeriod.MONTHLY;
         String basePriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
@@ -616,7 +638,9 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         
         // NOW COMMIT
         dryRun = false;
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bRepair, dryRun, context);
+        assertTrue(testListener.isCompleted(5000));
         
         aoRepair = getSubscriptionRepair(aoSubscription.getId(), realRunBundleRepair);
         assertEquals(aoRepair.getExistingEvents().size(), 2);
@@ -644,6 +668,8 @@ public class TestRepairWithAO extends TestApiBaseRepair {
     @Test(groups={"slow"})
     public void testRepairChangeAOOK() throws Exception {
         
+        log.info("Starting testRepairChangeAOOK");
+        
         String baseProduct = "Shotgun";
         BillingPeriod baseTerm = BillingPeriod.MONTHLY;
         String basePriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
@@ -706,7 +732,9 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         
         // AND NOW COMMIT
         dryRun = false;
+        testListener.pushExpectedEvent(NextEvent.REPAIR_BUNDLE);
         BundleTimeline realRunBundleRepair = repairApi.repairBundle(bRepair, dryRun, context);
+        assertTrue(testListener.isCompleted(5000));
         
         aoRepair = getSubscriptionRepair(aoSubscription.getId(), realRunBundleRepair);
         assertEquals(aoRepair.getExistingEvents().size(), 3);
@@ -734,10 +762,10 @@ public class TestRepairWithAO extends TestApiBaseRepair {
         assertNotNull(currentPhase);
         assertEquals(currentPhase.getPhaseType(), PhaseType.DISCOUNT);
         
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
         someTimeLater = getDurationDay(60);
         clock.addDeltaFromReality(someTimeLater);
-        assertTrue(testListener.isApiCompleted(5000));
+        assertTrue(testListener.isCompleted(5000));
         
         newAoSubscription = (SubscriptionData)  entitlementApi.getSubscriptionFromId(aoSubscription.getId());
         currentPhase = newAoSubscription.getCurrentPhase();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairWithError.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairWithError.java
index 285b17d..8fbbb3a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairWithError.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairWithError.java
@@ -31,6 +31,7 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
 import com.ning.billing.ErrorCode;
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PhaseType;
@@ -38,7 +39,6 @@ import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.timeline.BundleTimeline;
 import com.ning.billing.entitlement.api.timeline.EntitlementRepairException;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline;
@@ -73,15 +73,18 @@ public class TestRepairWithError extends TestApiBaseRepair {
   
     @Test(groups={"fast"})
     public void testENT_REPAIR_NEW_EVENT_BEFORE_LAST_BP_REMAINING() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_NEW_EVENT_BEFORE_LAST_BP_REMAINING");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException {
 
                 // MOVE AFTER TRIAL
-                testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+                testListener.pushExpectedEvent(NextEvent.PHASE);
                 Duration durationShift = getDurationDay(40);
                 clock.setDeltaFromReality(durationShift, 0);
-                assertTrue(testListener.isApiCompleted(5000));
+                assertTrue(testListener.isCompleted(5000));
                 
                 BundleTimeline bundleRepair = repairApi.getBundleRepair(bundle.getId());
                 sortEventsOnBundle(bundleRepair);
@@ -99,6 +102,9 @@ public class TestRepairWithError extends TestApiBaseRepair {
     
     @Test(groups={"fast"})
     public void testENT_REPAIR_INVALID_DELETE_SET() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_INVALID_DELETE_SET");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException, EntitlementUserApiException {
@@ -106,16 +112,16 @@ public class TestRepairWithError extends TestApiBaseRepair {
                 Duration durationShift = getDurationDay(3);
                 clock.setDeltaFromReality(durationShift, 0);
                 
-                testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+                testListener.pushExpectedEvent(NextEvent.CHANGE);
                 DateTime changeTime = clock.getUTCNow();
                 baseSubscription.changePlan("Assault-Rifle", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, changeTime, context);
-                assertTrue(testListener.isApiCompleted(5000));
+                assertTrue(testListener.isCompleted(5000));
                 
                 // MOVE AFTER TRIAL
-                testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+                testListener.pushExpectedEvent(NextEvent.PHASE);
                 durationShift = getDurationDay(40);
                 clock.addDeltaFromReality(durationShift);
-                assertTrue(testListener.isApiCompleted(5000));
+                assertTrue(testListener.isCompleted(5000));
                 
                 BundleTimeline bundleRepair = repairApi.getBundleRepair(bundle.getId());
                 sortEventsOnBundle(bundleRepair);
@@ -133,6 +139,9 @@ public class TestRepairWithError extends TestApiBaseRepair {
 
     @Test(groups={"fast"})
     public void testENT_REPAIR_NON_EXISTENT_DELETE_EVENT() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_NON_EXISTENT_DELETE_EVENT");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException {
@@ -153,15 +162,18 @@ public class TestRepairWithError extends TestApiBaseRepair {
     
     @Test(groups={"fast"})
     public void testENT_REPAIR_SUB_RECREATE_NOT_EMPTY() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_SUB_RECREATE_NOT_EMPTY");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException {
                 
                 // MOVE AFTER TRIAL
-                   testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+                   testListener.pushExpectedEvent(NextEvent.PHASE);
                    Duration durationShift = getDurationDay(40);
                    clock.setDeltaFromReality(durationShift, 0);
-                   assertTrue(testListener.isApiCompleted(5000));
+                   assertTrue(testListener.isCompleted(5000));
                    
                    BundleTimeline bundleRepair = repairApi.getBundleRepair(bundle.getId());
                    sortEventsOnBundle(bundleRepair);
@@ -181,15 +193,19 @@ public class TestRepairWithError extends TestApiBaseRepair {
 
     @Test(groups={"fast"})
     public void testENT_REPAIR_SUB_EMPTY() throws Exception {
+
+        log.info("Starting testENT_REPAIR_SUB_EMPTY");
+        
         test.withException(new TestWithExceptionCallback() {
+
             @Override
             public void doTest() throws EntitlementRepairException {
                 
              // MOVE AFTER TRIAL
-                testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+                testListener.pushExpectedEvent(NextEvent.PHASE);
                 Duration durationShift = getDurationDay(40);
                 clock.setDeltaFromReality(durationShift, 0);
-                assertTrue(testListener.isApiCompleted(5000));
+                assertTrue(testListener.isCompleted(5000));
                 
                 BundleTimeline bundleRepair = repairApi.getBundleRepair(bundle.getId());
                 sortEventsOnBundle(bundleRepair);
@@ -209,6 +225,9 @@ public class TestRepairWithError extends TestApiBaseRepair {
     
     @Test(groups={"fast"})
     public void testENT_REPAIR_AO_CREATE_BEFORE_BP_START() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_AO_CREATE_BEFORE_BP_START");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException, EntitlementUserApiException {
@@ -254,6 +273,9 @@ public class TestRepairWithError extends TestApiBaseRepair {
     
     @Test(groups={"fast"})
     public void testENT_REPAIR_NEW_EVENT_BEFORE_LAST_AO_REMAINING() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_NEW_EVENT_BEFORE_LAST_AO_REMAINING");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException, EntitlementUserApiException {
@@ -298,6 +320,9 @@ public class TestRepairWithError extends TestApiBaseRepair {
 
     @Test(groups={"fast"})
     public void testENT_REPAIR_BP_RECREATE_MISSING_AO() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_BP_RECREATE_MISSING_AO");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException, EntitlementUserApiException {
@@ -337,6 +362,9 @@ public class TestRepairWithError extends TestApiBaseRepair {
     //
     @Test(groups={"fast"}, enabled=false)
     public void testENT_REPAIR_BP_RECREATE_MISSING_AO_CREATE() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_BP_RECREATE_MISSING_AO_CREATE");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException, EntitlementUserApiException {
@@ -382,6 +410,9 @@ public class TestRepairWithError extends TestApiBaseRepair {
     
     @Test(groups={"fast"}, enabled=false)
     public void testENT_REPAIR_MISSING_AO_DELETE_EVENT() throws Exception {
+        
+        log.info("Starting testENT_REPAIR_MISSING_AO_DELETE_EVENT");
+        
         test.withException(new TestWithExceptionCallback() {
             @Override
             public void doTest() throws EntitlementRepairException, EntitlementUserApiException {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiAddOn.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiAddOn.java
index 2780fe5..fd74c2d 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiAddOn.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiAddOn.java
@@ -30,6 +30,7 @@ import org.testng.annotations.Test;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.CatalogApiException;
 import com.ning.billing.catalog.api.Duration;
@@ -41,7 +42,6 @@ import com.ning.billing.catalog.api.PlanSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.catalog.api.TimeUnit;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
 import com.ning.billing.entitlement.glue.MockEngineModuleSql;
@@ -57,6 +57,8 @@ public class TestUserApiAddOn extends TestApiBase {
     @Test(enabled=true, groups={"slow"})
     public void testCreateCancelAddon() {
 
+        log.info("Starting testCreateCancelAddon");
+
         try {
             String baseProduct = "Shotgun";
             BillingPeriod baseTerm = BillingPeriod.MONTHLY;
@@ -75,11 +77,12 @@ public class TestUserApiAddOn extends TestApiBase {
             aoSubscription.cancel(now, false, context);
 
             testListener.reset();
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
-            assertTrue(testListener.isApiCompleted(5000));
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
+            assertTrue(testListener.isCompleted(5000));
 
             assertEquals(aoSubscription.getState(), SubscriptionState.CANCELLED);
 
+            assertListenerStatus();
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
@@ -87,6 +90,9 @@ public class TestUserApiAddOn extends TestApiBase {
 
     @Test(enabled=true, groups={"slow"})
     public void testCancelBPWithAddon() {
+
+        log.info("Starting testCancelBPWithAddon");
+
         try {
 
             String baseProduct = "Shotgun";
@@ -103,13 +109,13 @@ public class TestUserApiAddOn extends TestApiBase {
             SubscriptionData aoSubscription = createSubscription(aoProduct, aoTerm, aoPriceList);
 
             testListener.reset();
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
 
             // MOVE CLOCK AFTER TRIAL + AO DISCOUNT
             Duration twoMonths = getDurationMonth(2);
             clock.setDeltaFromReality(twoMonths, DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             // SET CTD TO CANCEL IN FUTURE
             DateTime now = clock.getUTCNow();
@@ -129,15 +135,17 @@ public class TestUserApiAddOn extends TestApiBase {
 
             // MOVE AFTER CANCELLATION
             testListener.reset();
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
             clock.addDeltaFromReality(ctd);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             // REFETCH AO SUBSCRIPTION AND CHECK THIS IS CANCELLED
             aoSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(aoSubscription.getId());
             assertEquals(aoSubscription.getState(), SubscriptionState.CANCELLED);
 
+            assertListenerStatus();
+
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
@@ -146,6 +154,9 @@ public class TestUserApiAddOn extends TestApiBase {
 
     @Test(enabled=true, groups={"slow"})
     public void testChangeBPWithAddonNonIncluded() {
+
+        log.info("Starting testChangeBPWithAddonNonIncluded");
+
         try {
 
             String baseProduct = "Shotgun";
@@ -162,13 +173,13 @@ public class TestUserApiAddOn extends TestApiBase {
             SubscriptionData aoSubscription = createSubscription(aoProduct, aoTerm, aoPriceList);
 
             testListener.reset();
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
 
             // MOVE CLOCK AFTER TRIAL + AO DISCOUNT
             Duration twoMonths = getDurationMonth(2);
             clock.setDeltaFromReality(twoMonths, DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             // SET CTD TO CHANGE IN FUTURE
             DateTime now = clock.getUTCNow();
@@ -183,15 +194,16 @@ public class TestUserApiAddOn extends TestApiBase {
             String newBasePriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
 
             testListener.reset();
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
             baseSubscription.changePlan(newBaseProduct, newBaseTerm, newBasePriceList, now, context);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             // REFETCH AO SUBSCRIPTION AND CHECK THIS CANCELLED
             aoSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(aoSubscription.getId());
             assertEquals(aoSubscription.getState(), SubscriptionState.CANCELLED);
 
+            assertListenerStatus();
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
@@ -199,6 +211,9 @@ public class TestUserApiAddOn extends TestApiBase {
 
     @Test(enabled=true, groups={"slow"})
     public void testChangeBPWithAddonNonAvailable() {
+
+        log.info("Starting testChangeBPWithAddonNonAvailable");
+
         try {
 
             String baseProduct = "Shotgun";
@@ -215,13 +230,13 @@ public class TestUserApiAddOn extends TestApiBase {
             SubscriptionData aoSubscription = createSubscription(aoProduct, aoTerm, aoPriceList);
 
             testListener.reset();
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
 
             // MOVE CLOCK AFTER TRIAL + AO DISCOUNT
             Duration twoMonths = getDurationMonth(2);
             clock.setDeltaFromReality(twoMonths, DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             // SET CTD TO CANCEL IN FUTURE
             DateTime now = clock.getUTCNow();
@@ -245,16 +260,17 @@ public class TestUserApiAddOn extends TestApiBase {
 
             // MOVE AFTER CHANGE
             testListener.reset();
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
             clock.addDeltaFromReality(ctd);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
 
             // REFETCH AO SUBSCRIPTION AND CHECK THIS CANCELLED
             aoSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(aoSubscription.getId());
             assertEquals(aoSubscription.getState(), SubscriptionState.CANCELLED);
 
+            assertListenerStatus();
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
@@ -263,6 +279,9 @@ public class TestUserApiAddOn extends TestApiBase {
 
     @Test(enabled=true, groups={"slow"})
     public void testAddonCreateWithBundleAlign() {
+
+        log.info("Starting testAddonCreateWithBundleAlign");
+
         try {
             String aoProduct = "Telescopic-Scope";
             BillingPeriod aoTerm = BillingPeriod.MONTHLY;
@@ -278,6 +297,7 @@ public class TestUserApiAddOn extends TestApiBase {
 
             testAddonCreateInternal(aoProduct, aoTerm, aoPriceList, alignement);
 
+            assertListenerStatus();
         } catch (CatalogApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -287,6 +307,8 @@ public class TestUserApiAddOn extends TestApiBase {
     @Test(enabled=true, groups={"slow"})
     public void testAddonCreateWithSubscriptionAlign() {
 
+        log.info("Starting testAddonCreateWithSubscriptionAlign");
+
         try {
             String aoProduct = "Laser-Scope";
             BillingPeriod aoTerm = BillingPeriod.MONTHLY;
@@ -302,13 +324,15 @@ public class TestUserApiAddOn extends TestApiBase {
 
             testAddonCreateInternal(aoProduct, aoTerm, aoPriceList, alignement);
 
-            } catch (CatalogApiException e) {
-                Assert.fail(e.getMessage());
-            }
+            assertListenerStatus();
+        } catch (CatalogApiException e) {
+            Assert.fail(e.getMessage());
+        }
     }
 
 
     private void testAddonCreateInternal(String aoProduct, BillingPeriod aoTerm, String aoPriceList, PlanAlignmentCreate expAlignement) {
+
         try {
 
             String baseProduct = "Shotgun";
@@ -338,73 +362,73 @@ public class TestUserApiAddOn extends TestApiBase {
             assertNotNull(aoCurrentPhase);
             assertEquals(aoCurrentPhase.getPhaseType(), PhaseType.DISCOUNT);
 
-           assertDateWithin(aoSubscription.getStartDate(), beforeAOCreation, afterAOCreation);
-           assertEquals(aoSubscription.getBundleStartDate(), baseSubscription.getBundleStartDate());
-
-           // CHECK next AO PHASE EVENT IS INDEED A MONTH AFTER BP STARTED => BUNDLE ALIGNMENT
-           SubscriptionEvent aoPendingTranstion = aoSubscription.getPendingTransition();
-
-           if (expAlignement == PlanAlignmentCreate.START_OF_BUNDLE) {
-               assertEquals(aoPendingTranstion.getEffectiveTransitionTime(), baseSubscription.getStartDate().plusMonths(1));
-           } else {
-               assertEquals(aoPendingTranstion.getEffectiveTransitionTime(), aoSubscription.getStartDate().plusMonths(1));
-           }
-
-           // ADD TWO PHASE EVENTS (BP + AO)
-           testListener.reset();
-           testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-           testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-
-           // MOVE THROUGH TIME TO GO INTO EVERGREEN
-           
-           // Talk with Stephane about this fix. It seemed that the add on phase change was not appearing in the queue
-           // hypothesis is that waiting a period that is exactly the duration of the phase might be an instant too short
-           // depending how the comparison works
-           
-           //someTimeLater = aoCurrentPhase.getDuration();
-           someTimeLater = new Duration() {
+            assertDateWithin(aoSubscription.getStartDate(), beforeAOCreation, afterAOCreation);
+            assertEquals(aoSubscription.getBundleStartDate(), baseSubscription.getBundleStartDate());
+
+            // CHECK next AO PHASE EVENT IS INDEED A MONTH AFTER BP STARTED => BUNDLE ALIGNMENT
+            SubscriptionEvent aoPendingTranstion = aoSubscription.getPendingTransition();
+
+            if (expAlignement == PlanAlignmentCreate.START_OF_BUNDLE) {
+                assertEquals(aoPendingTranstion.getEffectiveTransitionTime(), baseSubscription.getStartDate().plusMonths(1));
+            } else {
+                assertEquals(aoPendingTranstion.getEffectiveTransitionTime(), aoSubscription.getStartDate().plusMonths(1));
+            }
+
+            // ADD TWO PHASE EVENTS (BP + AO)
+            testListener.reset();
+            testListener.pushExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
+
+            // MOVE THROUGH TIME TO GO INTO EVERGREEN
+
+            // Talk with Stephane about this fix. It seemed that the add on phase change was not appearing in the queue
+            // hypothesis is that waiting a period that is exactly the duration of the phase might be an instant too short
+            // depending how the comparison works
+
+            //someTimeLater = aoCurrentPhase.getDuration();
+            someTimeLater = new Duration() {
                 @Override
                 public TimeUnit getUnit() {
-                   return TimeUnit.DAYS;
+                    return TimeUnit.DAYS;
                 }
 
                 @Override
                 public int getNumber() {
-                   return 32;
+                    return 32;
                 }
 
                 @Override
                 public DateTime addToDateTime(DateTime dateTime) {
-                   throw new NotImplementedException();
+                    throw new NotImplementedException();
                 }
                 @Override
                 public Period toJodaPeriod() {
                     throw new UnsupportedOperationException();
                 }
-           };
-           
-           clock.addDeltaFromReality(someTimeLater);
-           clock.addDeltaFromReality(getDurationDay(1));
-           assertTrue(testListener.isApiCompleted(5000));
+            };
 
+            clock.addDeltaFromReality(someTimeLater);
+            clock.addDeltaFromReality(getDurationDay(1));
+            assertTrue(testListener.isCompleted(5000));
 
-           // CHECK EVERYTHING AGAIN
-           aoSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(aoSubscription.getId());
 
-           aoCurrentPlan = aoSubscription.getCurrentPlan();
-           assertNotNull(aoCurrentPlan);
-           assertEquals(aoCurrentPlan.getProduct().getName(),aoProduct);
-           assertEquals(aoCurrentPlan.getProduct().getCategory(), ProductCategory.ADD_ON);
-           assertEquals(aoCurrentPlan.getBillingPeriod(), aoTerm);
+            // CHECK EVERYTHING AGAIN
+            aoSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(aoSubscription.getId());
+
+            aoCurrentPlan = aoSubscription.getCurrentPlan();
+            assertNotNull(aoCurrentPlan);
+            assertEquals(aoCurrentPlan.getProduct().getName(),aoProduct);
+            assertEquals(aoCurrentPlan.getProduct().getCategory(), ProductCategory.ADD_ON);
+            assertEquals(aoCurrentPlan.getBillingPeriod(), aoTerm);
 
-           aoCurrentPhase = aoSubscription.getCurrentPhase();
-           assertNotNull(aoCurrentPhase);
-           assertEquals(aoCurrentPhase.getPhaseType(), PhaseType.EVERGREEN);
+            aoCurrentPhase = aoSubscription.getCurrentPhase();
+            assertNotNull(aoCurrentPhase);
+            assertEquals(aoCurrentPhase.getPhaseType(), PhaseType.EVERGREEN);
 
 
-           aoSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(aoSubscription.getId());
-           aoPendingTranstion = aoSubscription.getPendingTransition();
-           assertNull(aoPendingTranstion);
+            aoSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(aoSubscription.getId());
+            aoPendingTranstion = aoSubscription.getPendingTransition();
+            assertNull(aoPendingTranstion);
 
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancel.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancel.java
index 09046f1..0b44a23 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancel.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancel.java
@@ -24,13 +24,13 @@ import static org.testng.Assert.assertTrue;
 import org.joda.time.DateTime;
 import org.testng.Assert;
 
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PriceListSet;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApiException;
 import com.ning.billing.util.clock.DefaultClock;
@@ -59,16 +59,17 @@ public abstract class TestUserApiCancel extends TestApiBase {
             clock.setDeltaFromReality(moveALittleInTime, 0);
 
             DateTime future = clock.getUTCNow();
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
 
             // CANCEL in trial period to get IMM policy
             subscription.cancel(clock.getUTCNow(), false, context);
             currentPhase = subscription.getCurrentPhase();
-            testListener.isApiCompleted(1000);
+            testListener.isCompleted(3000);
 
             assertNull(currentPhase);
             checkNextPhaseChange(subscription, 0, null);
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -94,9 +95,9 @@ public abstract class TestUserApiCancel extends TestApiBase {
             checkNextPhaseChange(subscription, 1, expectedPhaseTrialChange);
 
             // MOVE TO NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
             trialPhase = subscription.getCurrentPhase();
             assertEquals(trialPhase.getPhaseType(), PhaseType.EVERGREEN);
 
@@ -106,21 +107,22 @@ public abstract class TestUserApiCancel extends TestApiBase {
             billingApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, context);
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
 
             // CANCEL
             subscription.cancel(clock.getUTCNow(), false, context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
 
             // MOVE TO EOT + RECHECK
             clock.addDeltaFromReality(ctd);
             DateTime future = clock.getUTCNow();
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             PlanPhase currentPhase = subscription.getCurrentPhase();
             assertNull(currentPhase);
             checkNextPhaseChange(subscription, 0, null);
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -147,22 +149,23 @@ public abstract class TestUserApiCancel extends TestApiBase {
             checkNextPhaseChange(subscription, 1, expectedPhaseTrialChange);
 
             // MOVE TO NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
             trialPhase = subscription.getCurrentPhase();
             assertEquals(trialPhase.getPhaseType(), PhaseType.EVERGREEN);
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
 
             // CANCEL
             subscription.cancel(clock.getUTCNow(), false, context);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             PlanPhase currentPhase = subscription.getCurrentPhase();
             assertNull(currentPhase);
             checkNextPhaseChange(subscription, 0, null);
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -191,9 +194,9 @@ public abstract class TestUserApiCancel extends TestApiBase {
             checkNextPhaseChange(subscription, 1, expectedPhaseTrialChange);
 
             // MOVE TO NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
             PlanPhase currentPhase = subscription.getCurrentPhase();
             assertEquals(currentPhase.getPhaseType(), PhaseType.EVERGREEN);
 
@@ -203,27 +206,24 @@ public abstract class TestUserApiCancel extends TestApiBase {
             billingApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, context);
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
-
-            // CANCEL
+            // CANCEL EOT
             subscription.cancel(clock.getUTCNow(), false, context);
-            assertFalse(testListener.isApiCompleted(2000));
 
             subscription.uncancel(context);
-
+            
             // MOVE TO EOT + RECHECK
+            testListener.pushExpectedEvent(NextEvent.UNCANCEL);
             clock.addDeltaFromReality(ctd);
-            DateTime future = clock.getUTCNow();
-            assertFalse(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             Plan currentPlan = subscription.getCurrentPlan();
             assertEquals(currentPlan.getProduct().getName(), prod);
             currentPhase = subscription.getCurrentPhase();
             assertEquals(currentPhase.getPhaseType(), PhaseType.EVERGREEN);
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
     }
-
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
index b5b98e4..81a97a6 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.joda.time.DateTime;
 import org.testng.Assert;
 
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PhaseType;
@@ -34,7 +35,6 @@ import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApiException;
 import com.ning.billing.entitlement.events.EntitlementEvent;
@@ -69,7 +69,7 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
     private void tChangePlanBundleAlignEOTWithNoChargeThroughDate(String fromProd, BillingPeriod fromTerm, String fromPlanSet,
         String toProd, BillingPeriod toTerm, String toPlanSet) {
 
-        log.info("Starting testChangePlanBundleAlignEOTWithNoChargeThroughDateReal");
+        log.info("Starting testChangePlanBundleAlignEOTWithNoChargeThroughDate");
 
         try {
 
@@ -78,22 +78,23 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
             // MOVE TO NEXT PHASE
             PlanPhase currentPhase = subscription.getCurrentPhase();
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(currentPhase.getDuration(), DAY_IN_MS);
             DateTime futureNow = clock.getUTCNow();
             DateTime nextExpectedPhaseChange = DefaultClock.addDuration(subscription.getStartDate(), currentPhase.getDuration());
             assertTrue(futureNow.isAfter(nextExpectedPhaseChange));
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
 
             // CHANGE PLAN
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan(toProd, toTerm, toPlanSet, clock.getUTCNow(), context);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             // CHECK CHANGE PLAN
             currentPhase = subscription.getCurrentPhase();
             checkChangePlan(subscription, toProd, ProductCategory.BASE, toTerm, PhaseType.EVERGREEN);
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -101,13 +102,14 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
 
     protected void testChangePlanBundleAlignEOTWithChargeThroughDate() throws EntitlementBillingApiException {
+        log.info("Starting testChangePlanBundleAlignEOTWithChargeThroughDate");
         testChangePlanBundleAlignEOTWithChargeThroughDate("Shotgun", BillingPeriod.ANNUAL, "gunclubDiscount", "Pistol", BillingPeriod.ANNUAL, "gunclubDiscount");
     }
 
     private void testChangePlanBundleAlignEOTWithChargeThroughDate(String fromProd, BillingPeriod fromTerm, String fromPlanSet,
             String toProd, BillingPeriod toTerm, String toPlanSet) throws EntitlementBillingApiException {
 
-        log.info("Starting testChangeSubscriptionEOTWithChargeThroughDate");
+        
         try {
 
             // CREATE
@@ -118,9 +120,9 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
 
             // MOVE TO NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
             PlanPhase currentPhase = subscription.getCurrentPhase();
             assertEquals(currentPhase.getPhaseType(), PhaseType.DISCOUNT);
 
@@ -131,10 +133,10 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             billingApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, context);
 
             // RE READ SUBSCRIPTION + CHANGE PLAN
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
             subscription.changePlan(toProd, toTerm, toPlanSet, clock.getUTCNow(), context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
             testListener.reset();
 
             // CHECK CHANGE PLAN
@@ -151,14 +153,15 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
 
             // MOVE TO EOT
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             clock.addDeltaFromReality(ctd);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
             currentPhase = subscription.getCurrentPhase();
             checkChangePlan(subscription, toProd, ProductCategory.BASE, toTerm, PhaseType.DISCOUNT);
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -179,7 +182,7 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
             SubscriptionData subscription = createSubscription(fromProd, fromTerm, fromPlanSet);
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
 
             Duration moveALittleInTime = getDurationDay(3);
             clock.setDeltaFromReality(moveALittleInTime, 0);
@@ -188,28 +191,21 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             subscription.changePlan(toProd, toTerm, toPlanSet, clock.getUTCNow(), context);
             checkChangePlan(subscription, toProd, ProductCategory.BASE, toTerm, PhaseType.TRIAL);
 
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             PlanPhase currentPhase = subscription.getCurrentPhase();
             DateTime nextExpectedPhaseChange = DefaultClock.addDuration(subscription.getStartDate(), currentPhase.getDuration());
             checkNextPhaseChange(subscription, 1, nextExpectedPhaseChange);
 
             // NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.addDeltaFromReality(currentPhase.getDuration());
             DateTime futureNow = clock.getUTCNow();
 
-            /*
-            try {
-                Thread.sleep(1000 * 3000);
-            } catch (Exception e) {
-
-            }
-            */
-
             assertTrue(futureNow.isAfter(nextExpectedPhaseChange));
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -217,14 +213,13 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
 
     protected void testChangePlanChangePlanAlignEOTWithChargeThroughDate() throws EntitlementBillingApiException {
+        log.info("Starting testChangePlanChangePlanAlignEOTWithChargeThroughDate");
         tChangePlanChangePlanAlignEOTWithChargeThroughDate("Shotgun", BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME, "Assault-Rifle", BillingPeriod.ANNUAL, "rescue");
     }
 
     private void tChangePlanChangePlanAlignEOTWithChargeThroughDate(String fromProd, BillingPeriod fromTerm, String fromPlanSet,
             String toProd, BillingPeriod toTerm, String toPlanSet) throws EntitlementBillingApiException {
 
-        log.info("Starting testChangePlanBundleAlignEOTWithChargeThroughDate");
-
         try {
 
             DateTime currentTime = clock.getUTCNow();
@@ -235,11 +230,11 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
             // MOVE TO NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             currentTime = clock.getUTCNow();
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
             currentTime = clock.getUTCNow();
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             // SET CTD
             Duration ctd = getDurationMonth(1);
@@ -255,18 +250,18 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             // CHANGE PLAN
             currentTime = clock.getUTCNow();
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan(toProd, toTerm, toPlanSet, clock.getUTCNow(), context);
 
             checkChangePlan(subscription, fromProd, ProductCategory.BASE, fromTerm, PhaseType.EVERGREEN);
 
             // CHECK CHANGE DID NOT KICK IN YET
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
 
             // MOVE TO AFTER CTD
             clock.addDeltaFromReality(ctd);
             currentTime = clock.getUTCNow();
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             // CHECK CORRECT PRODUCT, PHASE, PLAN SET
             String currentProduct =  subscription.getCurrentPlan().getProduct().getName();
@@ -276,13 +271,13 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             assertNotNull(currentPhase);
             assertEquals(currentPhase.getPhaseType(), PhaseType.DISCOUNT);
 
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
 
             // MOVE TIME ABOUT ONE MONTH BEFORE NEXT EXPECTED PHASE CHANGE
             clock.addDeltaFromReality(getDurationMonth(11));
 
             currentTime = clock.getUTCNow();
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
 
             DateTime nextExpectedPhaseChange = DefaultClock.addDuration(newChargedThroughDate, currentPhase.getDuration());
             checkNextPhaseChange(subscription, 1, nextExpectedPhaseChange);
@@ -290,8 +285,9 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             // MOVE TIME RIGHT AFTER NEXT EXPECTED PHASE CHANGE
             clock.addDeltaFromReality(getDurationMonth(1));
             currentTime = clock.getUTCNow();
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -299,15 +295,16 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
     protected void testMultipleChangeLastIMM() throws EntitlementBillingApiException {
 
+        log.info("Starting testMultipleChangeLastIMM");
         try {
             SubscriptionData subscription = createSubscription("Assault-Rifle", BillingPeriod.MONTHLY, "gunclubDiscount");
             PlanPhase trialPhase = subscription.getCurrentPhase();
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
             // MOVE TO NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             // SET CTD
             List<Duration> durationList = new ArrayList<Duration>();
@@ -320,14 +317,14 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
 
             // CHANGE EOT
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Pistol", BillingPeriod.MONTHLY, "gunclubDiscount", clock.getUTCNow(), context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
 
             // CHANGE
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Assault-Rifle", BillingPeriod.ANNUAL, "gunclubDiscount", clock.getUTCNow(), context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
 
             Plan currentPlan = subscription.getCurrentPlan();
             assertNotNull(currentPlan);
@@ -338,7 +335,8 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             PlanPhase currentPhase = subscription.getCurrentPhase();
             assertNotNull(currentPhase);
             assertEquals(currentPhase.getPhaseType(), PhaseType.DISCOUNT);
-
+            
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -346,15 +344,16 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
     protected void testMultipleChangeLastEOT() throws EntitlementBillingApiException {
 
+        log.info("Starting testMultipleChangeLastEOT");
         try {
 
             SubscriptionData subscription = createSubscription("Assault-Rifle", BillingPeriod.ANNUAL, "gunclubDiscount");
             PlanPhase trialPhase = subscription.getCurrentPhase();
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             // SET CTD
             List<Duration> durationList = new ArrayList<Duration>();
@@ -366,15 +365,15 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
 
             // CHANGE EOT
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Shotgun", BillingPeriod.MONTHLY, "gunclubDiscount", clock.getUTCNow(), context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
             testListener.reset();
 
             // CHANGE EOT
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Pistol", BillingPeriod.ANNUAL, "gunclubDiscount", clock.getUTCNow(), context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
             testListener.reset();
 
             // CHECK NO CHANGE OCCURED YET
@@ -389,9 +388,9 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             assertEquals(currentPhase.getPhaseType(), PhaseType.DISCOUNT);
 
             // ACTIVATE CHNAGE BY MOVING AFTER CTD
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             clock.addDeltaFromReality(ctd);
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
 
             currentPlan = subscription.getCurrentPlan();
             assertNotNull(currentPlan);
@@ -406,9 +405,9 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
 
             // MOVE TO NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.addDeltaFromReality(currentPhase.getDuration());
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
 
             currentPlan = subscription.getCurrentPlan();
@@ -421,7 +420,7 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             assertNotNull(currentPhase);
             assertEquals(currentPhase.getPhaseType(), PhaseType.EVERGREEN);
 
-
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -429,6 +428,9 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
 
     protected void testCorrectPhaseAlignmentOnChange() {
+        
+        log.info("Starting testCorrectPhaseAlignmentOnChange");
+        
         try {
 
             SubscriptionData subscription = createSubscription("Shotgun", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME);
@@ -440,9 +442,9 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
             // CHANGE IMMEDIATE TO A 3 PHASES PLAN
             testListener.reset();
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Assault-Rifle", BillingPeriod.ANNUAL, "gunclubDiscount", clock.getUTCNow(), context);
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
             testListener.reset();
 
             // CHECK EVERYTHING LOOKS CORRECT
@@ -456,9 +458,9 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
             // MOVE AFTER TRIAL PERIOD -> DISCOUNT
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.addDeltaFromReality(trialPhase.getDuration());
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
 
             trialPhase = subscription.getCurrentPhase();
             assertEquals(trialPhase.getPhaseType(), PhaseType.DISCOUNT);
@@ -471,6 +473,7 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
 
             assertEquals(nextPhaseEffectiveDate, expectedNextPhaseDate);
 
+            assertListenerStatus();
 
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlanSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlanSql.java
index 509b056..1039b85 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlanSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlanSql.java
@@ -33,7 +33,7 @@ public class TestUserApiChangePlanSql extends TestUserApiChangePlan {
         return Guice.createInjector(Stage.DEVELOPMENT, new MockEngineModuleSql());
     }
 
-    @Test(enabled= true, groups={"stress"})
+    @Test(enabled= false, groups={"stress"})
     public void stressTest() throws Exception {
         for (int i = 0; i < MAX_STRESS_ITERATIONS; i++) {
             cleanupTest();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCreate.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCreate.java
index ef4533d..8af0400 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCreate.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCreate.java
@@ -28,13 +28,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.phase.PhaseEvent;
@@ -56,8 +56,8 @@ public abstract class TestUserApiCreate extends TestApiBase {
             String planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
 
 
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-            testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.CREATE);
 
 
             SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundle.getId(),
@@ -69,8 +69,10 @@ public abstract class TestUserApiCreate extends TestApiBase {
             assertEquals(subscription.getBundleId(), bundle.getId());
             assertEquals(subscription.getStartDate(), requestedDate);
 
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
+            assertListenerStatus();
+            
         } catch (EntitlementUserApiException e) {
         	log.error("Unexpected exception",e);
             Assert.fail(e.getMessage());
@@ -89,7 +91,7 @@ public abstract class TestUserApiCreate extends TestApiBase {
             BillingPeriod term = BillingPeriod.MONTHLY;
             String planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+            testListener.pushExpectedEvent(NextEvent.CREATE);
 
             SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundle.getId(),
                     getProductSpecifier(productName, planSetName, term, PhaseType.EVERGREEN), clock.getUTCNow(), context);
@@ -111,6 +113,8 @@ public abstract class TestUserApiCreate extends TestApiBase {
             assertNotNull(currentPhase);
             assertEquals(currentPhase.getPhaseType(), PhaseType.EVERGREEN);
 
+            assertListenerStatus();
+            
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -127,7 +131,7 @@ public abstract class TestUserApiCreate extends TestApiBase {
             BillingPeriod term = BillingPeriod.MONTHLY;
             String planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+            testListener.pushExpectedEvent(NextEvent.CREATE);
 
             SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundle.getId(),
                     getProductSpecifier(productName, planSetName, term, null),
@@ -149,7 +153,7 @@ public abstract class TestUserApiCreate extends TestApiBase {
             PlanPhase currentPhase = subscription.getCurrentPhase();
             assertNotNull(currentPhase);
             assertEquals(currentPhase.getPhaseType(), PhaseType.TRIAL);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             List<EntitlementEvent> events = dao.getPendingEventsForSubscription(subscription.getId());
             assertNotNull(events);
@@ -160,15 +164,16 @@ public abstract class TestUserApiCreate extends TestApiBase {
             DateTime nextExpectedPhaseChange = DefaultClock.addDuration(subscription.getStartDate(), currentPhase.getDuration());
             assertEquals(nextPhaseChange, nextExpectedPhaseChange);
 
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
 
             clock.setDeltaFromReality(currentPhase.getDuration(), DAY_IN_MS);
 
             DateTime futureNow = clock.getUTCNow();
             assertTrue(futureNow.isAfter(nextPhaseChange));
 
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -184,7 +189,7 @@ public abstract class TestUserApiCreate extends TestApiBase {
             BillingPeriod term = BillingPeriod.ANNUAL;
             String planSetName = "gunclubDiscount";
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+            testListener.pushExpectedEvent(NextEvent.CREATE);
 
             // CREATE SUBSCRIPTION
             SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundle.getId(),
@@ -194,27 +199,27 @@ public abstract class TestUserApiCreate extends TestApiBase {
             PlanPhase currentPhase = subscription.getCurrentPhase();
             assertNotNull(currentPhase);
             assertEquals(currentPhase.getPhaseType(), PhaseType.TRIAL);
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
 
             // MOVE TO DISCOUNT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(currentPhase.getDuration(), DAY_IN_MS);
             currentPhase = subscription.getCurrentPhase();
             assertNotNull(currentPhase);
             assertEquals(currentPhase.getPhaseType(), PhaseType.DISCOUNT);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             // MOVE TO EVERGREEN PHASE + RE-READ SUBSCRIPTION
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.addDeltaFromReality(currentPhase.getDuration());
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
             currentPhase = subscription.getCurrentPhase();
             assertNotNull(currentPhase);
             assertEquals(currentPhase.getPhaseType(), PhaseType.EVERGREEN);
 
-
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
@@ -229,17 +234,15 @@ public abstract class TestUserApiCreate extends TestApiBase {
             BillingPeriod term = BillingPeriod.ANNUAL;
             String planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+            testListener.pushExpectedEvent(NextEvent.CREATE);
 
             SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundle.getId(),
                     getProductSpecifier(productName, planSetName, term, null), clock.getUTCNow(), context);
             assertNotNull(subscription);
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
     }
-
-
-
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiDemos.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiDemos.java
index fdd6e66..2fbd967 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiDemos.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiDemos.java
@@ -32,13 +32,13 @@ import org.testng.annotations.Test;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApiException;
 import com.ning.billing.entitlement.glue.MockEngineModuleSql;
@@ -69,6 +69,7 @@ public class TestUserApiDemos extends TestApiBase {
     @Test(enabled=true, groups="demos")
     public void testDemo1() throws EntitlementBillingApiException {
 
+        log.info("Starting testSubscriptionWithAddOn");
         try {
             System.out.println("DEMO 1 START");
 
@@ -80,16 +81,16 @@ public class TestUserApiDemos extends TestApiBase {
             displayState(subscription.getId(), "STEP 1. CREATED SUBSCRIPTION");
 
             /* STEP 2. CHANGE PLAN WHILE IN TRIAL */
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Assault-Rifle", BillingPeriod.ANNUAL, "gunclubDiscount", clock.getUTCNow(), context);
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
 
             displayState(subscription.getId(), "STEP 2. CHANGED PLAN WHILE IN TRIAL");
 
             /* STEP 3. MOVE TO DISCOUNT PHASE */
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
 
             displayState(subscription.getId(), "STEP 3. MOVE TO DISCOUNT PHASE");
 
@@ -103,25 +104,25 @@ public class TestUserApiDemos extends TestApiBase {
             billingApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, context);
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Shotgun", BillingPeriod.ANNUAL, "gunclubDiscount", clock.getUTCNow(), context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
             testListener.reset();
 
             displayState(subscription.getId(), "STEP 4. SET CTD AND CHANGE PLAN EOT (Shotgun)");
 
             /* STEP 5. CHANGE AGAIN */
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Pistol", BillingPeriod.ANNUAL, "gunclubDiscount", clock.getUTCNow(), context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
             testListener.reset();
 
             displayState(subscription.getId(), "STEP 5. CHANGE AGAIN EOT (Pistol)");
 
             /* STEP 6. MOVE TO EOT AND CHECK CHANGE OCCURED */
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             clock.addDeltaFromReality(ctd);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             Plan currentPlan = subscription.getCurrentPlan();
             assertNotNull(currentPlan);
@@ -136,9 +137,9 @@ public class TestUserApiDemos extends TestApiBase {
             displayState(subscription.getId(), "STEP 6. MOVE TO EOT");
 
             /* STEP 7.  MOVE TO NEXT PHASE */
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.addDeltaFromReality(currentPhase.getDuration());
-            assertTrue(testListener.isApiCompleted(5000));
+            assertTrue(testListener.isCompleted(5000));
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
 
             currentPlan = subscription.getCurrentPlan();
@@ -154,11 +155,12 @@ public class TestUserApiDemos extends TestApiBase {
             displayState(subscription.getId(), "STEP 7.  MOVE TO NEXT PHASE");
 
             /* STEP 8. CANCEL IMM (NO CTD) */
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
             subscription.cancel(clock.getUTCNow(), false, context);
 
             displayState(subscription.getId(), "STEP 8.  CANCELLATION");
 
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
         }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiError.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiError.java
index c3f6061..227ff14 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiError.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiError.java
@@ -32,11 +32,11 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
 import com.ning.billing.ErrorCode;
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PriceListSet;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.glue.MockEngineModuleMemory;
 import com.ning.billing.util.clock.DefaultClock;
@@ -52,6 +52,9 @@ public class TestUserApiError extends TestApiBase {
 
     @Test(enabled=true, groups={"fast"})
     public void testCreateSubscriptionBadCatalog() {
+        
+        log.info("Starting testCreateSubscriptionBadCatalog");
+        
         // WRONG PRODUCTS
         tCreateSubscriptionInternal(bundle.getId(), null, BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME, ErrorCode.CAT_NULL_PRODUCT_NAME);
         tCreateSubscriptionInternal(bundle.getId(), "Whatever", BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME, ErrorCode.CAT_NO_SUCH_PRODUCT);
@@ -68,16 +71,19 @@ public class TestUserApiError extends TestApiBase {
 
     @Test(enabled=true, groups={"fast"})
     public void testCreateSubscriptionNoBundle() {
+        log.info("Starting testCreateSubscriptionNoBundle");
         tCreateSubscriptionInternal(null, "Shotgun", BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME, ErrorCode.ENT_CREATE_NO_BUNDLE);
     }
 
     @Test(enabled=true, groups={"fast"})
     public void testCreateSubscriptionNoBP() {
+        log.info("Starting testCreateSubscriptionNoBP");
         tCreateSubscriptionInternal(bundle.getId(), "Telescopic-Scope", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, ErrorCode.ENT_CREATE_NO_BP);
     }
 
     @Test(enabled=true, groups={"fast"})
     public void testCreateSubscriptionBPExists() {
+        log.info("Starting testCreateSubscriptionBPExists");
         try {
             createSubscription("Shotgun", BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME);
             tCreateSubscriptionInternal(bundle.getId(), "Shotgun", BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME, ErrorCode.ENT_CREATE_BP_EXISTS);
@@ -89,6 +95,7 @@ public class TestUserApiError extends TestApiBase {
 
     @Test(enabled=true, groups={"fast"})
     public void testRecreateSubscriptionBPNotCancelled() {
+        log.info("Starting testRecreateSubscriptionBPNotCancelled");
         try {
             SubscriptionData subscription = createSubscription("Shotgun", BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME);
             try {
@@ -105,6 +112,7 @@ public class TestUserApiError extends TestApiBase {
 
     @Test(enabled=true, groups={"fast"})
     public void testCreateSubscriptionAddOnNotAvailable() {
+        log.info("Starting testCreateSubscriptionAddOnNotAvailable");
         try {
             UUID accountId = UUID.randomUUID();
             SubscriptionBundle aoBundle = entitlementApi.createBundleForAccount(accountId, "myAOBundle", context);
@@ -118,6 +126,7 @@ public class TestUserApiError extends TestApiBase {
 
     @Test(enabled=true, groups={"fast"})
     public void testCreateSubscriptionAddOnIncluded() {
+        log.info("Starting testCreateSubscriptionAddOnIncluded");
         try {
             UUID accountId = UUID.randomUUID();
             SubscriptionBundle aoBundle = entitlementApi.createBundleForAccount(accountId, "myAOBundle", context);
@@ -150,10 +159,11 @@ public class TestUserApiError extends TestApiBase {
 
     @Test(enabled=true, groups={"fast"})
     public void testChangeSubscriptionNonActive() {
+        log.info("Starting testChangeSubscriptionNonActive");
         try {
             Subscription subscription = createSubscription("Shotgun", BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME);
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
             subscription.cancel(clock.getUTCNow(), false, context);
             try {
                 subscription.changePlan("Pistol", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, clock.getUTCNow(), context);
@@ -174,15 +184,16 @@ public class TestUserApiError extends TestApiBase {
 
     @Test(enabled=true, groups={"fast"})
     public void testChangeSubscriptionFutureCancelled() {
+        log.info("Starting testChangeSubscriptionFutureCancelled");
         try {
             Subscription subscription = createSubscription("Shotgun", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME);
             PlanPhase trialPhase = subscription.getCurrentPhase();
 
             // MOVE TO NEXT PHASE
             PlanPhase currentPhase = subscription.getCurrentPhase();
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(currentPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
 
 
             // SET CTD TO CANCEL IN FUTURE
@@ -204,6 +215,8 @@ public class TestUserApiError extends TestApiBase {
                     assertFalse(true);
                 }
             }
+            
+            assertListenerStatus();
         } catch (Exception e) {
             e.printStackTrace();
             Assert.assertFalse(true);
@@ -213,10 +226,12 @@ public class TestUserApiError extends TestApiBase {
 
     @Test(enabled=false, groups={"fast"})
     public void testCancelBadState() {
+        log.info("Starting testCancelBadState");
     }
 
     @Test(enabled=true, groups={"fast"})
     public void testUncancelBadState() {
+        log.info("Starting testUncancelBadState");
         try {
             Subscription subscription = createSubscription("Shotgun", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME);
 
@@ -230,7 +245,7 @@ public class TestUserApiError extends TestApiBase {
                     assertFalse(true);
                 }
             }
-
+            assertListenerStatus();
         } catch (Exception e) {
             e.printStackTrace();
             Assert.assertFalse(true);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiRecreate.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiRecreate.java
index b7506dd..389ac5f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiRecreate.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiRecreate.java
@@ -25,9 +25,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.PriceListSet;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 
 public abstract class TestUserApiRecreate extends TestApiBase {
@@ -36,9 +36,10 @@ public abstract class TestUserApiRecreate extends TestApiBase {
 
 
     protected void testRecreateWithBPCanceledThroughSubscription() {
-        log.info("Starting testRecreateWithBPCanceled");
+        log.info("Starting testRecreateWithBPCanceledThroughSubscription");
         try {
             testCreateAndRecreate(false);
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             log.error("Unexpected exception",e);
             Assert.fail(e.getMessage());
@@ -46,9 +47,10 @@ public abstract class TestUserApiRecreate extends TestApiBase {
     }
 
     protected void testCreateWithBPCanceledFromUserApi() {
-        log.info("Starting testCreateWithBPCanceled");
+        log.info("Starting testCreateWithBPCanceledFromUserApi");
         try {
             testCreateAndRecreate(true);
+            assertListenerStatus();
         } catch (EntitlementUserApiException e) {
             log.error("Unexpected exception",e);
             Assert.fail(e.getMessage());
@@ -65,8 +67,8 @@ public abstract class TestUserApiRecreate extends TestApiBase {
         BillingPeriod term = BillingPeriod.MONTHLY;
         String planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
 
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-        testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.CREATE);
         SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundle.getId(),
                 getProductSpecifier(productName, planSetName, term, null), requestedDate, context);
         assertNotNull(subscription);
@@ -75,7 +77,7 @@ public abstract class TestUserApiRecreate extends TestApiBase {
         assertEquals(subscription.getStartDate(), requestedDate);
         assertEquals(productName, subscription.getCurrentPlan().getProduct().getName());
 
-        assertTrue(testListener.isApiCompleted(5000));
+        assertTrue(testListener.isCompleted(5000));
 
         // CREATE (AGAIN) WITH NEW PRODUCT
         productName = "Pistol";
@@ -95,11 +97,11 @@ public abstract class TestUserApiRecreate extends TestApiBase {
         }
 
         // NOW CANCEL ADN THIS SHOULD WORK
-        testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+        testListener.pushExpectedEvent(NextEvent.CANCEL);
         subscription.cancel(null, false, context);
 
-        testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-        testListener.pushNextApiExpectedEvent(NextEvent.RE_CREATE);
+        testListener.pushExpectedEvent(NextEvent.PHASE);
+        testListener.pushExpectedEvent(NextEvent.RE_CREATE);
 
         // Avoid ordering issue for events at exact same date; this is actually a real good test, we
         // we test it at Beatrix level. At this level that would work for sql tests but not for in memory.
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiScenarios.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiScenarios.java
index 92ac1b0..de149be 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiScenarios.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiScenarios.java
@@ -27,11 +27,11 @@ import org.testng.annotations.Test;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.PlanPhase;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApiException;
 import com.ning.billing.entitlement.glue.MockEngineModuleSql;
@@ -54,14 +54,14 @@ public class TestUserApiScenarios extends TestApiBase {
             PlanPhase trialPhase = subscription.getCurrentPhase();
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Pistol", BillingPeriod.ANNUAL, "gunclubDiscount", clock.getUTCNow(), context);
-            testListener.isApiCompleted(3000);
+            testListener.isCompleted(5000);
 
             // MOVE TO NEXT PHASE
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.setDeltaFromReality(trialPhase.getDuration(), DAY_IN_MS);
-            assertTrue(testListener.isApiCompleted(2000));
+            assertTrue(testListener.isCompleted(5000));
 
             // SET CTD
             Duration ctd = getDurationMonth(1);
@@ -71,22 +71,23 @@ public class TestUserApiScenarios extends TestApiBase {
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId());
 
             // CANCEL EOT
-            testListener.pushNextApiExpectedEvent(NextEvent.CANCEL);
+            testListener.pushExpectedEvent(NextEvent.CANCEL);
             subscription.cancel(clock.getUTCNow(), false, context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
             testListener.reset();
 
             // UNCANCEL
             subscription.uncancel(context);
 
             // CHANGE EOT
-            testListener.pushNextApiExpectedEvent(NextEvent.CHANGE);
+            testListener.pushExpectedEvent(NextEvent.CHANGE);
             subscription.changePlan("Pistol", BillingPeriod.MONTHLY, "gunclubDiscount", clock.getUTCNow(), context);
-            assertFalse(testListener.isApiCompleted(2000));
+            assertFalse(testListener.isCompleted(5000));
 
             clock.addDeltaFromReality(ctd);
-            assertTrue(testListener.isApiCompleted(3000));
+            assertTrue(testListener.isCompleted(5000));
 
+            assertListenerStatus();
 
         } catch (EntitlementUserApiException e) {
             Assert.fail(e.getMessage());
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserCustomFieldsSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserCustomFieldsSql.java
index 90966d8..c208521 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserCustomFieldsSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserCustomFieldsSql.java
@@ -28,9 +28,9 @@ import org.testng.annotations.Test;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
+import com.ning.billing.api.TestApiListener.NextEvent;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.PriceListSet;
-import com.ning.billing.entitlement.api.ApiTestListener.NextEvent;
 import com.ning.billing.entitlement.api.TestApiBase;
 import com.ning.billing.entitlement.glue.MockEngineModuleSql;
 import com.ning.billing.util.callcontext.CallContext;
@@ -66,7 +66,7 @@ public class TestUserCustomFieldsSql extends TestApiBase {
 
     @Test(enabled=true, groups={"slow"})
     public void testOverwriteCustomFields() {
-        log.info("Starting testCreateWithRequestedDate");
+        log.info("Starting testOverwriteCustomFields");
         try {
 
             DateTime init = clock.getUTCNow();
@@ -76,8 +76,8 @@ public class TestUserCustomFieldsSql extends TestApiBase {
             BillingPeriod term = BillingPeriod.MONTHLY;
             String planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
 
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-            testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.CREATE);
             SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundle.getId(),
                     getProductSpecifier(productName, planSetName, term, null), requestedDate, context);
             assertNotNull(subscription);
@@ -122,7 +122,7 @@ public class TestUserCustomFieldsSql extends TestApiBase {
 
     @Test(enabled=true, groups={"slow"})
     public void testBasicCustomFields() {
-        log.info("Starting testCreateWithRequestedDate");
+        log.info("Starting testBasicCustomFields");
         try {
 
             DateTime init = clock.getUTCNow();
@@ -132,8 +132,8 @@ public class TestUserCustomFieldsSql extends TestApiBase {
             BillingPeriod term = BillingPeriod.MONTHLY;
             String planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
 
-            testListener.pushNextApiExpectedEvent(NextEvent.PHASE);
-            testListener.pushNextApiExpectedEvent(NextEvent.CREATE);
+            testListener.pushExpectedEvent(NextEvent.PHASE);
+            testListener.pushExpectedEvent(NextEvent.CREATE);
             SubscriptionData subscription = (SubscriptionData) entitlementApi.createSubscription(bundle.getId(),
                     getProductSpecifier(productName, planSetName, term, null), requestedDate, context);
             assertNotNull(subscription);
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index 2b1b834..7ede0d0 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -19,11 +19,8 @@ package com.ning.billing.util.bus;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
@@ -40,9 +37,10 @@ import com.ning.billing.util.Hostname;
 import com.ning.billing.util.bus.dao.BusEventEntry;
 import com.ning.billing.util.bus.dao.PersistentBusSqlDao;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.queue.PersistentQueueBase;
 
 
-public class PersistentBus implements Bus  {
+public class PersistentBus extends PersistentQueueBase implements Bus {
 
     private final static int NB_BUS_THREADS = 3;
     private final static long TIMEOUT_MSEC = 15L * 1000L; // 15 sec
@@ -53,16 +51,13 @@ public class PersistentBus implements Bus  {
     private static final Logger log = LoggerFactory.getLogger(PersistentBus.class);
     
     private final PersistentBusSqlDao dao;
-    private final ExecutorService executor;
     
     private final ObjectMapper objectMapper;
     private final EventBusDelegate eventBusDelegate;
     private final Clock clock;
     private final String hostname;
     
-    protected boolean isProcessingEvents;
-    private int curActiveThreads;
-    
+
     
     private static final class EventBusDelegate extends EventBus {
         public EventBusDelegate(String busName) {
@@ -86,115 +81,34 @@ public class PersistentBus implements Bus  {
     
     @Inject
     public PersistentBus(final IDBI dbi, final Clock clock) {
+        super("Bus", Executors.newFixedThreadPool(NB_BUS_THREADS, new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(new ThreadGroup(DefaultBusService.EVENT_BUS_GROUP_NAME),
+                        r,
+                        DefaultBusService.EVENT_BUS_TH_NAME);
+            }
+        }), NB_BUS_THREADS, TIMEOUT_MSEC, SLEEP_TIME_MS);
         this.dao = dbi.onDemand(PersistentBusSqlDao.class);
         this.clock = clock;
         this.objectMapper = new ObjectMapper();
         this.objectMapper.disable(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS);
         this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
-        final ThreadGroup group = new ThreadGroup(DefaultBusService.EVENT_BUS_GROUP_NAME);
-        this.executor = Executors.newFixedThreadPool(NB_BUS_THREADS, new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(group, r, DefaultBusService.EVENT_BUS_TH_NAME);
-            }
-        });
         this.hostname = Hostname.get();
-        this.isProcessingEvents = false;
     }
-
     
     @Override
     public void start() {
-        
-        isProcessingEvents = true;
-        curActiveThreads = 0;
-        
-        final PersistentBus thePersistentBus = this;
-        final CountDownLatch doneInitialization = new CountDownLatch(NB_BUS_THREADS);
-
-        log.info("Starting Persistent BUS with {} threads, countDownLatch = {}", NB_BUS_THREADS, doneInitialization.getCount());
-        
-        for (int i = 0; i < NB_BUS_THREADS; i++) {
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-
-                    log.info(String.format("PersistentBus thread %s [%d] started",
-                            Thread.currentThread().getName(),
-                            Thread.currentThread().getId()));
-                    
-                    synchronized(thePersistentBus) {
-                        curActiveThreads++;
-                    }
-
-                    doneInitialization.countDown();
-                    
-                    try {
-                        while (true) {
-                            
-                            synchronized(thePersistentBus) {
-                                if (!isProcessingEvents) {
-                                    thePersistentBus.notify();
-                                    break;
-                                }
-                            }
-
-                            try {
-                                doProcessEvents();
-                            } catch (Exception e) {
-                                log.error(String.format("PersistentBus thread  %s  [%d] got an exception..",
-                                        Thread.currentThread().getName(),
-                                        Thread.currentThread().getId()), e);
-                            }
-                            sleepALittle();
-                        }
-                    } catch (InterruptedException e) {
-                        log.info(Thread.currentThread().getName() + " got interrupted, exting...");
-                    } catch (Throwable e) {
-                        log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
-                        // Just to make it really obvious in the log
-                        e.printStackTrace();
-                    } finally {
-                        
-                        log.info(String.format("PersistentBus thread %s [%d] exited",
-                                Thread.currentThread().getName(),
-                                Thread.currentThread().getId()));
-                    
-                        synchronized(thePersistentBus) {
-                            curActiveThreads--;
-                        }
-                    }
-                }
-                
-                private void sleepALittle() throws InterruptedException {
-                    Thread.sleep(SLEEP_TIME_MS);
-                }
-            });
-        }
-        try {
-            boolean success = doneInitialization.await(TIMEOUT_MSEC, TimeUnit.MILLISECONDS);
-            if (!success) {
-                log.warn("Failed to wait for all threads to be started, got {}/{}", doneInitialization.getCount(), NB_BUS_THREADS);
-            } else {
-                log.info("Done waiting for all threads to be started, got {}/{}", doneInitialization.getCount(), NB_BUS_THREADS);
-            }
-        } catch (InterruptedException e) {
-            log.warn("PersistentBus start sequence got interrupted...");
-        }
+        startQueue();
     }
-    
-    
-    private BusEvent deserializeBusEvent(final String className, final String json) {
-        try {
-            Class<?> claz = Class.forName(className);
-            return (BusEvent) objectMapper.readValue(json, claz);
-        } catch (Exception e) {
-            log.error(String.format("Failed to deserialize json object %s for class %s", json, className), e);
-            return null;
-        }
+
+    @Override
+    public void stop() {
+        stopQueue();
     }
-    
-    private int doProcessEvents() {
+
+    @Override
+    public int doProcessEvents() {
 
         List<BusEventEntry> events = getNextBusEvent();
         if (events.size() == 0) {
@@ -212,6 +126,16 @@ public class PersistentBus implements Bus  {
         return result;
     }
 
+    private BusEvent deserializeBusEvent(final String className, final String json) {
+        try {
+            Class<?> claz = Class.forName(className);
+            return (BusEvent) objectMapper.readValue(json, claz);
+        } catch (Exception e) {
+            log.error(String.format("Failed to deserialize json object %s for class %s", json, className), e);
+            return null;
+        }
+    }
+
     
     private List<BusEventEntry> getNextBusEvent() {
 
@@ -231,34 +155,6 @@ public class PersistentBus implements Bus  {
         return Collections.emptyList();
     }
 
-
-    @Override
-    public void stop() {
-        int remaining = 0;
-        try {
-            synchronized(this) {
-                isProcessingEvents = false;
-                long ini = System.currentTimeMillis();
-                long remainingWaitTimeMs = TIMEOUT_MSEC;
-                while (curActiveThreads > 0 && remainingWaitTimeMs > 0) {
-                    wait(1000);
-                    remainingWaitTimeMs = TIMEOUT_MSEC - (System.currentTimeMillis() - ini);
-                }
-                remaining = curActiveThreads;
-            }
-            
-        } catch (InterruptedException ignore) {
-            log.info("PersistentBus has been interrupted during stop sequence");
-        } finally {
-            if (remaining > 0) {
-                log.error(String.format("PersistentBus stopped with %d active remaing threads", remaining));
-            } else {
-                log.info("PersistentBus completed sucesfully shutdown sequence");
-            }
-            curActiveThreads = 0;
-        }
-    }
-
     @Override
     public void register(Object handlerInstance) throws EventBusException {
         eventBusDelegate.register(handlerInstance);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index c6958c2..c7b770e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -41,10 +41,10 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
-    protected int doProcessEvents(final int sequenceId) {
+    public int doProcessEvents() {
 
         logDebug("ENTER doProcessEvents");
-        List<Notification> notifications = getReadyNotifications(sequenceId);
+        List<Notification> notifications = getReadyNotifications(1);
         if (notifications.size() == 0) {
             logDebug("EXIT doProcessEvents");
             return 0;
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 14b68e0..d5c2425 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -22,8 +22,9 @@ import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.queue.QueueLifecycle;
 
-public interface NotificationQueue {
+public interface NotificationQueue extends QueueLifecycle {
 
    /**
     *
@@ -61,25 +62,9 @@ public interface NotificationQueue {
    public int processReadyNotification();
 
    /**
-    * Stops the queue. Blocks until queue is completely stopped.
-    *
-    * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
-    */
-   public void stopQueue();
-
-   /**
-    * Starts the queue. Blocks until queue has completely started.
-    *
-    * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
-    */
-   public void startQueue();
-
-   /**
     *
     * @return the name of that queue
     */
    public String getFullQName();
 
-   
-
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index eb3f269..d7213e9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -28,50 +28,35 @@ import org.slf4j.LoggerFactory;
 
 import com.ning.billing.config.NotificationConfig;
 import com.ning.billing.util.Hostname;
+import com.ning.billing.util.bus.DefaultBusService;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.queue.PersistentQueueBase;
 
 
-public abstract class NotificationQueueBase implements NotificationQueue {
+public abstract class NotificationQueueBase extends PersistentQueueBase implements NotificationQueue {
 
     protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
 
-    private static final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
-    private static final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
-    private static final long NANO_TO_MS = (1000 * 1000);
-
     protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
-    protected final long STOP_WAIT_TIMEOUT_MS = 60000;
+    protected static final long STOP_WAIT_TIMEOUT_MS = 60000;
 
     protected final String svcName;
     protected final String queueName;
     protected final NotificationQueueHandler handler;
     protected final NotificationConfig config;
 
-    protected final Executor executor;
     protected final Clock clock;
     protected final String hostname;
 
-    protected static final AtomicInteger sequenceId = new AtomicInteger();
-
     protected AtomicLong nbProcessedEvents;
 
-    // Use this object's monitor for synchronization (no need for volatile)
-    protected boolean isProcessingEvents;
-
-    private boolean startedComplete = false;
-    private boolean stoppedComplete = false;
-
     // Package visibility on purpose
     NotificationQueueBase(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
-        this.clock = clock;
-        this.svcName = svcName;
-        this.queueName = queueName;
-        this.handler = handler;
-        this.config = config;
-        this.hostname = Hostname.get();
+        // final String svcName, final Executor executor, final int nbThreads, final long waitTimeoutMs, final long sleepTimeMs) {
+        super(svcName, Executors.newFixedThreadPool(1, new ThreadFactory() {
+
 
-        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 Thread th = new Thread(r);
@@ -84,103 +69,21 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                 });
                 return th;
             }
-        });
-    }
+        }), 1, STOP_WAIT_TIMEOUT_MS, config.getNotificationSleepTimeMs());
 
-
-    @Override
-    public int processReadyNotification() {
-        return doProcessEvents(sequenceId.incrementAndGet());
+        this.clock = clock;
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.config = config;
+        this.hostname = Hostname.get();
+        this.nbProcessedEvents = new AtomicLong();
     }
 
 
     @Override
-    public void stopQueue() {
-        if (config.isNotificationProcessingOff()) {
-            completedQueueStop();
-            return;
-        }
-
-        synchronized(this) {
-            isProcessingEvents = false;
-            try {
-                log.info("NotificationQueue requested to stop");
-                wait(STOP_WAIT_TIMEOUT_MS);
-                log.info("NotificationQueue requested should have exited");
-            } catch (InterruptedException e) {
-                log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
-            }
-        }
-        waitForNotificationStopCompletion();
-    }
-
-    @Override
-    public void startQueue() {
-
-        this.isProcessingEvents = true;
-        this.nbProcessedEvents = new AtomicLong();
-
-
-        if (config.isNotificationProcessingOff()) {
-            log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
-            completedQueueStart();
-            return;
-        }
-        final NotificationQueueBase notificationQueue = this;
-
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-
-                log.info(String.format("NotificationQueue thread %s [%d] started",
-                        Thread.currentThread().getName(),
-                        Thread.currentThread().getId()));
-
-                // Thread is now started, notify the listener
-                completedQueueStart();
-
-                try {
-                    while (true) {
-
-                        synchronized (notificationQueue) {
-                            if (!isProcessingEvents) {
-                                log.info(String.format("NotificationQueue has been requested to stop, thread  %s  [%d] stopping...",
-                                        Thread.currentThread().getName(),
-                                        Thread.currentThread().getId()));
-                                notificationQueue.notify();
-                                break;
-                            }
-                        }
-
-                        // Callback may trigger exceptions in user code so catch anything here and live with it.
-                        try {
-                            doProcessEvents(sequenceId.getAndIncrement());
-                        } catch (Exception e) {
-                            log.error(String.format("NotificationQueue thread  %s  [%d] got an exception..",
-                                    Thread.currentThread().getName(),
-                                    Thread.currentThread().getId()), e);
-                        }
-                        sleepALittle();
-                    }
-                } catch (InterruptedException e) {
-                    log.warn(Thread.currentThread().getName() + " got interrupted ", e);
-                } catch (Throwable e) {
-                    log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
-                    // Just to make it really obvious in the log
-                    e.printStackTrace();
-                } finally {
-                    completedQueueStop();
-                    log.info(String.format("NotificationQueue thread  %s  [%d] exited...",
-                            Thread.currentThread().getName(),
-                            Thread.currentThread().getId()));
-                }
-            }
-
-            private void sleepALittle() throws InterruptedException {
-                Thread.sleep(config.getNotificationSleepTimeMs());
-            }
-        });
-        waitForNotificationStartCompletion();
+    public int processReadyNotification() {
+        return doProcessEvents();
     }
 
     @Override
@@ -188,61 +91,12 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         return getFullQName();
     }
 
-    private void completedQueueStop() {
-    	synchronized (this) {
-    		stoppedComplete = true;
-            this.notifyAll();
-        }
-    }
-
-    private void completedQueueStart() {
-        synchronized (this) {
-        	startedComplete = true;
-            this.notifyAll();
-        }
-    }
-
-    private void waitForNotificationStartCompletion() {
-        waitForNotificationEventCompletion(true);
-    }
-
-    private void waitForNotificationStopCompletion() {
-        waitForNotificationEventCompletion(false);
-    }
-
-    private void waitForNotificationEventCompletion(boolean startEvent) {
-
-        long ini = System.nanoTime();
-        synchronized(this) {
-            do {
-                if ((startEvent ? startedComplete : stoppedComplete)) {
-                    break;
-                }
-                try {
-                    this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
-                } catch (InterruptedException e ) {
-                    Thread.currentThread().interrupt();
-                    throw new NotificationError(e);
-                }
-            } while (!(startEvent ? startedComplete : stoppedComplete) &&
-                    (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
-
-            if (!(startEvent ? startedComplete : stoppedComplete)) {
-                log.error("Could not {} notification thread in {} msec !!!",
-                        (startEvent ? "start" : "stop"),
-                        MAX_NOTIFICATION_THREAD_WAIT_MS);
-                throw new NotificationError("Failed to start service!!");
-            }
-            log.info("Notification thread has been {} in {} ms",
-                    (startEvent ? "started" : "stopped"),
-                    (System.nanoTime() - ini) / NANO_TO_MS);
-        }
-    }
 
     @Override
     public String getFullQName() {
         return svcName + ":" +  queueName;
     }
 
-    protected abstract int doProcessEvents(int sequenceId);
+    @Override
+    public abstract int doProcessEvents();
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
new file mode 100644
index 0000000..a6844ad
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -0,0 +1,159 @@
+/* 
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.util.queue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class PersistentQueueBase implements QueueLifecycle {
+
+    private static final Logger log = LoggerFactory.getLogger(PersistentQueueBase.class);
+
+    private final int nbThreads;
+    private final Executor executor;
+    private final String svcName;
+    private final long sleepTimeMs;
+    private final long waitTimeoutMs;
+
+    private boolean isProcessingEvents;
+    private int curActiveThreads;
+    
+    public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final long waitTimeoutMs, final long sleepTimeMs) {
+        this.executor = executor;
+        this.nbThreads = nbThreads;
+        this.svcName = svcName;
+        this.waitTimeoutMs = waitTimeoutMs;
+        this.sleepTimeMs = sleepTimeMs;
+        this.isProcessingEvents = false;
+        this.curActiveThreads = 0;
+    }
+    
+    @Override
+    public void startQueue() {
+        
+        isProcessingEvents = true;
+        curActiveThreads = 0;
+        
+        final PersistentQueueBase thePersistentQ = this;
+        final CountDownLatch doneInitialization = new CountDownLatch(nbThreads);
+
+        log.info(String.format("%s: Starting with %d threads",
+                svcName, nbThreads));
+        
+        for (int i = 0; i < nbThreads; i++) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+
+                    log.info(String.format("%s: Thread %s [%d] starting",
+                            svcName,
+                            Thread.currentThread().getName(),
+                            Thread.currentThread().getId()));
+                    
+                    synchronized(thePersistentQ) {
+                        curActiveThreads++;
+                    }
+
+                    doneInitialization.countDown();
+                    
+                    try {
+                        while (true) {
+                            
+                            synchronized(thePersistentQ) {
+                                if (!isProcessingEvents) {
+                                    thePersistentQ.notify();
+                                    break;
+                                }
+                            }
+
+                            try {
+                                doProcessEvents();
+                            } catch (Exception e) {
+                                log.warn(String.format("%s: Thread  %s  [%d] got an exception, catching and moving on...",
+                                        svcName,
+                                        Thread.currentThread().getName(),
+                                        Thread.currentThread().getId()), e);
+                            }
+                            sleepALittle();
+                        }
+                    } catch (InterruptedException e) {
+                        log.info(String.format("%s: Thread %s got interrupted, exting... ", svcName, Thread.currentThread().getName()));
+                    } catch (Throwable e) {
+                        log.error(String.format("%s: Thread %s got an exception, exting... ", svcName, Thread.currentThread().getName()), e);                        
+                    } finally {
+
+                        log.info(String.format("%s: Thread %s has exited", svcName, Thread.currentThread().getName()));                                                
+                        synchronized(thePersistentQ) {
+                            curActiveThreads--;
+                        }
+                    }
+                }
+                
+                private void sleepALittle() throws InterruptedException {
+                    Thread.sleep(sleepTimeMs);
+                }
+            });
+        }
+        try {
+            boolean success = doneInitialization.await(sleepTimeMs, TimeUnit.MILLISECONDS);
+            if (!success) {
+                
+                log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+            } else {
+                log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));                
+            }
+        } catch (InterruptedException e) {
+            log.warn(String.format("%s: Start sequence, got interrupted", svcName));
+        }
+    }
+    
+    
+    @Override
+    public void stopQueue() {
+        int remaining = 0;
+        try {
+            synchronized(this) {
+                isProcessingEvents = false;
+                long ini = System.currentTimeMillis();
+                long remainingWaitTimeMs = waitTimeoutMs;
+                while (curActiveThreads > 0 && remainingWaitTimeMs > 0) {
+                    wait(1000);
+                    remainingWaitTimeMs = waitTimeoutMs - (System.currentTimeMillis() - ini);
+                }
+                remaining = curActiveThreads;
+            }
+            
+        } catch (InterruptedException ignore) {
+            log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcName, curActiveThreads));
+        } finally {
+            if (remaining > 0) {
+                log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+            } else {
+                log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));                
+            }
+            curActiveThreads = 0;
+        }
+    }
+    
+    
+    @Override
+    public abstract int doProcessEvents();
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 658752a..86ec0ea 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -75,7 +75,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    protected int doProcessEvents(int sequenceId) {
+    public int doProcessEvents() {
 
         int result = 0;