killbill-memoizeit

Merge branch 'integration' of github.com:ning/killbill

7/6/2012 7:23:08 PM

Changes

Details

diff --git a/api/src/main/java/com/ning/billing/ErrorCode.java b/api/src/main/java/com/ning/billing/ErrorCode.java
index c3b55a1..68eadbe 100644
--- a/api/src/main/java/com/ning/billing/ErrorCode.java
+++ b/api/src/main/java/com/ning/billing/ErrorCode.java
@@ -22,6 +22,7 @@ public enum ErrorCode {
      */
     NOT_IMPLEMENTED(1, "Api not implemented yet"),
     DATA_TRUNCATION(2, "Data truncation error. (%s)"),
+    UNEXPECTED_ERROR(3, "%s"),
     /*
      *
      * Range 1000 : ENTITLEMENTS
diff --git a/api/src/main/java/com/ning/billing/util/tag/api/TagEvent.java b/api/src/main/java/com/ning/billing/util/tag/api/TagEvent.java
index 1cd3cfd..dc05d00 100644
--- a/api/src/main/java/com/ning/billing/util/tag/api/TagEvent.java
+++ b/api/src/main/java/com/ning/billing/util/tag/api/TagEvent.java
@@ -23,6 +23,7 @@ import com.ning.billing.util.dao.ObjectType;
 import com.ning.billing.util.tag.TagDefinition;
 
 public interface TagEvent extends BusEvent {
+
     UUID getTagId();
 
     UUID getObjectId();
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoInvoiceOffTag.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoInvoiceOffTag.java
index 426bac7..217cb1b 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoInvoiceOffTag.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoInvoiceOffTag.java
@@ -79,9 +79,9 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
     @Test(groups = {"slow"}, enabled = true)
     public void testAutoInvoiceOffAccount() throws Exception {
         clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
-        addTag(account.getId(), ObjectType.ACCOUNT);
+        add_AUTO_INVOICING_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
 
-        // set next invoice to fail and create network 
+        // set next invoice to fail and create network
         busHandler.pushExpectedEvents(NextEvent.CREATE);
         final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
                                                                                                                    new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
@@ -111,7 +111,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
     public void testAutoInvoiceOffSingleSubscription() throws Exception {
         clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
 
-        // set next invoice to fail and create network 
+        // set next invoice to fail and create network
         busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.INVOICE);
         final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
                                                                                                                    new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
@@ -122,7 +122,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
         assertEquals(invoices.size(), 1); // first invoice is generated immediately after creation can't reliably stop it
 
 
-        addTag(baseSubscription.getBundleId(), ObjectType.BUNDLE);
+        add_AUTO_INVOICING_OFF_Tag(baseSubscription.getBundleId(), ObjectType.BUNDLE);
 
         busHandler.pushExpectedEvents(NextEvent.PHASE);
         clock.addDays(40); // DAY 40 out of trial
@@ -138,7 +138,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
     public void testAutoInvoiceOffMultipleSubscriptions() throws Exception {
         clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
 
-        // set next invoice to fail and create network 
+        // set next invoice to fail and create network
         busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.INVOICE);
         final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
                                                                                                                    new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
@@ -156,7 +156,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
         Collection<Invoice> invoices = invoiceApi.getInvoicesByAccount(account.getId());
         assertEquals(invoices.size(), 2); // first invoice is generated immediately after creation can't reliably stop it
 
-        addTag(baseSubscription.getBundleId(), ObjectType.BUNDLE);
+        add_AUTO_INVOICING_OFF_Tag(baseSubscription.getBundleId(), ObjectType.BUNDLE);
 
         busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.PHASE, NextEvent.INVOICE);
         clock.addDays(40); // DAY 40 out of trial
@@ -167,7 +167,7 @@ public class TestIntegrationWithAutoInvoiceOffTag extends TestIntegrationBase {
     }
 
 
-    private void addTag(final UUID id, final ObjectType type) throws TagDefinitionApiException, TagApiException {
+    private void add_AUTO_INVOICING_OFF_Tag(final UUID id, final ObjectType type) throws TagDefinitionApiException, TagApiException {
         final TagDefinition def = tagApi.getTagDefinition(ControlTagType.AUTO_INVOICING_OFF.name());
         tagApi.addTag(id, type, def, context);
         final Map<String, Tag> tags = tagApi.getTags(id, type);
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java
new file mode 100644
index 0000000..b988c7a
--- /dev/null
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.beatrix.integration;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import junit.framework.Assert;
+
+import org.joda.time.DateTime;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.Inject;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.api.TestApiListener.NextEvent;
+import com.ning.billing.catalog.api.BillingPeriod;
+import com.ning.billing.catalog.api.PlanPhaseSpecifier;
+import com.ning.billing.catalog.api.PriceListSet;
+import com.ning.billing.catalog.api.ProductCategory;
+import com.ning.billing.config.PaymentConfig;
+import com.ning.billing.entitlement.api.user.SubscriptionBundle;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoiceUserApi;
+import com.ning.billing.util.api.TagApiException;
+import com.ning.billing.util.api.TagDefinitionApiException;
+import com.ning.billing.util.api.TagUserApi;
+import com.ning.billing.util.dao.ObjectType;
+import com.ning.billing.util.tag.ControlTagType;
+import com.ning.billing.util.tag.Tag;
+import com.ning.billing.util.tag.TagDefinition;
+
+@Guice(modules = {BeatrixModule.class})
+public class TestIntegrationWithAutoPayOff extends TestIntegrationBase {
+
+
+    @Inject
+    private InvoiceUserApi invoiceApi;
+
+    @Inject
+    private TagUserApi tagApi;
+
+    @Inject
+    private PaymentConfig paymentConfig;
+
+
+    private Account account;
+    private SubscriptionBundle bundle;
+    private String productName;
+    private BillingPeriod term;
+    private String planSetName;
+
+    @BeforeMethod(groups = {"slow"})
+    public void setupOverdue() throws Exception {
+
+        account = createAccountWithPaymentMethod(getAccountData(25));
+        assertNotNull(account);
+
+        bundle = entitlementUserApi.createBundleForAccount(account.getId(), "whatever", context);
+
+        productName = "Shotgun";
+        term = BillingPeriod.MONTHLY;
+        planSetName = PriceListSet.DEFAULT_PRICELIST_NAME;
+    }
+
+    @Test(groups = {"slow"}, enabled = true)
+    public void testAutoPayOff() throws Exception {
+        clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
+        add_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        busHandler.pushExpectedEvents(NextEvent.CREATE);
+        final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
+                                                                                                                   new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
+        assertNotNull(baseSubscription);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        Collection<Invoice> invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 1);
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE);
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        clock.addDays(40); // After trial
+
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT);
+        remove_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertTrue(cur.getBalance().compareTo(BigDecimal.ZERO) == 0);
+            assertTrue(cur.getPaidAmount().compareTo(cur.getChargedAmount()) == 0);
+        }
+        assertListenerStatus();
+    }
+
+
+    @Test(groups = {"slow"}, enabled = true)
+    public void testAutoPayOffWithPaymentFailure() throws Exception {
+        clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
+        add_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        busHandler.pushExpectedEvents(NextEvent.CREATE);
+        final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
+                                                                                                                   new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
+        assertNotNull(baseSubscription);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        Collection<Invoice> invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 1);
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE);
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        clock.addDays(40); // After trial
+
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+
+        paymentPlugin.makeNextPaymentFailWithError();
+        remove_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT_ERROR);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+        assertListenerStatus();
+
+        int nbDaysBeforeRetry = paymentConfig.getPaymentRetryDays().get(0);
+
+        // MOVE TIME FOR RETRY TO HAPPEN
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT);
+        clock.addDays(nbDaysBeforeRetry + 1);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertTrue(cur.getBalance().compareTo(BigDecimal.ZERO) == 0);
+            assertTrue(cur.getPaidAmount().compareTo(cur.getChargedAmount()) == 0);
+        }
+        assertListenerStatus();
+
+    }
+
+
+    @Test(groups = {"slow"}, enabled = true)
+    public void testAutoPayOffWithPaymentFailureOn_AUTO_PAY_OFF() throws Exception {
+        clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
+        add_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        busHandler.pushExpectedEvents(NextEvent.CREATE);
+        final SubscriptionData baseSubscription = subscriptionDataFromSubscription(entitlementUserApi.createSubscription(bundle.getId(),
+                                                                                                                   new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null, context));
+        assertNotNull(baseSubscription);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        Collection<Invoice> invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 1);
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE);
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        clock.addDays(40); // After trial
+
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+
+        paymentPlugin.makeNextPaymentFailWithError();
+        remove_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT_ERROR);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+        assertListenerStatus();
+
+        int nbDaysBeforeRetry = paymentConfig.getPaymentRetryDays().get(0);
+
+        // AUTO_PAY_OFF to ON
+        add_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+
+        // MOVE TIME FOR RETRY TO HAPPEN
+        clock.addDays(nbDaysBeforeRetry + 1);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+        assertEquals(invoices.size(), 2);
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertEquals(cur.getBalance(), cur.getChargedAmount());
+        }
+        assertListenerStatus();
+
+        // SWICTH BACK AUTO_PAY_OFF OFF
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT);
+        remove_AUTO_PAY_OFF_Tag(account.getId(), ObjectType.ACCOUNT);
+        assertTrue(busHandler.isCompleted(DELAY));
+
+
+        invoices = invoiceApi.getInvoicesByAccount(account.getId());
+        for (Invoice cur : invoices) {
+            if (cur.getChargedAmount().compareTo(BigDecimal.ZERO) == 0) {
+                continue;
+            }
+            assertTrue(cur.getBalance().compareTo(BigDecimal.ZERO) == 0);
+            assertTrue(cur.getPaidAmount().compareTo(cur.getChargedAmount()) == 0);
+        }
+        assertListenerStatus();
+
+    }
+
+
+    private void add_AUTO_PAY_OFF_Tag(final UUID id, final ObjectType type) throws TagDefinitionApiException, TagApiException {
+        final TagDefinition def = tagApi.getTagDefinition(ControlTagType.AUTO_PAY_OFF.name());
+        tagApi.addTag(id, type, def, context);
+        final Map<String, Tag> tags = tagApi.getTags(id, type);
+        assertNotNull(tags.get(ControlTagType.AUTO_PAY_OFF.name()));
+    }
+
+    private void remove_AUTO_PAY_OFF_Tag(final UUID id, final ObjectType type) throws TagDefinitionApiException, TagApiException {
+        final TagDefinition def = tagApi.getTagDefinition(ControlTagType.AUTO_PAY_OFF.name());
+        tagApi.removeTag(id, type, def, context);
+    }
+}
+
diff --git a/payment/src/main/java/com/ning/billing/payment/bus/InvoiceHandler.java b/payment/src/main/java/com/ning/billing/payment/bus/InvoiceHandler.java
index e94b175..527a776 100644
--- a/payment/src/main/java/com/ning/billing/payment/bus/InvoiceHandler.java
+++ b/payment/src/main/java/com/ning/billing/payment/bus/InvoiceHandler.java
@@ -16,8 +16,6 @@
 
 package com.ning.billing.payment.bus;
 
-import java.util.Map;
-import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,24 +35,15 @@ import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.DefaultCallContext;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.dao.ObjectType;
-import com.ning.billing.util.tag.ControlTagType;
-import com.ning.billing.util.tag.Tag;
+
 
 public class InvoiceHandler {
 
     public static final String PAYMENT_PROVIDER_KEY = "paymentProvider";
 
-    /*
-    private final static int NB_PAYMENT_THREADS = 3; // STEPH
-    private final static String PAYMENT_GROUP_NAME = "payment-grp";
-    private final static String PAYMENT_TH_NAME = "payment-th";
-*/
-
     private final PaymentProcessor paymentProcessor;
     private final AccountUserApi accountUserApi;
     private final Clock clock;
-    private final TagUserApi tagUserApi;
 
 
     private static final Logger log = LoggerFactory.getLogger(InvoiceHandler.class);
@@ -66,7 +55,6 @@ public class InvoiceHandler {
                           final TagUserApi tagUserApi) {
         this.clock = clock;
         this.accountUserApi = accountUserApi;
-        this.tagUserApi = tagUserApi;
         this.paymentProcessor = paymentProcessor;
     }
 
@@ -79,11 +67,9 @@ public class InvoiceHandler {
 
         Account account = null;
         try {
-            account = accountUserApi.getAccountById(event.getAccountId());
-            if (isAccountAutoPayOff(account.getId())) {
-                return;
-            }
+
             final CallContext context = new DefaultCallContext("PaymentRequestProcessor", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken(), clock);
+            account = accountUserApi.getAccountById(event.getAccountId());
             paymentProcessor.createPayment(account, event.getInvoiceId(), null, context, false);
         } catch (AccountApiException e) {
             log.error("Failed to process invoice payment", e);
@@ -93,16 +79,6 @@ public class InvoiceHandler {
             }
         }
     }
-
-    private boolean isAccountAutoPayOff(final UUID accountId) {
-        final Map<String, Tag> accountTags = tagUserApi.getTags(accountId, ObjectType.ACCOUNT);
-        for (final Tag cur : accountTags.values()) {
-            if (cur.getTagDefinitionName().equals(ControlTagType.AUTO_PAY_OFF.toString())) {
-                return true;
-            }
-        }
-        return false;
-    }
 }
 
 
diff --git a/payment/src/main/java/com/ning/billing/payment/bus/TagHandler.java b/payment/src/main/java/com/ning/billing/payment/bus/TagHandler.java
index da66d06..53527ab 100644
--- a/payment/src/main/java/com/ning/billing/payment/bus/TagHandler.java
+++ b/payment/src/main/java/com/ning/billing/payment/bus/TagHandler.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Copyright 2010-2011 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
@@ -15,6 +15,63 @@
  */
 package com.ning.billing.payment.bus;
 
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.account.api.AccountUserApi;
+import com.ning.billing.payment.api.PaymentApiException;
+import com.ning.billing.payment.core.PaymentProcessor;
+import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.callcontext.DefaultCallContext;
+import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.dao.ObjectType;
+import com.ning.billing.util.tag.ControlTagType;
+import com.ning.billing.util.tag.api.ControlTagDeletionEvent;
+
 public class TagHandler {
 
+    private static final Logger log = LoggerFactory.getLogger(TagHandler.class);
+
+    private final Clock clock;
+    private final AccountUserApi accountUserApi;
+    private final PaymentProcessor paymentProcessor;
+
+    @Inject
+    public TagHandler(final Clock clock,
+            final AccountUserApi accountUserApi,
+            final PaymentProcessor paymentProcessor) {
+        this.clock = clock;
+        this.accountUserApi = accountUserApi;
+        this.paymentProcessor = paymentProcessor;
+    }
+
+    @Subscribe
+    public void process_AUTO_PAY_OFF_removal(final ControlTagDeletionEvent event) {
+        if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_PAY_OFF.toString()) && event.getObjectType() ==  ObjectType.ACCOUNT) {
+            final UUID accountId = event.getObjectId();
+            processUnpaid_AUTO_PAY_OFF_payments(accountId, event.getUserToken());
+        }
+    }
+
+    private void processUnpaid_AUTO_PAY_OFF_payments(final UUID accountId, final UUID userToken) {
+        try {
+            final CallContext context = new DefaultCallContext("PaymentRequestProcessor", CallOrigin.INTERNAL, UserType.SYSTEM, userToken, clock);
+            final Account account = accountUserApi.getAccountById(accountId);
+
+            paymentProcessor.process_AUTO_PAY_OFF_removal(account, context);
+
+        } catch (AccountApiException e) {
+            log.warn(String.format("Failed to process process  removal AUTO_PAY_OFF for account %s", accountId), e);
+        } catch (PaymentApiException e) {
+            log.warn(String.format("Failed to process process  removal AUTO_PAY_OFF for account %s", accountId), e);
+        }
+    }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
index f00de24..6c154fd 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
@@ -18,9 +18,11 @@ package com.ning.billing.payment.core;
 import javax.inject.Inject;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -46,13 +48,16 @@ import com.ning.billing.payment.api.PaymentStatus;
 import com.ning.billing.payment.dao.PaymentAttemptModelDao;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.dao.PaymentModelDao;
+import com.ning.billing.payment.dao.PaymentSqlDao;
 import com.ning.billing.payment.dispatcher.PluginDispatcher;
 import com.ning.billing.payment.plugin.api.PaymentInfoPlugin;
 import com.ning.billing.payment.plugin.api.PaymentPluginApi;
 import com.ning.billing.payment.plugin.api.PaymentPluginApiException;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
+import com.ning.billing.payment.retry.AutoPayRetryService.AutoPayRetryServiceScheduler;
 import com.ning.billing.payment.retry.FailedPaymentRetryService.FailedPaymentRetryServiceScheduler;
 import com.ning.billing.payment.retry.PluginFailureRetryService.PluginFailureRetryServiceScheduler;
+import com.ning.billing.util.api.TagUserApi;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.BusEvent;
 import com.ning.billing.util.callcontext.CallContext;
@@ -60,15 +65,21 @@ import com.ning.billing.util.callcontext.CallContextFactory;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.dao.ObjectType;
 import com.ning.billing.util.globallocker.GlobalLocker;
+import com.ning.billing.util.tag.ControlTagType;
+import com.ning.billing.util.tag.Tag;
 
 import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
 
 public class PaymentProcessor extends ProcessorBase {
 
     private final InvoicePaymentApi invoicePaymentApi;
+    private final TagUserApi tagUserApi;
     private final FailedPaymentRetryServiceScheduler failedPaymentRetryService;
     private final PluginFailureRetryServiceScheduler pluginFailureRetryService;
+    private final AutoPayRetryServiceScheduler autoPayoffRetryService;
+
     private final CallContextFactory factory;
     private final Clock clock;
 
@@ -81,8 +92,10 @@ public class PaymentProcessor extends ProcessorBase {
     public PaymentProcessor(final PaymentProviderPluginRegistry pluginRegistry,
             final AccountUserApi accountUserApi,
             final InvoicePaymentApi invoicePaymentApi,
+            final TagUserApi tagUserApi,
             final FailedPaymentRetryServiceScheduler failedPaymentRetryService,
             final PluginFailureRetryServiceScheduler pluginFailureRetryService,
+            final AutoPayRetryServiceScheduler autoPayoffRetryService,
             final PaymentDao paymentDao,
             final Bus eventBus,
             final Clock clock,
@@ -91,8 +104,10 @@ public class PaymentProcessor extends ProcessorBase {
             final CallContextFactory factory) {
         super(pluginRegistry, accountUserApi, eventBus, paymentDao, locker, executor);
         this.invoicePaymentApi = invoicePaymentApi;
+        this.tagUserApi = tagUserApi;
         this.failedPaymentRetryService = failedPaymentRetryService;
         this.pluginFailureRetryService = pluginFailureRetryService;
+        this.autoPayoffRetryService = autoPayoffRetryService;
         this.clock = clock;
         this.factory = factory;
         this.paymentPluginDispatcher = new PluginDispatcher<Payment>(executor);
@@ -130,6 +145,53 @@ public class PaymentProcessor extends ProcessorBase {
         return result;
     }
 
+    public void process_AUTO_PAY_OFF_removal(final Account account, final CallContext context) throws PaymentApiException  {
+
+        try {
+            voidPluginDispatcher.dispatchWithAccountLock(new CallableWithAccountLock<Void>(locker,
+                    account.getExternalKey(),
+                    new WithAccountLockCallback<Void>() {
+
+                @Override
+                public Void doOperation() throws PaymentApiException {
+
+                    final List<PaymentModelDao> payments = paymentDao.getPaymentsForAccount(account.getId());
+                    final Collection<PaymentModelDao> paymentsToBeCompleted = Collections2.filter(payments, new Predicate<PaymentModelDao>() {
+                        @Override
+                        public boolean apply(final PaymentModelDao in) {
+                            // Payments left in AUTO_PAY_OFF or for which we did not retry enough
+                             return (in.getPaymentStatus() == PaymentStatus.AUTO_PAY_OFF ||
+                                     in.getPaymentStatus() == PaymentStatus.PAYMENT_FAILURE ||
+                                     in.getPaymentStatus() == PaymentStatus.PLUGIN_FAILURE);
+                        }
+                    });
+                    // Insert one retry event for each payment left in AUTO_PAY_OFF
+                    for (PaymentModelDao cur : paymentsToBeCompleted) {
+                        switch(cur.getPaymentStatus()) {
+                        case AUTO_PAY_OFF:
+                            autoPayoffRetryService.scheduleRetry(cur.getId(), clock.getUTCNow());
+                            break;
+                        case PAYMENT_FAILURE:
+                            scheduleRetryOnPaymentFailure(cur.getId());
+                            break;
+                        case PLUGIN_FAILURE:
+                            scheduleRetryOnPluginFailure(cur.getId());
+                            break;
+                        default:
+                            // Impossible...
+                            throw new RuntimeException("Unexpected case " + cur.getPaymentStatus());
+                        }
+
+                    }
+                    return null;
+                }
+            }));
+        } catch (TimeoutException e) {
+            throw new PaymentApiException(ErrorCode.UNEXPECTED_ERROR, "Unexpected timeout for payment creation (AUTO_PAY_OFF)");
+        }
+    }
+
+
     public Payment createPayment(final String accountKey, final UUID invoiceId, final BigDecimal inputAmount, final CallContext context, final boolean isInstantPayment)
     throws PaymentApiException {
         try {
@@ -151,6 +213,8 @@ public class PaymentProcessor extends ProcessorBase {
 
                 @Override
                 public Payment doOperation() throws PaymentApiException {
+
+
                     final Invoice invoice = invoicePaymentApi.getInvoice(invoiceId);
 
                     if (invoice.isMigrationInvoice()) {
@@ -159,7 +223,11 @@ public class PaymentProcessor extends ProcessorBase {
                     }
 
                     final BigDecimal requestedAmount = getAndValidatePaymentAmount(invoice, inputAmount, isInstantPayment);
-                    return processNewPaymentWithAccountLocked(plugin, account, invoice, requestedAmount, isInstantPayment, context);
+                    if (isAccountAutoPayOff(account.getId())) {
+                        return processNewPaymentForAutoPayOffWithAccountLocked(account, invoice, requestedAmount, context);
+                    } else {
+                        return processNewPaymentWithAccountLocked(plugin, account, invoice, requestedAmount, isInstantPayment, context);
+                    }
                 }
             }));
         } catch (TimeoutException e) {
@@ -189,10 +257,15 @@ public class PaymentProcessor extends ProcessorBase {
             throw new PaymentApiException(ErrorCode.PAYMENT_AMOUNT_DENIED,
                     invoice.getId(), inputAmount.floatValue(), invoice.getBalance().floatValue());
         }
-        return inputAmount != null ? inputAmount : invoice.getBalance();
+        BigDecimal result =  inputAmount != null ? inputAmount : invoice.getBalance();
+        return result.setScale(2, RoundingMode.HALF_EVEN);
     }
 
 
+    public void retryAutoPayOff(final UUID paymentId) {
+        retryFailedPaymentInternal(paymentId, PaymentStatus.AUTO_PAY_OFF);
+    }
+
     public void retryPluginFailure(final UUID paymentId) {
         retryFailedPaymentInternal(paymentId, PaymentStatus.PLUGIN_FAILURE, PaymentStatus.TIMEDOUT);
     }
@@ -201,6 +274,17 @@ public class PaymentProcessor extends ProcessorBase {
         retryFailedPaymentInternal(paymentId, PaymentStatus.PAYMENT_FAILURE);
     }
 
+    private boolean isAccountAutoPayOff(final UUID accountId) {
+        final Map<String, Tag> accountTags = tagUserApi.getTags(accountId, ObjectType.ACCOUNT);
+        for (final Tag cur : accountTags.values()) {
+            if (cur.getTagDefinitionName().equals(ControlTagType.AUTO_PAY_OFF.toString())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
     private void retryFailedPaymentInternal(final UUID paymentId, final PaymentStatus... expectedPaymentStates) {
 
         try {
@@ -211,6 +295,11 @@ public class PaymentProcessor extends ProcessorBase {
                 return;
             }
 
+            if (isAccountAutoPayOff(payment.getAccountId())) {
+                log.info(String.format("Skip retry payment %s in state %s because AUTO_PAY_OFF", payment.getId(), payment.getPaymentStatus()));
+                return;
+            }
+
             final Account account = accountUserApi.getAccountById(payment.getAccountId());
             final PaymentPluginApi plugin = getPaymentProviderPlugin(account);
             final CallContext context = factory.createCallContext("PaymentRetry", CallOrigin.INTERNAL, UserType.SYSTEM);
@@ -259,23 +348,34 @@ public class PaymentProcessor extends ProcessorBase {
         }
     }
 
+    private Payment processNewPaymentForAutoPayOffWithAccountLocked(final Account account, final Invoice invoice, final BigDecimal requestedAmount, final CallContext context)
+    throws PaymentApiException {
+
+    final PaymentStatus paymentStatus =  PaymentStatus.AUTO_PAY_OFF;
+
+    final PaymentModelDao paymentInfo = new PaymentModelDao(account.getId(), invoice.getId(), account.getPaymentMethodId(), requestedAmount, invoice.getCurrency(), invoice.getTargetDate(), paymentStatus);
+    final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), paymentInfo.getId(), paymentStatus, clock.getUTCNow(), requestedAmount);
+
+    paymentDao.insertPaymentWithAttempt(paymentInfo, attempt, context);
+    return new DefaultPayment(paymentInfo, Collections.singletonList(attempt));
+}
+
+
     private Payment processNewPaymentWithAccountLocked(final PaymentPluginApi plugin, final Account account, final Invoice invoice,
                                                        final BigDecimal requestedAmount, final boolean isInstantPayment, final CallContext context) throws PaymentApiException {
 
-        final boolean scheduleRetryForPayment = !isInstantPayment;
-        final PaymentModelDao payment = new PaymentModelDao(account.getId(), invoice.getId(), account.getPaymentMethodId(),
-                                                            requestedAmount.setScale(2, RoundingMode.HALF_EVEN), invoice.getCurrency(), invoice.getTargetDate());
+
+        final PaymentModelDao payment = new PaymentModelDao(account.getId(), invoice.getId(), account.getPaymentMethodId(), requestedAmount.setScale(2, RoundingMode.HALF_EVEN), invoice.getCurrency(), invoice.getTargetDate());
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow(), requestedAmount);
 
-        final PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, scheduleRetryForPayment, context);
+        final PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
         return processPaymentWithAccountLocked(plugin, account, invoice, savedPayment, attempt, isInstantPayment, context);
     }
 
     private Payment processRetryPaymentWithAccountLocked(final PaymentPluginApi plugin, final Account account, final Invoice invoice, final PaymentModelDao payment,
             final BigDecimal requestedAmount, final CallContext context) throws PaymentApiException {
-        final boolean scheduleRetryForPayment = true;
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow(), requestedAmount);
-        paymentDao.insertNewAttemptForPayment(payment.getId(), attempt, scheduleRetryForPayment, context);
+        paymentDao.insertNewAttemptForPayment(payment.getId(), attempt, context);
         return processPaymentWithAccountLocked(plugin, account, invoice, payment, attempt, false, context);
     }
 
@@ -314,13 +414,10 @@ public class PaymentProcessor extends ProcessorBase {
                 break;
 
             case ERROR:
+                allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
                 // Schedule if non instant payment and max attempt for retry not reached yet
                 if (!isInstantPayment) {
-                    allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
-                    final int retryAttempt = getNumberAttemptsInState(paymentInput.getId(), allAttempts,
-                            PaymentStatus.UNKNOWN, PaymentStatus.PAYMENT_FAILURE);
-                    final boolean isScheduledForRetry = failedPaymentRetryService.scheduleRetry(paymentInput.getId(), retryAttempt);
-                    paymentStatus = isScheduledForRetry ? PaymentStatus.PAYMENT_FAILURE : PaymentStatus.PAYMENT_FAILURE_ABORTED;
+                    paymentStatus = scheduleRetryOnPaymentFailure(paymentInput.getId());
                 } else {
                     paymentStatus = PaymentStatus.PAYMENT_FAILURE_ABORTED;
                 }
@@ -365,6 +462,14 @@ public class PaymentProcessor extends ProcessorBase {
         return isScheduledForRetry ? PaymentStatus.PLUGIN_FAILURE : PaymentStatus.PLUGIN_FAILURE_ABORTED;
     }
 
+    private PaymentStatus scheduleRetryOnPaymentFailure(final UUID paymentId) {
+        final List<PaymentAttemptModelDao> allAttempts = paymentDao.getAttemptsForPayment(paymentId);
+        final int retryAttempt = getNumberAttemptsInState(paymentId, allAttempts,
+                PaymentStatus.UNKNOWN, PaymentStatus.PAYMENT_FAILURE);
+        final boolean isScheduledForRetry = failedPaymentRetryService.scheduleRetry(paymentId, retryAttempt);
+        return isScheduledForRetry ? PaymentStatus.PAYMENT_FAILURE : PaymentStatus.PAYMENT_FAILURE_ABORTED;
+    }
+
     private int getNumberAttemptsInState(final UUID paymentId, final List<PaymentAttemptModelDao> allAttempts, final PaymentStatus... statuses) {
         if (allAttempts == null || allAttempts.size() == 0) {
             return 0;
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
index 4ecbe1f..0a2aa27 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
@@ -40,7 +40,6 @@ public class AuditedPaymentDao implements PaymentDao {
     private final PaymentAttemptSqlDao paymentAttemptSqlDao;
     private final PaymentMethodSqlDao paymentMethodSqlDao;
     private final RefundSqlDao refundSqlDao;
-    //private final TimedoutPaymentRetryServiceScheduler timedoutSchduler;
 
     @Inject
     public AuditedPaymentDao(final IDBI dbi, final PluginFailureRetryServiceScheduler timedoutSchduler) {
@@ -48,13 +47,12 @@ public class AuditedPaymentDao implements PaymentDao {
         this.paymentAttemptSqlDao = dbi.onDemand(PaymentAttemptSqlDao.class);
         this.paymentMethodSqlDao = dbi.onDemand(PaymentMethodSqlDao.class);
         this.refundSqlDao = dbi.onDemand(RefundSqlDao.class);
-        // this.timedoutSchduler = timedoutSchduler;
     }
 
 
     @Override
     public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId,
-                                                             final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context) {
+                                                             final PaymentAttemptModelDao attempt, final CallContext context) {
 
         return paymentAttemptSqlDao.inTransaction(new Transaction<PaymentAttemptModelDao, PaymentAttemptSqlDao>() {
             @Override
@@ -70,7 +68,7 @@ public class AuditedPaymentDao implements PaymentDao {
 
 
     @Override
-    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao payment, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context) {
+    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao payment, final PaymentAttemptModelDao attempt, final CallContext context) {
 
         return paymentSqlDao.inTransaction(new Transaction<PaymentModelDao, PaymentSqlDao>() {
 
@@ -85,28 +83,6 @@ public class AuditedPaymentDao implements PaymentDao {
         });
     }
 
-    /*
-    private int getNbTimedoutAttemptsFromTransaction(final UUID paymentId, final PaymentAttemptSqlDao transactional) {
-        List<PaymentAttemptModelDao> attempts = transactional.getPaymentAttempts(paymentId.toString());
-        return Collections2.filter(attempts, new Predicate<PaymentAttemptModelDao>() {
-            @Override
-            public boolean apply(PaymentAttemptModelDao input) {
-                return input.getPaymentStatus() == PaymentStatus.TIMEDOUT;
-            }
-        }).size();
-    }
-
-
-    private void scheduleTimeoutRetryFromTransaction(final UUID paymentId, final PaymentAttemptSqlDao transactional, final boolean scheduleTimeoutRetry) {
-
-        if (scheduleTimeoutRetry) {
-            int retryAttempt = getNbTimedoutAttemptsFromTransaction(paymentId, transactional) + 1;
-            timedoutSchduler.scheduleRetryFromTransaction(paymentId, retryAttempt, transactional);
-        }
-    }
-*/
-
-
     private PaymentModelDao insertPaymentFromTransaction(final PaymentModelDao payment, final CallContext context, final PaymentSqlDao transactional) {
         transactional.insertPayment(payment, context);
         final PaymentModelDao savedPayment = transactional.getPayment(payment.getId().toString());
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
index 9c65c80..383bd0f 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Copyright 2010-2011 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
@@ -46,6 +46,10 @@ public class PaymentAttemptModelDao extends EntityBase {
         this.paymentError = paymentError;
     }
 
+    public PaymentAttemptModelDao(final UUID accountId, final UUID invoiceId, final UUID paymentId, final PaymentStatus paymentStatus, final DateTime effectiveDate, final BigDecimal requestedAmount) {
+        this(UUID.randomUUID(), accountId, invoiceId, paymentId, paymentStatus, effectiveDate, requestedAmount, null);
+    }
+
     public PaymentAttemptModelDao(final UUID accountId, final UUID invoiceId, final UUID paymentId, final DateTime effectiveDate, final BigDecimal requestedAmount) {
         this(UUID.randomUUID(), accountId, invoiceId, paymentId, PaymentStatus.UNKNOWN, effectiveDate, requestedAmount, null);
     }
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
index 711ae93..68a7fec 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
@@ -25,9 +25,9 @@ import com.ning.billing.util.callcontext.CallContext;
 public interface PaymentDao {
 
     // STEPH do we need object returned?
-    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao paymentInfo, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context);
+    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao paymentInfo, final PaymentAttemptModelDao attempt, final CallContext context);
 
-    public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context);
+    public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId, final PaymentAttemptModelDao attempt, final CallContext context);
 
     public void updateStatusForPaymentWithAttempt(final UUID paymentId, final PaymentStatus paymentStatus, final String paymentError, final String extPaymentRefId, final UUID attemptId, final CallContext context);
 
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentModelDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentModelDao.java
index 24554bd..24a9f74 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentModelDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentModelDao.java
@@ -38,7 +38,6 @@ public class PaymentModelDao extends EntityBase {
     private final PaymentStatus paymentStatus;
     private final String extPaymentRefId;
 
-
     public PaymentModelDao(final UUID id, final UUID accountId, final UUID invoiceId, final UUID paymentMethodId,
                            final Integer paymentNumber, final BigDecimal amount, final Currency currency,
                            final PaymentStatus paymentStatus, final DateTime effectiveDate, final String extPaymentRefId) {
@@ -55,7 +54,12 @@ public class PaymentModelDao extends EntityBase {
     }
 
     public PaymentModelDao(final UUID accountId, final UUID invoiceId, final UUID paymentMethodId,
-                           final BigDecimal amount, final Currency currency, final DateTime effectiveDate) {
+            final BigDecimal amount, final Currency currency, final DateTime effectiveDate, final PaymentStatus paymentStatus) {
+        this(UUID.randomUUID(), accountId, invoiceId, paymentMethodId, INVALID_PAYMENT_NUMBER, amount, currency, paymentStatus, effectiveDate, null);
+    }
+
+    public PaymentModelDao(final UUID accountId, final UUID invoiceId, final UUID paymentMethodId,
+            final BigDecimal amount, final Currency currency, final DateTime effectiveDate) {
         this(UUID.randomUUID(), accountId, invoiceId, paymentMethodId, INVALID_PAYMENT_NUMBER, amount, currency, PaymentStatus.UNKNOWN, effectiveDate, null);
     }
 
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
index 9bd547b..cc97e10 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
@@ -25,6 +25,8 @@ import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
 import com.ning.billing.payment.bus.InvoiceHandler;
+import com.ning.billing.payment.bus.TagHandler;
+import com.ning.billing.payment.retry.AutoPayRetryService;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
 import com.ning.billing.payment.retry.PluginFailureRetryService;
 import com.ning.billing.util.bus.Bus;
@@ -35,23 +37,30 @@ public class DefaultPaymentService implements PaymentService {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultPaymentService.class);
 
-    // STEPH for retry crappiness
     public static final String SERVICE_NAME = "payment-service";
 
-    private final InvoiceHandler requestProcessor;
+    private final InvoiceHandler invoiceHandler;
+    private final TagHandler tagHandler;
     private final Bus eventBus;
     private final PaymentApi api;
     private final FailedPaymentRetryService failedRetryService;
     private final PluginFailureRetryService timedoutRetryService;
+    private final AutoPayRetryService autoPayoffRetryService;
 
     @Inject
-    public DefaultPaymentService(final InvoiceHandler requestProcessor, final PaymentApi api, final Bus eventBus,
-                                 final FailedPaymentRetryService failedRetryService, final PluginFailureRetryService timedoutRetryService) {
-        this.requestProcessor = requestProcessor;
+    public DefaultPaymentService(final InvoiceHandler invoiceHandler,
+            final TagHandler tagHandler,
+            final PaymentApi api, final Bus eventBus,
+            final FailedPaymentRetryService failedRetryService,
+            final PluginFailureRetryService timedoutRetryService,
+            final AutoPayRetryService autoPayoffRetryService) {
+        this.invoiceHandler = invoiceHandler;
+        this.tagHandler = tagHandler;
         this.eventBus = eventBus;
         this.api = api;
         this.failedRetryService = failedRetryService;
         this.timedoutRetryService = timedoutRetryService;
+        this.autoPayoffRetryService = autoPayoffRetryService;
     }
 
     @Override
@@ -63,12 +72,14 @@ public class DefaultPaymentService implements PaymentService {
     public void initialize() throws NotificationQueueAlreadyExists {
         failedRetryService.initialize(SERVICE_NAME);
         timedoutRetryService.initialize(SERVICE_NAME);
+        autoPayoffRetryService.initialize(SERVICE_NAME);
     }
 
     @LifecycleHandlerType(LifecycleHandlerType.LifecycleLevel.REGISTER_EVENTS)
     public void registerForNotifications() {
         try {
-            eventBus.register(requestProcessor);
+            eventBus.register(invoiceHandler);
+            eventBus.register(tagHandler);
         } catch (Bus.EventBusException e) {
             log.error("Unable to register with the EventBus!", e);
         }
@@ -78,12 +89,14 @@ public class DefaultPaymentService implements PaymentService {
     public void start() {
         failedRetryService.start();
         timedoutRetryService.start();
+        autoPayoffRetryService.start();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() throws NoSuchNotificationQueue {
         failedRetryService.stop();
         timedoutRetryService.stop();
+        autoPayoffRetryService.stop();
     }
 
     @Override
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java b/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
index 803b818..348a0ed 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
@@ -33,6 +33,7 @@ import com.ning.billing.payment.api.DefaultPaymentApi;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
 import com.ning.billing.payment.bus.InvoiceHandler;
+import com.ning.billing.payment.bus.TagHandler;
 import com.ning.billing.payment.core.PaymentMethodProcessor;
 import com.ning.billing.payment.core.PaymentProcessor;
 import com.ning.billing.payment.core.RefundProcessor;
@@ -40,6 +41,8 @@ import com.ning.billing.payment.dao.AuditedPaymentDao;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.DefaultPaymentProviderPluginRegistry;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
+import com.ning.billing.payment.retry.AutoPayRetryService;
+import com.ning.billing.payment.retry.AutoPayRetryService.AutoPayRetryServiceScheduler;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
 import com.ning.billing.payment.retry.FailedPaymentRetryService.FailedPaymentRetryServiceScheduler;
 import com.ning.billing.payment.retry.PluginFailureRetryService;
@@ -76,8 +79,10 @@ public class PaymentModule extends AbstractModule {
     protected void installRetryEngines() {
         bind(FailedPaymentRetryService.class).asEagerSingleton();
         bind(PluginFailureRetryService.class).asEagerSingleton();
+        bind(AutoPayRetryService.class).asEagerSingleton();
         bind(FailedPaymentRetryServiceScheduler.class).asEagerSingleton();
         bind(PluginFailureRetryServiceScheduler.class).asEagerSingleton();
+        bind(AutoPayRetryServiceScheduler.class).asEagerSingleton();
     }
 
     protected void installProcessors() {
@@ -105,6 +110,7 @@ public class PaymentModule extends AbstractModule {
         bind(PaymentProviderPluginRegistry.class).to(DefaultPaymentProviderPluginRegistry.class).asEagerSingleton();
         bind(PaymentApi.class).to(DefaultPaymentApi.class).asEagerSingleton();
         bind(InvoiceHandler.class).asEagerSingleton();
+        bind(TagHandler.class).asEagerSingleton();
         bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
         installPaymentProviderPlugins(paymentConfig);
         installPaymentDao();
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
new file mode 100644
index 0000000..1ebb798
--- /dev/null
+++ b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.payment.retry;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.config.PaymentConfig;
+import com.ning.billing.payment.core.PaymentProcessor;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+
+public class AutoPayRetryService extends BaseRetryService implements RetryService {
+
+
+    private static final Logger log = LoggerFactory.getLogger(FailedPaymentRetryService.class);
+
+    public static final String QUEUE_NAME = "autopayoff";
+
+    private final PaymentProcessor paymentProcessor;
+
+    @Inject
+    public AutoPayRetryService(final NotificationQueueService notificationQueueService,
+            final Clock clock,
+            final PaymentConfig config,
+            final PaymentProcessor paymentProcessor) {
+        super(notificationQueueService, clock, config);
+        this.paymentProcessor = paymentProcessor;
+    }
+
+
+    @Override
+    public String getQueueName() {
+        return QUEUE_NAME;
+    }
+
+    @Override
+    public void retry(final UUID paymentId) {
+        paymentProcessor.retryAutoPayOff(paymentId);
+    }
+
+    public static class AutoPayRetryServiceScheduler extends RetryServiceScheduler {
+
+
+        @Inject
+        public AutoPayRetryServiceScheduler(final NotificationQueueService notificationQueueService) {
+            super(notificationQueueService);
+        }
+
+        @Override
+        public boolean scheduleRetry(final UUID paymentId, final DateTime timeOfRetry) {
+            return super.scheduleRetry(paymentId, timeOfRetry);
+        }
+
+        @Override
+        public String getQueueName() {
+            return QUEUE_NAME;
+        }
+    }
+}
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index 2259ce6..9fd3231 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -77,7 +77,7 @@ public class FailedPaymentRetryService extends BaseRetryService implements Retry
             if (timeOfRetry == null) {
                 return false;
             }
-            return scheduleRetry(paymentId, timeOfRetry);
+            return super.scheduleRetry(paymentId, timeOfRetry);
         }
 
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
index 2fca8e0..6ea381c 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Copyright 2010-2011 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
@@ -78,7 +78,7 @@ public class PluginFailureRetryService extends BaseRetryService implements Retry
             if (nextRetryDate == null) {
                 return false;
             }
-            return scheduleRetry(paymentId, nextRetryDate);
+            return super.scheduleRetry(paymentId, nextRetryDate);
         }
 
         public boolean scheduleRetryFromTransaction(final UUID paymentId, final int retryAttempt, final Transmogrifier transactionalDao) {
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
index 80b75f4..c1ff2b7 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
@@ -34,7 +34,7 @@ public class MockPaymentDao implements PaymentDao {
 
     @Override
     public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao paymentInfo, final PaymentAttemptModelDao attempt,
-                                                    final boolean scheduleTimeoutRetry, final CallContext context) {
+                                                    final CallContext context) {
         synchronized (this) {
             payments.put(paymentInfo.getId(), paymentInfo);
             attempts.put(attempt.getId(), attempt);
@@ -44,7 +44,7 @@ public class MockPaymentDao implements PaymentDao {
 
     @Override
     public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId,
-                                                             final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context) {
+                                                             final PaymentAttemptModelDao attempt, final CallContext context) {
         synchronized (this) {
             attempts.put(attempt.getId(), attempt);
         }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
index e0cdd79..1af7d2c 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
@@ -155,7 +155,7 @@ public class TestPaymentDao {
 
         final PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, paymentMethodId, amount, currency, effectiveDate);
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, true, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
 
         final PaymentStatus paymentStatus = PaymentStatus.SUCCESS;
         final String paymentError = "No error";
@@ -198,7 +198,7 @@ public class TestPaymentDao {
         final PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, paymentMethodId, amount, currency, effectiveDate);
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
 
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, true, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
         assertEquals(savedPayment.getId(), payment.getId());
         assertEquals(savedPayment.getAccountId(), accountId);
         assertEquals(savedPayment.getInvoiceId(), invoiceId);
@@ -250,11 +250,11 @@ public class TestPaymentDao {
 
         final PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, paymentMethodId, amount, currency, effectiveDate);
         final PaymentAttemptModelDao firstAttempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, firstAttempt, true, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, firstAttempt, context);
 
         final BigDecimal newAmount = new BigDecimal(15.23).setScale(2, RoundingMode.HALF_EVEN);
         final PaymentAttemptModelDao secondAttempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), newAmount);
-        paymentDao.insertNewAttemptForPayment(payment.getId(), secondAttempt, true, context);
+        paymentDao.insertNewAttemptForPayment(payment.getId(), secondAttempt, context);
 
         final List<PaymentModelDao> payments = paymentDao.getPaymentsForInvoice(invoiceId);
         assertEquals(payments.size(), 1);