killbill-aplcache

Changes

Details

diff --git a/api/src/main/java/com/ning/billing/ErrorCode.java b/api/src/main/java/com/ning/billing/ErrorCode.java
index c3b55a1..68eadbe 100644
--- a/api/src/main/java/com/ning/billing/ErrorCode.java
+++ b/api/src/main/java/com/ning/billing/ErrorCode.java
@@ -22,6 +22,7 @@ public enum ErrorCode {
      */
     NOT_IMPLEMENTED(1, "Api not implemented yet"),
     DATA_TRUNCATION(2, "Data truncation error. (%s)"),
+    UNEXPECTED_ERROR(3, "%s"),
     /*
      *
      * Range 1000 : ENTITLEMENTS
diff --git a/api/src/main/java/com/ning/billing/util/tag/api/TagEvent.java b/api/src/main/java/com/ning/billing/util/tag/api/TagEvent.java
index 1cd3cfd..dc05d00 100644
--- a/api/src/main/java/com/ning/billing/util/tag/api/TagEvent.java
+++ b/api/src/main/java/com/ning/billing/util/tag/api/TagEvent.java
@@ -23,6 +23,7 @@ import com.ning.billing.util.dao.ObjectType;
 import com.ning.billing.util.tag.TagDefinition;
 
 public interface TagEvent extends BusEvent {
+
     UUID getTagId();
 
     UUID getObjectId();
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoInvoiceOffTag.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoInvoiceOffTag.java
index 426bac7..217cb1b 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoInvoiceOffTag.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoInvoiceOffTag.java
@@ -79,9 +79,9 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
     @Test(groups = {"slow"}, enabled = true)
     public void testAutoInvoiceOffAccount() throws Exception {
         clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
-        addTag(account.getId(), ObjectType.ACCOUNT);
+        add_AUTO_INVOICING_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
 
-        // set next invoice to fail and create network 
+        // set next invoice to fail and create network
         busHandler.pushExpectedEvents(NextEvent.CREATE);
         final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
                                                                                                                    new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
@@ -111,7 +111,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
     public void testAutoInvoiceOffSingleSubscription() throws Exception {
         clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
 
-        // set next invoice to fail and create network 
+        // set next invoice to fail and create network
         busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.INVOICE);
         final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
                                                                                                                    new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
@@ -122,7 +122,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
         assertEquals(invoices.size(), 1); // first invoice is generated immediately after creation can't reliably stop it
 
 
-        addTag(baseSubscription.getBundleId(), ObjectType.BUNDLE);
+        add_AUTO_INVOICING_OFF_Tag(baseSubscription.getBundleId(), ObjectType.BUNDLE);
 
         busHandler.pushExpectedEvents(NextEvent.PHASE);
         clock.addDays(40); // DAY 40 out of trial
@@ -138,7 +138,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
     public void testAutoInvoiceOffMultipleSubscriptions() throws Exception {
         clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
 
-        // set next invoice to fail and create network 
+        // set next invoice to fail and create network
         busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.INVOICE);
         final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
                                                                                                                    new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
@@ -156,7 +156,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
         Collection<Invoice> invoices = invoiceApi.getInvoicesByAccount(account.getId());
         assertEquals(invoices.size(), 2); // first invoice is generated immediately after creation can't reliably stop it
 
-        addTag(baseSubscription.getBundleId(), ObjectType.BUNDLE);
+        add_AUTO_INVOICING_OFF_Tag(baseSubscription.getBundleId(), ObjectType.BUNDLE);
 
         busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.PHASE, NextEvent.INVOICE);
         clock.addDays(40); // DAY 40 out of trial
@@ -167,7 +167,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
     }
 
 
-    private void addTag(final UUID id, final ObjectType type) throws TagDefinitionApiException, TagApiException {
+    private void add_AUTO_INVOICING_OFF_Tag(final UUID id, final ObjectType type) throws TagDefinitionApiException, TagApiException {
         final TagDefinition def = tagApi.getTagDefinition(ControlTagType.AUTO_INVOICING_OFF.name());
         tagApi.addTag(id, type, def, context);
         final Map<String, Tag> tags = tagApi.getTags(id, type);
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java
new file mode 100644
index 0000000..b988c7a
--- /dev/null
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.beatrix.integration;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import junit.framework.Assert;
+
+import org.joda.time.DateTime;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.Inject;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.api.TestApiListener.NextEvent;
+import com.ning.billing.catalog.api.BillingPeriod;
+import com.ning.billing.catalog.api.PlanPhaseSpecifier;
+import com.ning.billing.catalog.api.PriceListSet;
+import com.ning.billing.catalog.api.ProductCategory;
+import com.ning.billing.config.PaymentConfig;
+import com.ning.billing.entitlement.api.user.SubscriptionBundle;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoiceUserApi;
+import com.ning.billing.util.api.TagApiException;
+import com.ning.billing.util.api.TagDefinitionApiException;
+import com.ning.billing.util.api.TagUserApi;
+import com.ning.billing.util.dao.ObjectType;
+import com.ning.billing.util.tag.ControlTagType;
+import com.ning.billing.util.tag.Tag;
+import com.ning.billing.util.tag.TagDefinition;
+
+@Guice(modules = {BeatrixModule.class})
+public class TestIntegrationWithAutoPayOff extends TestIntegrationBase {
+
+
+    @Inject
+    private InvoiceUserApi invoiceApi;
+
+    @Inject
+    private TagUserApi tagApi;
+
+    @Inject
+    private PaymentConfig paymentConfig;
+
+
+    private Account account;
+    private SubscriptionBundle bundle;
+    private String productName;
+    private BillingPeriod term;
+    private String planSetName;
+
+    @BeforeMethod(groups = {"slow"})
+    public void setupOverdue() throws Exception {
+
+        account = createAccountWithPaymentMethod(getAccountData(25));
+        assertNotNull(account);
+
+        bundle = entitlementUserApi.createBundleForAccount(account.getId(), "whatever", context);
+
+        productName = "Shotgun";
+        term = BillingPeriod.MONTHLY;
+        planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
+    }
+
+    @Test(groups = {"slow"}, enabled = true)
+    public void testAutoPayOff() throws Exception {
+        clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
+        add_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        busHandler.pushExpectedEvents(NextEvent.CREATE);
+        final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
+                                                                                                                   new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
+        assertNotNull(baseSubscription);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        Collection<Invoice> invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 1);
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE);
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        clock.addDays(40); // After trial
+
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT);
+        remove_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertTrue(cur.getBalance().compareTo(BigDecimal.ZERO) == 0);
+            assertTrue(cur.getPaidAmount().compareTo(cur.getChargedAmount()) == 0);
+        }
+        assertListenerStatus();
+    }
+
+
+    @Test(groups = {"slow"}, enabled = true)
+    public void testAutoPayOffWithPaymentFailure() throws Exception {
+        clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
+        add_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        busHandler.pushExpectedEvents(NextEvent.CREATE);
+        final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
+                                                                                                                   new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
+        assertNotNull(baseSubscription);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        Collection<Invoice> invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 1);
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE);
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        clock.addDays(40); // After trial
+
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+
+        paymentPlugin.makeNextPaymentFailWithError();
+        remove_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT_ERROR);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+        assertListenerStatus();
+
+        int nbDaysBeforeRetry = paymentConfig.getPaymentRetryDays().get(0);
+
+        // MOVE TIME FOR RETRY TO HAPPEN
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT);
+        clock.addDays(nbDaysBeforeRetry + 1);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertTrue(cur.getBalance().compareTo(BigDecimal.ZERO) == 0);
+            assertTrue(cur.getPaidAmount().compareTo(cur.getChargedAmount()) == 0);
+        }
+        assertListenerStatus();
+
+    }
+
+
+    @Test(groups = {"slow"}, enabled = true)
+    public void testAutoPayOffWithPaymentFailureOn_AUTO_PAY_OFF() throws Exception {
+        clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
+        add_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        busHandler.pushExpectedEvents(NextEvent.CREATE);
+        final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
+                                                                                                                   new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
+        assertNotNull(baseSubscription);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        Collection<Invoice> invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 1);
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE);
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        clock.addDays(40); // After trial
+
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+
+        paymentPlugin.makeNextPaymentFailWithError();
+        remove_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT_ERROR);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+        assertListenerStatus();
+
+        int nbDaysBeforeRetry = paymentConfig.getPaymentRetryDays().get(0);
+
+        // AUTO_PAY_OFF to ON
+        add_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        // MOVE TIME FOR RETRY TO HAPPEN
+        clock.addDays(nbDaysBeforeRetry + 1);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+        assertListenerStatus();
+
+        // SWICTH BACK AUTO_PAY_OFF OFF
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT);
+        remove_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertTrue(cur.getBalance().compareTo(BigDecimal.ZERO) == 0);
+            assertTrue(cur.getPaidAmount().compareTo(cur.getChargedAmount()) == 0);
+        }
+        assertListenerStatus();
+
+    }
+
+
+    private void add_AUTO_PAY_OFF_Tag(final UUID id, final ObjectType type) throws TagDefinitionApiException, TagApiException {
+        final TagDefinition def = tagApi.getTagDefinition(ControlTagType.AUTO_PAY_OFF.name());
+        tagApi.addTag(id, type, def, context);
+        final Map<String, Tag> tags = tagApi.getTags(id, type);
+        assertNotNull(tags.get(ControlTagType.AUTO_PAY_OFF.name()));
+    }
+
+    private void remove_AUTO_PAY_OFF_Tag(final UUID id, final ObjectType type) throws TagDefinitionApiException, TagApiException {
+        final TagDefinition def = tagApi.getTagDefinition(ControlTagType.AUTO_PAY_OFF.name());
+        tagApi.removeTag(id, type, def, context);
+    }
+}
+
diff --git a/payment/src/main/java/com/ning/billing/payment/bus/InvoiceHandler.java b/payment/src/main/java/com/ning/billing/payment/bus/InvoiceHandler.java
index e94b175..527a776 100644
--- a/payment/src/main/java/com/ning/billing/payment/bus/InvoiceHandler.java
+++ b/payment/src/main/java/com/ning/billing/payment/bus/InvoiceHandler.java
@@ -16,8 +16,6 @@
 
 package com.ning.billing.payment.bus;
 
-import java.util.Map;
-import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,24 +35,15 @@ import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.DefaultCallContext;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.dao.ObjectType;
-import com.ning.billing.util.tag.ControlTagType;
-import com.ning.billing.util.tag.Tag;
+
 
 public class InvoiceHandler {
 
     public static final String PAYMENT_PROVIDER_KEY = "paymentProvider";
 
-    /*
-    private final static int NB_PAYMENT_THREADS = 3; // STEPH
-    private final static String PAYMENT_GROUP_NAME = "payment-grp";
-    private final static String PAYMENT_TH_NAME = "payment-th";
-*/
-
     private final PaymentProcessor paymentProcessor;
     private final AccountUserApi accountUserApi;
     private final Clock clock;
-    private final TagUserApi tagUserApi;
 
 
     private static final Logger log = LoggerFactory.getLogger(InvoiceHandler.class);
@@ -66,7 +55,6 @@ public class InvoiceHandler {
                           final TagUserApi tagUserApi) {
         this.clock = clock;
         this.accountUserApi = accountUserApi;
-        this.tagUserApi = tagUserApi;
         this.paymentProcessor = paymentProcessor;
     }
 
@@ -79,11 +67,9 @@ public class InvoiceHandler {
 
         Account account = null;
         try {
-            account = accountUserApi.getAccountById(event.getAccountId());
-            if (isAccountAutoPayOff(account.getId())) {
-                return;
-            }
+
             final CallContext context = new DefaultCallContext("PaymentRequestProcessor", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken(), clock);
+            account = accountUserApi.getAccountById(event.getAccountId());
             paymentProcessor.createPayment(account, event.getInvoiceId(), null, context, false);
         } catch (AccountApiException e) {
             log.error("Failed to process invoice payment", e);
@@ -93,16 +79,6 @@ public class InvoiceHandler {
             }
         }
     }
-
-    private boolean isAccountAutoPayOff(final UUID accountId) {
-        final Map<String, Tag> accountTags = tagUserApi.getTags(accountId, ObjectType.ACCOUNT);
-        for (final Tag cur : accountTags.values()) {
-            if (cur.getTagDefinitionName().equals(ControlTagType.AUTO_PAY_OFF.toString())) {
-                return true;
-            }
-        }
-        return false;
-    }
 }
 
 
diff --git a/payment/src/main/java/com/ning/billing/payment/bus/TagHandler.java b/payment/src/main/java/com/ning/billing/payment/bus/TagHandler.java
index da66d06..53527ab 100644
--- a/payment/src/main/java/com/ning/billing/payment/bus/TagHandler.java
+++ b/payment/src/main/java/com/ning/billing/payment/bus/TagHandler.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Copyright 2010-2011 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
@@ -15,6 +15,63 @@
  */
 package com.ning.billing.payment.bus;
 
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.account.api.AccountUserApi;
+import com.ning.billing.payment.api.PaymentApiException;
+import com.ning.billing.payment.core.PaymentProcessor;
+import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.callcontext.DefaultCallContext;
+import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.dao.ObjectType;
+import com.ning.billing.util.tag.ControlTagType;
+import com.ning.billing.util.tag.api.ControlTagDeletionEvent;
+
 public class TagHandler {
 
+    private static final Logger log = LoggerFactory.getLogger(TagHandler.class);
+
+    private final Clock clock;
+    private final AccountUserApi accountUserApi;
+    private final PaymentProcessor paymentProcessor;
+
+    @Inject
+    public TagHandler(final Clock clock,
+            final AccountUserApi accountUserApi,
+            final PaymentProcessor paymentProcessor) {
+        this.clock = clock;
+        this.accountUserApi = accountUserApi;
+        this.paymentProcessor = paymentProcessor;
+    }
+
+    @Subscribe
+    public void process_AUTO_PAY_OFF_removal(final ControlTagDeletionEvent event) {
+        if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_PAY_OFF.toString()) && event.getObjectType() ==  ObjectType.ACCOUNT) {
+            final UUID accountId = event.getObjectId();
+            processUnpaid_AUTO_PAY_OFF_payments(accountId, event.getUserToken());
+        }
+    }
+
+    private void processUnpaid_AUTO_PAY_OFF_payments(final UUID accountId, final UUID userToken) {
+        try {
+            final CallContext context = new DefaultCallContext("PaymentRequestProcessor", CallOrigin.INTERNAL, UserType.SYSTEM, userToken, clock);
+            final Account account = accountUserApi.getAccountById(accountId);
+
+            paymentProcessor.process_AUTO_PAY_OFF_removal(account, context);
+
+        } catch (AccountApiException e) {
+            log.warn(String.format("Failed to process process  removal AUTO_PAY_OFF for account %s", accountId), e);
+        } catch (PaymentApiException e) {
+            log.warn(String.format("Failed to process process  removal AUTO_PAY_OFF for account %s", accountId), e);
+        }
+    }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
index f22bb8e..07c76c9 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
@@ -18,9 +18,11 @@ package com.ning.billing.payment.core;
 import javax.inject.Inject;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -46,13 +48,16 @@ import com.ning.billing.payment.api.PaymentStatus;
 import com.ning.billing.payment.dao.PaymentAttemptModelDao;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.dao.PaymentModelDao;
+import com.ning.billing.payment.dao.PaymentSqlDao;
 import com.ning.billing.payment.dispatcher.PluginDispatcher;
 import com.ning.billing.payment.plugin.api.PaymentInfoPlugin;
 import com.ning.billing.payment.plugin.api.PaymentPluginApi;
 import com.ning.billing.payment.plugin.api.PaymentPluginApiException;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
+import com.ning.billing.payment.retry.AutoPayRetryService.AutoPayRetryServiceScheduler;
 import com.ning.billing.payment.retry.FailedPaymentRetryService.FailedPaymentRetryServiceScheduler;
 import com.ning.billing.payment.retry.PluginFailureRetryService.PluginFailureRetryServiceScheduler;
+import com.ning.billing.util.api.TagUserApi;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.BusEvent;
 import com.ning.billing.util.callcontext.CallContext;
@@ -60,15 +65,21 @@ import com.ning.billing.util.callcontext.CallContextFactory;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.dao.ObjectType;
 import com.ning.billing.util.globallocker.GlobalLocker;
+import com.ning.billing.util.tag.ControlTagType;
+import com.ning.billing.util.tag.Tag;
 
 import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
 
 public class PaymentProcessor extends ProcessorBase {
 
     private final InvoicePaymentApi invoicePaymentApi;
+    private final TagUserApi tagUserApi;
     private final FailedPaymentRetryServiceScheduler failedPaymentRetryService;
     private final PluginFailureRetryServiceScheduler pluginFailureRetryService;
+    private final AutoPayRetryServiceScheduler autoPayoffRetryService;
+
     private final CallContextFactory factory;
     private final Clock clock;
 
@@ -81,8 +92,10 @@ public class PaymentProcessor extends ProcessorBase {
     public PaymentProcessor(final PaymentProviderPluginRegistry pluginRegistry,
             final AccountUserApi accountUserApi,
             final InvoicePaymentApi invoicePaymentApi,
+            final TagUserApi tagUserApi,
             final FailedPaymentRetryServiceScheduler failedPaymentRetryService,
             final PluginFailureRetryServiceScheduler pluginFailureRetryService,
+            final AutoPayRetryServiceScheduler autoPayoffRetryService,
             final PaymentDao paymentDao,
             final Bus eventBus,
             final Clock clock,
@@ -91,8 +104,10 @@ public class PaymentProcessor extends ProcessorBase {
             final CallContextFactory factory) {
         super(pluginRegistry, accountUserApi, eventBus, paymentDao, locker, executor);
         this.invoicePaymentApi = invoicePaymentApi;
+        this.tagUserApi = tagUserApi;
         this.failedPaymentRetryService = failedPaymentRetryService;
         this.pluginFailureRetryService = pluginFailureRetryService;
+        this.autoPayoffRetryService = autoPayoffRetryService;
         this.clock = clock;
         this.factory = factory;
         this.paymentPluginDispatcher = new PluginDispatcher<Payment>(executor);
@@ -130,6 +145,53 @@ public class PaymentProcessor extends ProcessorBase {
         return result;
     }
 
+    public void process_AUTO_PAY_OFF_removal(final Account account, final CallContext context) throws PaymentApiException  {
+
+        try {
+            voidPluginDispatcher.dispatchWithAccountLock(new CallableWithAccountLock<Void>(locker,
+                    account.getExternalKey(),
+                    new WithAccountLockCallback<Void>() {
+
+                @Override
+                public Void doOperation() throws PaymentApiException {
+
+                    final List<PaymentModelDao> payments = paymentDao.getPaymentsForAccount(account.getId());
+                    final Collection<PaymentModelDao> paymentsToBeCompleted = Collections2.filter(payments, new Predicate<PaymentModelDao>() {
+                        @Override
+                        public boolean apply(final PaymentModelDao in) {
+                            // Payments left in AUTO_PAY_OFF or for which we did not retry enough
+                             return (in.getPaymentStatus() == PaymentStatus.AUTO_PAY_OFF ||
+                                     in.getPaymentStatus() == PaymentStatus.PAYMENT_FAILURE ||
+                                     in.getPaymentStatus() == PaymentStatus.PLUGIN_FAILURE);
+                        }
+                    });
+                    // Insert one retry event for each payment left in AUTO_PAY_OFF
+                    for (PaymentModelDao cur : paymentsToBeCompleted) {
+                        switch(cur.getPaymentStatus()) {
+                        case AUTO_PAY_OFF:
+                            autoPayoffRetryService.scheduleRetry(cur.getId(), clock.getUTCNow());
+                            break;
+                        case PAYMENT_FAILURE:
+                            scheduleRetryOnPaymentFailure(cur.getId());
+                            break;
+                        case PLUGIN_FAILURE:
+                            scheduleRetryOnPluginFailure(cur.getId());
+                            break;
+                        default:
+                            // Impossible...
+                            throw new RuntimeException("Unexpected case " + cur.getPaymentStatus());
+                        }
+
+                    }
+                    return null;
+                }
+            }));
+        } catch (TimeoutException e) {
+            throw new PaymentApiException(ErrorCode.UNEXPECTED_ERROR, "Unexpected timeout for payment creation (AUTO_PAY_OFF)");
+        }
+    }
+
+
     public Payment createPayment(final String accountKey, final UUID invoiceId, final BigDecimal inputAmount, final CallContext context, final boolean isInstantPayment)
     throws PaymentApiException {
         try {
@@ -152,6 +214,8 @@ public class PaymentProcessor extends ProcessorBase {
 
                 @Override
                 public Payment doOperation() throws PaymentApiException {
+
+
                     final Invoice invoice = invoicePaymentApi.getInvoice(invoiceId);
 
                     if (invoice.isMigrationInvoice()) {
@@ -160,7 +224,11 @@ public class PaymentProcessor extends ProcessorBase {
                     }
 
                     final BigDecimal requestedAmount = getAndValidatePaymentAmount(invoice, inputAmount, isInstantPayment);
-                    return processNewPaymentWithAccountLocked(plugin, account, invoice, requestedAmount, isInstantPayment, context);
+                    if (isAccountAutoPayOff(account.getId())) {
+                        return processNewPaymentForAutoPayOffWithAccountLocked(account, invoice, requestedAmount, context);
+                    } else {
+                        return processNewPaymentWithAccountLocked(plugin, account, invoice, requestedAmount, isInstantPayment, context);
+                    }
                 }
             }));
         } catch (TimeoutException e) {
@@ -190,10 +258,15 @@ public class PaymentProcessor extends ProcessorBase {
             throw new PaymentApiException(ErrorCode.PAYMENT_AMOUNT_DENIED,
                     invoice.getId(), inputAmount.floatValue(), invoice.getBalance().floatValue());
         }
-        return inputAmount != null ? inputAmount : invoice.getBalance();
+        BigDecimal result =  inputAmount != null ? inputAmount : invoice.getBalance();
+        return result.setScale(2, RoundingMode.HALF_EVEN);
     }
 
 
+    public void retryAutoPayOff(final UUID paymentId) {
+        retryFailedPaymentInternal(paymentId, PaymentStatus.AUTO_PAY_OFF);
+    }
+
     public void retryPluginFailure(final UUID paymentId) {
         retryFailedPaymentInternal(paymentId, PaymentStatus.PLUGIN_FAILURE, PaymentStatus.TIMEDOUT);
     }
@@ -202,6 +275,17 @@ public class PaymentProcessor extends ProcessorBase {
         retryFailedPaymentInternal(paymentId, PaymentStatus.PAYMENT_FAILURE);
     }
 
+    private boolean isAccountAutoPayOff(final UUID accountId) {
+        final Map<String, Tag> accountTags = tagUserApi.getTags(accountId, ObjectType.ACCOUNT);
+        for (final Tag cur : accountTags.values()) {
+            if (cur.getTagDefinitionName().equals(ControlTagType.AUTO_PAY_OFF.toString())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
     private void retryFailedPaymentInternal(final UUID paymentId, final PaymentStatus... expectedPaymentStates) {
 
         try {
@@ -212,6 +296,11 @@ public class PaymentProcessor extends ProcessorBase {
                 return;
             }
 
+            if (isAccountAutoPayOff(payment.getAccountId())) {
+                log.info(String.format("Skip retry payment %s in state %s because AUTO_PAY_OFF", payment.getId(), payment.getPaymentStatus()));
+                return;
+            }
+
             final Account account = accountUserApi.getAccountById(payment.getAccountId());
             final PaymentPluginApi plugin = getPaymentProviderPlugin(account);
             final CallContext context = factory.createCallContext("PaymentRetry", CallOrigin.INTERNAL, UserType.SYSTEM);
@@ -260,22 +349,33 @@ public class PaymentProcessor extends ProcessorBase {
         }
     }
 
+    private Payment processNewPaymentForAutoPayOffWithAccountLocked(final Account account, final Invoice invoice, final BigDecimal requestedAmount, final CallContext context)
+    throws PaymentApiException {
+
+    final PaymentStatus paymentStatus =  PaymentStatus.AUTO_PAY_OFF;
+
+    final PaymentModelDao paymentInfo = new PaymentModelDao(account.getId(), invoice.getId(), requestedAmount, invoice.getCurrency(), invoice.getTargetDate(), paymentStatus);
+    final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), paymentInfo.getId(), paymentStatus, clock.getUTCNow(), requestedAmount);
+
+    paymentDao.insertPaymentWithAttempt(paymentInfo, attempt, context);
+    return new DefaultPayment(paymentInfo, Collections.singletonList(attempt));
+}
+
+
     private Payment processNewPaymentWithAccountLocked(final PaymentPluginApi plugin, final Account account, final Invoice invoice,
             final BigDecimal requestedAmount, final boolean isInstantPayment, final CallContext context) throws PaymentApiException {
 
-        final boolean scheduleRetryForPayment = !isInstantPayment;
         final PaymentModelDao payment = new PaymentModelDao(account.getId(), invoice.getId(), requestedAmount.setScale(2, RoundingMode.HALF_EVEN), invoice.getCurrency(), invoice.getTargetDate());
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow(), requestedAmount);
 
-        final PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, scheduleRetryForPayment, context);
+        final PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
         return processPaymentWithAccountLocked(plugin, account, invoice, savedPayment, attempt, isInstantPayment, context);
     }
 
     private Payment processRetryPaymentWithAccountLocked(final PaymentPluginApi plugin, final Account account, final Invoice invoice, final PaymentModelDao payment,
             final BigDecimal requestedAmount, final CallContext context) throws PaymentApiException {
-        final boolean scheduleRetryForPayment = true;
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow(), requestedAmount);
-        paymentDao.insertNewAttemptForPayment(payment.getId(), attempt, scheduleRetryForPayment, context);
+        paymentDao.insertNewAttemptForPayment(payment.getId(), attempt, context);
         return processPaymentWithAccountLocked(plugin, account, invoice, payment, attempt, false, context);
     }
 
@@ -316,13 +416,10 @@ public class PaymentProcessor extends ProcessorBase {
                 break;
 
             case ERROR:
+                allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
                 // Schedule if non instant payment and max attempt for retry not reached yet
                 if (!isInstantPayment) {
-                    allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
-                    final int retryAttempt = getNumberAttemptsInState(paymentInput.getId(), allAttempts,
-                            PaymentStatus.UNKNOWN, PaymentStatus.PAYMENT_FAILURE);
-                    final boolean isScheduledForRetry = failedPaymentRetryService.scheduleRetry(paymentInput.getId(), retryAttempt);
-                    paymentStatus = isScheduledForRetry ? PaymentStatus.PAYMENT_FAILURE : PaymentStatus.PAYMENT_FAILURE_ABORTED;
+                    paymentStatus = scheduleRetryOnPaymentFailure(paymentInput.getId());
                 } else {
                     paymentStatus = PaymentStatus.PAYMENT_FAILURE_ABORTED;
                 }
@@ -367,6 +464,14 @@ public class PaymentProcessor extends ProcessorBase {
         return isScheduledForRetry ? PaymentStatus.PLUGIN_FAILURE : PaymentStatus.PLUGIN_FAILURE_ABORTED;
     }
 
+    private PaymentStatus scheduleRetryOnPaymentFailure(final UUID paymentId) {
+        final List<PaymentAttemptModelDao> allAttempts = paymentDao.getAttemptsForPayment(paymentId);
+        final int retryAttempt = getNumberAttemptsInState(paymentId, allAttempts,
+                PaymentStatus.UNKNOWN, PaymentStatus.PAYMENT_FAILURE);
+        final boolean isScheduledForRetry = failedPaymentRetryService.scheduleRetry(paymentId, retryAttempt);
+        return isScheduledForRetry ? PaymentStatus.PAYMENT_FAILURE : PaymentStatus.PAYMENT_FAILURE_ABORTED;
+    }
+
     private int getNumberAttemptsInState(final UUID paymentId, final List<PaymentAttemptModelDao> allAttempts, final PaymentStatus... statuses) {
         if (allAttempts == null || allAttempts.size() == 0) {
             return 0;
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
index 4ecbe1f..0a2aa27 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
@@ -40,7 +40,6 @@ public class AuditedPaymentDao implements PaymentDao {
     private final PaymentAttemptSqlDao paymentAttemptSqlDao;
     private final PaymentMethodSqlDao paymentMethodSqlDao;
     private final RefundSqlDao refundSqlDao;
-    //private final TimedoutPaymentRetryServiceScheduler timedoutSchduler;
 
     @Inject
     public AuditedPaymentDao(final IDBI dbi, final PluginFailureRetryServiceScheduler timedoutSchduler) {
@@ -48,13 +47,12 @@ public class AuditedPaymentDao implements PaymentDao {
         this.paymentAttemptSqlDao = dbi.onDemand(PaymentAttemptSqlDao.class);
         this.paymentMethodSqlDao = dbi.onDemand(PaymentMethodSqlDao.class);
         this.refundSqlDao = dbi.onDemand(RefundSqlDao.class);
-        // this.timedoutSchduler = timedoutSchduler;
     }
 
 
     @Override
     public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId,
-                                                             final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context) {
+                                                             final PaymentAttemptModelDao attempt, final CallContext context) {
 
         return paymentAttemptSqlDao.inTransaction(new Transaction<PaymentAttemptModelDao, PaymentAttemptSqlDao>() {
             @Override
@@ -70,7 +68,7 @@ public class AuditedPaymentDao implements PaymentDao {
 
 
     @Override
-    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao payment, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context) {
+    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao payment, final PaymentAttemptModelDao attempt, final CallContext context) {
 
         return paymentSqlDao.inTransaction(new Transaction<PaymentModelDao, PaymentSqlDao>() {
 
@@ -85,28 +83,6 @@ public class AuditedPaymentDao implements PaymentDao {
         });
     }
 
-    /*
-    private int getNbTimedoutAttemptsFromTransaction(final UUID paymentId, final PaymentAttemptSqlDao transactional) {
-        List<PaymentAttemptModelDao> attempts = transactional.getPaymentAttempts(paymentId.toString());
-        return Collections2.filter(attempts, new Predicate<PaymentAttemptModelDao>() {
-            @Override
-            public boolean apply(PaymentAttemptModelDao input) {
-                return input.getPaymentStatus() == PaymentStatus.TIMEDOUT;
-            }
-        }).size();
-    }
-
-
-    private void scheduleTimeoutRetryFromTransaction(final UUID paymentId, final PaymentAttemptSqlDao transactional, final boolean scheduleTimeoutRetry) {
-
-        if (scheduleTimeoutRetry) {
-            int retryAttempt = getNbTimedoutAttemptsFromTransaction(paymentId, transactional) + 1;
-            timedoutSchduler.scheduleRetryFromTransaction(paymentId, retryAttempt, transactional);
-        }
-    }
-*/
-
-
     private PaymentModelDao insertPaymentFromTransaction(final PaymentModelDao payment, final CallContext context, final PaymentSqlDao transactional) {
         transactional.insertPayment(payment, context);
         final PaymentModelDao savedPayment = transactional.getPayment(payment.getId().toString());
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
index 9c65c80..383bd0f 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Copyright 2010-2011 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
@@ -46,6 +46,10 @@ public class PaymentAttemptModelDao extends EntityBase {
         this.paymentError = paymentError;
     }
 
+    public PaymentAttemptModelDao(final UUID accountId, final UUID invoiceId, final UUID paymentId, final PaymentStatus paymentStatus, final DateTime effectiveDate, final BigDecimal requestedAmount) {
+        this(UUID.randomUUID(), accountId, invoiceId, paymentId, paymentStatus, effectiveDate, requestedAmount, null);
+    }
+
     public PaymentAttemptModelDao(final UUID accountId, final UUID invoiceId, final UUID paymentId, final DateTime effectiveDate, final BigDecimal requestedAmount) {
         this(UUID.randomUUID(), accountId, invoiceId, paymentId, PaymentStatus.UNKNOWN, effectiveDate, requestedAmount, null);
     }
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
index 711ae93..68a7fec 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
@@ -25,9 +25,9 @@ import com.ning.billing.util.callcontext.CallContext;
 public interface PaymentDao {
 
     // STEPH do we need object returned?
-    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao paymentInfo, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context);
+    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao paymentInfo, final PaymentAttemptModelDao attempt, final CallContext context);
 
-    public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context);
+    public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId, final PaymentAttemptModelDao attempt, final CallContext context);
 
     public void updateStatusForPaymentWithAttempt(final UUID paymentId, final PaymentStatus paymentStatus, final String paymentError, final String extPaymentRefId, final UUID attemptId, final CallContext context);
 
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentModelDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentModelDao.java
index bf19311..5cb97c9 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentModelDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentModelDao.java
@@ -55,7 +55,12 @@ public class PaymentModelDao extends EntityBase {
     }
 
     public PaymentModelDao(final UUID accountId, final UUID invoiceId,
-                           final BigDecimal amount, final Currency currency, final DateTime effectiveDate) {
+            final BigDecimal amount, final Currency currency, final DateTime effectiveDate, final PaymentStatus paymentStatus) {
+        this(UUID.randomUUID(), accountId, invoiceId, null, INVALID_PAYMENT_NUMBER, amount, currency, paymentStatus, effectiveDate, null);
+    }
+
+    public PaymentModelDao(final UUID accountId, final UUID invoiceId,
+            final BigDecimal amount, final Currency currency, final DateTime effectiveDate) {
         this(UUID.randomUUID(), accountId, invoiceId, null, INVALID_PAYMENT_NUMBER, amount, currency, PaymentStatus.UNKNOWN, effectiveDate, null);
     }
 
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
index 9bd547b..cc97e10 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
@@ -25,6 +25,8 @@ import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
 import com.ning.billing.payment.bus.InvoiceHandler;
+import com.ning.billing.payment.bus.TagHandler;
+import com.ning.billing.payment.retry.AutoPayRetryService;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
 import com.ning.billing.payment.retry.PluginFailureRetryService;
 import com.ning.billing.util.bus.Bus;
@@ -35,23 +37,30 @@ public class DefaultPaymentService implements PaymentService {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultPaymentService.class);
 
-    // STEPH for retry crappiness
     public static final String SERVICE_NAME = "payment-service";
 
-    private final InvoiceHandler requestProcessor;
+    private final InvoiceHandler invoiceHandler;
+    private final TagHandler tagHandler;
     private final Bus eventBus;
     private final PaymentApi api;
     private final FailedPaymentRetryService failedRetryService;
     private final PluginFailureRetryService timedoutRetryService;
+    private final AutoPayRetryService autoPayoffRetryService;
 
     @Inject
-    public DefaultPaymentService(final InvoiceHandler requestProcessor, final PaymentApi api, final Bus eventBus,
-                                 final FailedPaymentRetryService failedRetryService, final PluginFailureRetryService timedoutRetryService) {
-        this.requestProcessor = requestProcessor;
+    public DefaultPaymentService(final InvoiceHandler invoiceHandler,
+            final TagHandler tagHandler,
+            final PaymentApi api, final Bus eventBus,
+            final FailedPaymentRetryService failedRetryService,
+            final PluginFailureRetryService timedoutRetryService,
+            final AutoPayRetryService autoPayoffRetryService) {
+        this.invoiceHandler = invoiceHandler;
+        this.tagHandler = tagHandler;
         this.eventBus = eventBus;
         this.api = api;
         this.failedRetryService = failedRetryService;
         this.timedoutRetryService = timedoutRetryService;
+        this.autoPayoffRetryService = autoPayoffRetryService;
     }
 
     @Override
@@ -63,12 +72,14 @@ public class DefaultPaymentService implements PaymentService {
     public void initialize() throws NotificationQueueAlreadyExists {
         failedRetryService.initialize(SERVICE_NAME);
         timedoutRetryService.initialize(SERVICE_NAME);
+        autoPayoffRetryService.initialize(SERVICE_NAME);
     }
 
     @LifecycleHandlerType(LifecycleHandlerType.LifecycleLevel.REGISTER_EVENTS)
     public void registerForNotifications() {
         try {
-            eventBus.register(requestProcessor);
+            eventBus.register(invoiceHandler);
+            eventBus.register(tagHandler);
         } catch (Bus.EventBusException e) {
             log.error("Unable to register with the EventBus!", e);
         }
@@ -78,12 +89,14 @@ public class DefaultPaymentService implements PaymentService {
     public void start() {
         failedRetryService.start();
         timedoutRetryService.start();
+        autoPayoffRetryService.start();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() throws NoSuchNotificationQueue {
         failedRetryService.stop();
         timedoutRetryService.stop();
+        autoPayoffRetryService.stop();
     }
 
     @Override
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java b/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
index 803b818..348a0ed 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
@@ -33,6 +33,7 @@ import com.ning.billing.payment.api.DefaultPaymentApi;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
 import com.ning.billing.payment.bus.InvoiceHandler;
+import com.ning.billing.payment.bus.TagHandler;
 import com.ning.billing.payment.core.PaymentMethodProcessor;
 import com.ning.billing.payment.core.PaymentProcessor;
 import com.ning.billing.payment.core.RefundProcessor;
@@ -40,6 +41,8 @@ import com.ning.billing.payment.dao.AuditedPaymentDao;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.DefaultPaymentProviderPluginRegistry;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
+import com.ning.billing.payment.retry.AutoPayRetryService;
+import com.ning.billing.payment.retry.AutoPayRetryService.AutoPayRetryServiceScheduler;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
 import com.ning.billing.payment.retry.FailedPaymentRetryService.FailedPaymentRetryServiceScheduler;
 import com.ning.billing.payment.retry.PluginFailureRetryService;
@@ -76,8 +79,10 @@ public class PaymentModule extends AbstractModule {
     protected void installRetryEngines() {
         bind(FailedPaymentRetryService.class).asEagerSingleton();
         bind(PluginFailureRetryService.class).asEagerSingleton();
+        bind(AutoPayRetryService.class).asEagerSingleton();
         bind(FailedPaymentRetryServiceScheduler.class).asEagerSingleton();
         bind(PluginFailureRetryServiceScheduler.class).asEagerSingleton();
+        bind(AutoPayRetryServiceScheduler.class).asEagerSingleton();
     }
 
     protected void installProcessors() {
@@ -105,6 +110,7 @@ public class PaymentModule extends AbstractModule {
         bind(PaymentProviderPluginRegistry.class).to(DefaultPaymentProviderPluginRegistry.class).asEagerSingleton();
         bind(PaymentApi.class).to(DefaultPaymentApi.class).asEagerSingleton();
         bind(InvoiceHandler.class).asEagerSingleton();
+        bind(TagHandler.class).asEagerSingleton();
         bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
         installPaymentProviderPlugins(paymentConfig);
         installPaymentDao();
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
new file mode 100644
index 0000000..1ebb798
--- /dev/null
+++ b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.payment.retry;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.config.PaymentConfig;
+import com.ning.billing.payment.core.PaymentProcessor;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+
+public class AutoPayRetryService extends BaseRetryService implements RetryService {
+
+
+    private static final Logger log = LoggerFactory.getLogger(FailedPaymentRetryService.class);
+
+    public static final String QUEUE_NAME = "autopayoff";
+
+    private final PaymentProcessor paymentProcessor;
+
+    @Inject
+    public AutoPayRetryService(final NotificationQueueService notificationQueueService,
+            final Clock clock,
+            final PaymentConfig config,
+            final PaymentProcessor paymentProcessor) {
+        super(notificationQueueService, clock, config);
+        this.paymentProcessor = paymentProcessor;
+    }
+
+
+    @Override
+    public String getQueueName() {
+        return QUEUE_NAME;
+    }
+
+    @Override
+    public void retry(final UUID paymentId) {
+        paymentProcessor.retryAutoPayOff(paymentId);
+    }
+
+    public static class AutoPayRetryServiceScheduler extends RetryServiceScheduler {
+
+
+        @Inject
+        public AutoPayRetryServiceScheduler(final NotificationQueueService notificationQueueService) {
+            super(notificationQueueService);
+        }
+
+        @Override
+        public boolean scheduleRetry(final UUID paymentId, final DateTime timeOfRetry) {
+            return super.scheduleRetry(paymentId, timeOfRetry);
+        }
+
+        @Override
+        public String getQueueName() {
+            return QUEUE_NAME;
+        }
+    }
+}
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index 2259ce6..9fd3231 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -77,7 +77,7 @@ public class FailedPaymentRetryService extends BaseRetryService implements Retry
             if (timeOfRetry == null) {
                 return false;
             }
-            return scheduleRetry(paymentId, timeOfRetry);
+            return super.scheduleRetry(paymentId, timeOfRetry);
         }
 
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
index 2fca8e0..6ea381c 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Copyright 2010-2011 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
@@ -78,7 +78,7 @@ public class PluginFailureRetryService extends BaseRetryService implements Retry
             if (nextRetryDate == null) {
                 return false;
             }
-            return scheduleRetry(paymentId, nextRetryDate);
+            return super.scheduleRetry(paymentId, nextRetryDate);
         }
 
         public boolean scheduleRetryFromTransaction(final UUID paymentId, final int retryAttempt, final Transmogrifier transactionalDao) {
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
index 80b75f4..c1ff2b7 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
@@ -34,7 +34,7 @@ public class MockPaymentDao implements PaymentDao {
 
     @Override
     public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao paymentInfo, final PaymentAttemptModelDao attempt,
-                                                    final boolean scheduleTimeoutRetry, final CallContext context) {
+                                                    final CallContext context) {
         synchronized (this) {
             payments.put(paymentInfo.getId(), paymentInfo);
             attempts.put(attempt.getId(), attempt);
@@ -44,7 +44,7 @@ public class MockPaymentDao implements PaymentDao {
 
     @Override
     public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId,
-                                                             final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context) {
+                                                             final PaymentAttemptModelDao attempt, final CallContext context) {
         synchronized (this) {
             attempts.put(attempt.getId(), attempt);
         }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
index cbe952a..6ac63c3 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
@@ -155,7 +155,7 @@ public class TestPaymentDao {
 
         final PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, true, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
 
         final PaymentStatus paymentStatus = PaymentStatus.SUCCESS;
         final String paymentError = "No error";
@@ -198,7 +198,7 @@ public class TestPaymentDao {
         final PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
 
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, true, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
         assertEquals(savedPayment.getId(), payment.getId());
         assertEquals(savedPayment.getAccountId(), accountId);
         assertEquals(savedPayment.getInvoiceId(), invoiceId);
@@ -249,11 +249,11 @@ public class TestPaymentDao {
 
         final PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
         final PaymentAttemptModelDao firstAttempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, firstAttempt, true, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, firstAttempt, context);
 
         final BigDecimal newAmount = new BigDecimal(15.23).setScale(2, RoundingMode.HALF_EVEN);
         final PaymentAttemptModelDao secondAttempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), newAmount);
-        paymentDao.insertNewAttemptForPayment(payment.getId(), secondAttempt, true, context);
+        paymentDao.insertNewAttemptForPayment(payment.getId(), secondAttempt, context);
 
         final List<PaymentModelDao> payments = paymentDao.getPaymentsForInvoice(invoiceId);
         assertEquals(payments.size(), 1);