killbill-uncached

See #343 Add Pagination API for Janitor opeation to make sure

7/8/2015 8:12:19 PM

Details

diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index 81672ff..864832a 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -83,7 +83,7 @@ abstract class CompletionTaskBase<T> implements Runnable {
             log.info("Janitor was requested to stop");
             return;
         }
-        final List<T> items = getItemsForIteration();
+        final Iterable<T> items = getItemsForIteration();
         for (final T item : items) {
             if (isStopped) {
                 log.info("Janitor was requested to stop");
@@ -101,7 +101,7 @@ abstract class CompletionTaskBase<T> implements Runnable {
         this.isStopped = true;
     }
 
-    public abstract List<T> getItemsForIteration();
+    public abstract Iterable<T> getItemsForIteration();
 
     public abstract void doIteration(final T item);
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
index 040b81b..746f036 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
@@ -43,6 +43,7 @@ import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
 import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.entity.Pagination;
 import org.killbill.clock.Clock;
 import org.killbill.commons.locker.GlobalLocker;
 
@@ -70,10 +71,10 @@ public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAtte
     }
 
     @Override
-    public List<PaymentAttemptModelDao> getItemsForIteration() {
-        final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByStateAcrossTenants(retrySMHelper.getInitialState().getName(), getCreatedDateBefore());
-        if (!incompleteAttempts.isEmpty()) {
-            log.info("Janitor AttemptCompletionTask start run: found {} incomplete attempts", incompleteAttempts.size());
+    public Iterable<PaymentAttemptModelDao> getItemsForIteration() {
+        final Pagination<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByStateAcrossTenants(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), 0L, Long.MAX_VALUE);
+        if (incompleteAttempts.getTotalNbRecords() > 0) {
+            log.info("Janitor AttemptCompletionTask start run: found {} incomplete attempts", incompleteAttempts.getTotalNbRecords());
         }
         return incompleteAttempts;
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index 010b216..3f33684 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -48,6 +48,7 @@ import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.entity.Pagination;
 import org.killbill.clock.Clock;
 import org.killbill.commons.locker.GlobalLocker;
 
@@ -63,8 +64,6 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
                                                                                                           .add(TransactionStatus.PENDING)
                                                                                                           .add(TransactionStatus.UNKNOWN)
                                                                                                           .build();
-    private static final int MAX_ITEMS_PER_LOOP = 100;
-
     @Inject
     public IncompletePaymentTransactionTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                             final PaymentDao paymentDao, final Clock clock,
@@ -74,10 +73,10 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
     }
 
     @Override
-    public List<PaymentTransactionModelDao> getItemsForIteration() {
-        final List<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), MAX_ITEMS_PER_LOOP);
-        if (!result.isEmpty()) {
-            log.info("Janitor IncompletePaymentTransactionTask start run: found {} pending/unknown payments", result.size());
+    public Iterable<PaymentTransactionModelDao> getItemsForIteration() {
+        final Pagination<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), 0L, Long.MAX_VALUE);
+        if (result.getTotalNbRecords() > 0) {
+            log.info("Janitor IncompletePaymentTransactionTask start run: found {} pending/unknown payments", result.getTotalNbRecords());
         }
         return result;
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 3b41115..0921bea 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -20,11 +20,9 @@ package org.killbill.billing.payment.dao;
 
 import java.math.BigDecimal;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.Date;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import javax.annotation.Nullable;
@@ -41,10 +39,12 @@ import org.killbill.billing.payment.api.DefaultPaymentInfoEvent;
 import org.killbill.billing.payment.api.DefaultPaymentPluginErrorEvent;
 import org.killbill.billing.payment.api.Payment;
 import org.killbill.billing.payment.api.PaymentMethod;
+import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.billing.util.entity.Entity;
 import org.killbill.billing.util.entity.Pagination;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
@@ -58,7 +58,6 @@ import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -122,14 +121,24 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate) {
-        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentAttemptModelDao>>() {
-            @Override
-            public List<PaymentAttemptModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                final PaymentAttemptSqlDao transactional = entitySqlDaoWrapperFactory.become(PaymentAttemptSqlDao.class);
-                return transactional.getByStateNameAcrossTenants(stateName, createdBeforeDate.toDate());
-            }
-        });
+    public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate, final Long offset, final Long limit) {
+
+        final Date createdBefore = createdBeforeDate.toDate();
+        return paginationHelper.getPagination(PaymentAttemptSqlDao.class, new PaginationIteratorBuilder<PaymentAttemptModelDao, Entity, PaymentAttemptSqlDao>() {
+                                                  @Override
+                                                  public Long getCount(final PaymentAttemptSqlDao sqlDao, final InternalTenantContext context) {
+                                                      return sqlDao.getCountByStateNameAcrossTenants(stateName, createdBefore);
+                                                  }
+                                                  @Override
+                                                  public Iterator<PaymentAttemptModelDao> build(final PaymentAttemptSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
+                                                      return sqlDao.getByStateNameAcrossTenants(stateName, createdBefore, offset, limit);
+                                                  }
+                                              },
+                                              offset,
+                                              limit,
+                                              null
+                                             );
+
     }
 
     @Override
@@ -157,18 +166,29 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final int limit) {
-        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
-            @Override
-            public List<PaymentTransactionModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
+    public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final Long offset, final Long limit) {
 
-                final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses,  Functions.toStringFunction()));
-                 return transactional.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBeforeDate.toDate(), createdAfterDate.toDate(), limit);
-            }
-        });
-    }
+        final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses, Functions.toStringFunction()));
+        final Date createdBefore = createdBeforeDate.toDate();
+        final Date createdAfter = createdAfterDate.toDate();
+
+        return paginationHelper.getPagination(TransactionSqlDao.class,
+                                              new PaginationIteratorBuilder<PaymentTransactionModelDao, PaymentTransaction, TransactionSqlDao>() {
+                                                  @Override
+                                                  public Long getCount(final TransactionSqlDao sqlDao, final InternalTenantContext context) {
+                                                      return sqlDao.getCountByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter);
+                                                  }
 
+                                                  @Override
+                                                  public Iterator<PaymentTransactionModelDao> build(final TransactionSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
+                                                      return sqlDao.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter, offset, limit);
+                                                  }
+                                              },
+                                              offset,
+                                              limit,
+                                              null
+                                             );
+    }
 
     @Override
     public List<PaymentTransactionModelDao> getPaymentTransactionsByExternalKey(final String transactionExternalKey, final InternalTenantContext context) {
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
index 544e02e..cc2959e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
@@ -17,6 +17,7 @@
 package org.killbill.billing.payment.dao;
 
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 
 import org.killbill.billing.callcontext.InternalCallContext;
@@ -50,7 +51,13 @@ public interface PaymentAttemptSqlDao extends EntitySqlDao<PaymentAttemptModelDa
                                                          @BindBean final InternalTenantContext context);
 
     @SqlQuery
-    List<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
-                                                             @Bind("createdBeforeDate") final Date createdBeforeDate);
+    Long getCountByStateNameAcrossTenants(@Bind("stateName") final String stateName,
+                                          @Bind("createdBeforeDate") final Date createdBeforeDate);
+
+    @SqlQuery
+    Iterator<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
+                                                                 @Bind("createdBeforeDate") final Date createdBeforeDate,
+                                                                 @Bind("offset") final Long offset,
+                                                                 @Bind("rowCount") final Long rowCount);
 
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index 2ac7ea1..535a623 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -30,13 +30,13 @@ import org.killbill.billing.util.entity.Pagination;
 
 public interface PaymentDao {
 
-    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit);
+    public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, final Long offset, final Long limit);
 
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
 
     public void updatePaymentAttempt(UUID paymentAttemptId, UUID transactionId, String state, InternalCallContext context);
 
-    public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(String stateName, DateTime createdBeforeDate);
+    public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(String stateName, DateTime createdBeforeDate, final Long offset, final Long limit);
 
     public List<PaymentAttemptModelDao> getPaymentAttempts(String paymentExternalKey, InternalTenantContext context);
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
index 9e9ed12..0f5475b 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
@@ -70,8 +70,8 @@ public interface PaymentSqlDao extends EntitySqlDao<PaymentModelDao, Payment> {
     @SqlQuery
     @SmartFetchSize(shouldStream = true)
     public Iterator<PaymentModelDao> getByPluginName(@Bind("pluginName") final String pluginName,
-                                                           @Bind("offset") final Long offset,
-                                                           @Bind("rowCount") final Long rowCount,
+                                                     @Bind("offset") final Long offset,
+                                                     @Bind("rowCount") final Long rowCount,
                                                            @BindBean final InternalTenantContext context);
 
     @SqlQuery
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index d291ca0..d31d589 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -19,6 +19,7 @@ package org.killbill.billing.payment.dao;
 import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
@@ -52,10 +53,16 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
                                                                          @BindBean final InternalTenantContext context);
 
     @SqlQuery
-    List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
+    Long getCountByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
+                                                           @Bind("createdBeforeDate") final Date createdBeforeDate,
+                                                           @Bind("createdAfterDate") final Date createdAfterDate);
+
+    @SqlQuery
+    Iterator<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
                                                                                   @Bind("createdBeforeDate") final Date createdBeforeDate,
                                                                                   @Bind("createdAfterDate") final Date createdAfterDate,
-                                                                                  @Bind("limit") final int limit);
+                                                                                  @Bind("offset") final Long offset,
+                                                                                  @Bind("rowCount") final Long rowCount);
 
     @SqlQuery
     public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
index c1f77a9..83860cc 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
@@ -73,6 +73,17 @@ where state_name = :stateName
 and created_date \< :createdBeforeDate
 <andCheckSoftDeletionWithComma("")>
 <defaultOrderBy()>
+limit :offset, :rowCount
+;
+>>
+
+getCountByStateNameAcrossTenants() ::= <<
+select
+count(1) as count
+from <tableName()>
+where state_name = :stateName
+and created_date \< :createdBeforeDate
+<andCheckSoftDeletionWithComma("")>
 ;
 >>
 
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index 1d54780..7d36552 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -90,7 +90,18 @@ created_date >= :createdAfterDate
 and created_date \< :createdBeforeDate
 and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
 <defaultOrderBy()>
-limit :limit
+limit :offset, :rowCount
+;
+>>
+
+getCountByTransactionStatusPriorDateAcrossTenants(statuses) ::= <<
+select
+count(1) as count
+from <tableName()>
+where
+created_date >= :createdAfterDate
+and created_date \< :createdBeforeDate
+and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
 ;
 >>
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index 21aee20..c761832 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -36,6 +36,7 @@ import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.dao.MockNonEntityDao;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.util.entity.DefaultPagination;
 import org.killbill.billing.util.entity.Pagination;
 
 import com.google.common.base.Predicate;
@@ -64,8 +65,8 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit) {
-        return ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
+    public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, Long offset, Long limit) {
+        final List<PaymentTransactionModelDao> result=  ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
             @Override
             public boolean apply(final PaymentTransactionModelDao input) {
                 return Iterables.any(transactionStatuses, new Predicate<TransactionStatus>() {
@@ -76,6 +77,7 @@ public class MockPaymentDao implements PaymentDao {
                 });
             }
         }));
+        return new DefaultPagination<PaymentTransactionModelDao>(new Long(result.size()), result.iterator());
     }
 
     @Override
@@ -108,7 +110,7 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate) {
+    public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate, final Long offset, final Long limit) {
         return null;
     }
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 358cf2b..e156f5b 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -18,6 +18,7 @@ package org.killbill.billing.payment.dao;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
@@ -32,6 +33,7 @@ import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertyS
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.entity.Pagination;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -298,7 +300,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         final List<PaymentTransactionModelDao> result = getPendingTransactions(paymentModelDao.getId());
         Assert.assertEquals(result.size(), 3);
 
-        final List<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), newTime, initialTime, 3);
+        final Iterable<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), newTime, initialTime, 0L, 3L);
         for (PaymentTransactionModelDao paymentTransaction : transactions1) {
             final String newPaymentState = "XXX_FAILED";
             paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
@@ -316,7 +318,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         }
         ;
 
-        final List<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), clock.getUTCNow(), initialTime, 1);
+        final Iterable<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), clock.getUTCNow(), initialTime, 0L, 1L);
         for (PaymentTransactionModelDao paymentTransaction : transactions2) {
             final String newPaymentState = "XXX_FAILED";
             paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
@@ -467,7 +469,46 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         assertEquals(result.size(), 2);
     }
 
+
     @Test(groups = "slow")
+    public void testPaginationForPaymentByStatesAcrossTenants() {
+        // Right before createdAfterDate, so should not be returned
+        final DateTime createdDate1 = clock.getUTCNow().minusHours(1);
+
+        final int NB_ENTRIES = 30;
+        for (int i = 0; i < NB_ENTRIES; i++) {
+            final PaymentModelDao paymentModelDao1 = new PaymentModelDao(createdDate1, createdDate1, UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID().toString());
+            final PaymentTransactionModelDao transaction1 = new PaymentTransactionModelDao(createdDate1, createdDate1, null, UUID.randomUUID().toString(),
+                                                                                           paymentModelDao1.getId(), TransactionType.AUTHORIZE, createdDate1,
+                                                                                           TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
+                                                                                           "unknown", "");
+
+            final InternalCallContext context1 = new InternalCallContext(1L,
+                                                                         1L,
+                                                                         internalCallContext.getUserToken(),
+                                                                         internalCallContext.getCreatedBy(),
+                                                                         internalCallContext.getCallOrigin(),
+                                                                         internalCallContext.getContextUserType(),
+                                                                         internalCallContext.getReasonCode(),
+                                                                         internalCallContext.getComments(),
+                                                                         createdDate1,
+                                                                         createdDate1);
+            paymentDao.insertPaymentWithFirstTransaction(paymentModelDao1, transaction1, context1);
+        }
+
+        final Pagination<PaymentTransactionModelDao> result =  paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.UNKNOWN), clock.getUTCNow(), createdDate1, 0L, Long.MAX_VALUE);
+        Assert.assertEquals(result.getTotalNbRecords(), new Long(NB_ENTRIES));
+
+        final Iterator<PaymentTransactionModelDao> iterator = result.iterator();
+        for (int i = 0; i < NB_ENTRIES; i++) {
+            System.out.println("i = " + i);
+            Assert.assertTrue(iterator.hasNext());
+            final PaymentTransactionModelDao nextEntry = iterator.next();
+            Assert.assertEquals(nextEntry.getTransactionStatus(), TransactionStatus.UNKNOWN);
+        }
+    }
+
+        @Test(groups = "slow")
     public void testPaymentAttemptsByStateAcrossTenants() {
 
         final UUID paymentMethodId = UUID.randomUUID();
@@ -519,8 +560,8 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         paymentDao.insertPaymentAttemptWithProperties(attempt2, context2);
 
 
-        final List<PaymentAttemptModelDao> result = paymentDao.getPaymentAttemptsByStateAcrossTenants(stateName, createdBeforeDate);
-        Assert.assertEquals(result.size(), 2);
+        final Pagination<PaymentAttemptModelDao> result = paymentDao.getPaymentAttemptsByStateAcrossTenants(stateName, createdBeforeDate, 0L, 2L);
+        Assert.assertEquals(result.getTotalNbRecords().longValue(), 2L);
     }
 
 
diff --git a/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java b/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
index 48c23f1..1f9a9fd 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
@@ -18,6 +18,8 @@ package org.killbill.billing.util.entity.dao;
 
 import java.util.Iterator;
 
+import javax.annotation.Nullable;
+
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.util.entity.DefaultPagination;
 import org.killbill.billing.util.entity.Entity;
@@ -35,7 +37,7 @@ public class DefaultPaginationSqlDaoHelper {
                                                                                                                      final PaginationIteratorBuilder<M, E, S> paginationIteratorBuilder,
                                                                                                                      final Long offset,
                                                                                                                      final Long limit,
-                                                                                                                     final InternalTenantContext context) {
+                                                                                                                     @Nullable final InternalTenantContext context) {
         // Note: the connection will be busy as we stream the results out: hence we cannot use
         // SQL_CALC_FOUND_ROWS / FOUND_ROWS on the actual query.
         // We still need to know the actual number of results, mainly for the UI so that it knows if it needs to fetch
@@ -51,7 +53,7 @@ public class DefaultPaginationSqlDaoHelper {
         // We usually always want to wrap our queries in an EntitySqlDaoTransactionWrapper... except here.
         // Since we want to stream the results out, we don't want to auto-commit when this method returns.
         final EntitySqlDao<M, E> sqlDao = transactionalSqlDao.onDemandForStreamingResults(sqlDaoClazz);
-        final Long totalCount = sqlDao.getCount(context);
+        final Long totalCount = context !=  null ? sqlDao.getCount(context) : null;
         final Iterator<M> results = paginationIteratorBuilder.build((S) sqlDao, limit, context);
 
         return new DefaultPagination<M>(offset, limit, count, totalCount, results);