killbill-memoizeit

subscription: implement bundles pagination and search APIs Signed-off-by:

1/31/2014 2:38:02 PM

Details

diff --git a/api/src/main/java/com/ning/billing/subscription/api/SubscriptionBaseInternalApi.java b/api/src/main/java/com/ning/billing/subscription/api/SubscriptionBaseInternalApi.java
index 9662d65..8c2ce8f 100644
--- a/api/src/main/java/com/ning/billing/subscription/api/SubscriptionBaseInternalApi.java
+++ b/api/src/main/java/com/ning/billing/subscription/api/SubscriptionBaseInternalApi.java
@@ -24,21 +24,20 @@ import javax.annotation.Nullable;
 
 import org.joda.time.DateTime;
 
+import com.ning.billing.callcontext.InternalCallContext;
+import com.ning.billing.callcontext.InternalTenantContext;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.entitlement.api.EntitlementAOStatusDryRun;
+import com.ning.billing.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.subscription.api.user.SubscriptionBaseApiException;
 import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
-import com.ning.billing.callcontext.InternalCallContext;
-import com.ning.billing.callcontext.InternalTenantContext;
-import com.ning.billing.events.EffectiveSubscriptionInternalEvent;
-
+import com.ning.billing.util.entity.Pagination;
 
 public interface SubscriptionBaseInternalApi {
 
     public SubscriptionBase createSubscription(final UUID bundleId, final PlanPhaseSpecifier spec, final DateTime requestedDateWithMs,
                                                final InternalCallContext context) throws SubscriptionBaseApiException;
 
-
     public SubscriptionBaseBundle createBundleForAccount(final UUID accountId, final String bundleName, final InternalCallContext context)
             throws SubscriptionBaseApiException;
 
@@ -49,6 +48,10 @@ public interface SubscriptionBaseInternalApi {
 
     public List<SubscriptionBaseBundle> getBundlesForKey(final String bundleKey, final InternalTenantContext context);
 
+    public Pagination<SubscriptionBaseBundle> getBundles(final Long offset, final Long limit, final InternalTenantContext context);
+
+    public Pagination<SubscriptionBaseBundle> searchBundles(final String searchKey, final Long offset, final Long limit, final InternalTenantContext context);
+
     public Iterable<UUID> getNonAOSubscriptionIdsForKey(final String bundleKey, final InternalTenantContext context);
 
     public List<SubscriptionBase> getSubscriptionsForBundle(final UUID bundleId, final InternalTenantContext context);
@@ -74,6 +77,5 @@ public interface SubscriptionBaseInternalApi {
     public List<EntitlementAOStatusDryRun> getDryRunChangePlanStatus(final UUID subscriptionId, @Nullable final String baseProductName,
                                                                      final DateTime requestedDate, final InternalTenantContext context) throws SubscriptionBaseApiException;
 
-
     public void updateExternalKey(final UUID bundleId, final String newExternalKey, final InternalCallContext context);
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultSubscriptionApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultSubscriptionApi.java
index 0c18efd..cc42d68 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultSubscriptionApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultSubscriptionApi.java
@@ -27,6 +27,8 @@ import java.util.UUID;
 import javax.inject.Inject;
 
 import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
@@ -34,8 +36,6 @@ import com.ning.billing.callcontext.InternalCallContext;
 import com.ning.billing.callcontext.InternalTenantContext;
 import com.ning.billing.entitlement.AccountEntitlements;
 import com.ning.billing.entitlement.EntitlementInternalApi;
-import com.ning.billing.entitlement.EventsStream;
-import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
 import com.ning.billing.entitlement.engine.core.EntitlementUtils;
 import com.ning.billing.subscription.api.SubscriptionBase;
 import com.ning.billing.subscription.api.SubscriptionBaseInternalApi;
@@ -48,15 +48,22 @@ import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.TenantContext;
 import com.ning.billing.util.customfield.ShouldntHappenException;
 import com.ning.billing.util.dao.NonEntityDao;
+import com.ning.billing.util.entity.Pagination;
+import com.ning.billing.util.entity.dao.DefaultPaginationHelper.SourcePaginationBuilder;
 
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
+import static com.ning.billing.util.entity.dao.DefaultPaginationHelper.getEntityPaginationNoException;
+
 public class DefaultSubscriptionApi implements SubscriptionApi {
 
+    private static final Logger log = LoggerFactory.getLogger(DefaultSubscriptionApi.class);
+
     private static final Comparator<SubscriptionBundle> SUBSCRIPTION_BUNDLE_COMPARATOR = new Comparator<SubscriptionBundle>() {
         @Override
         public int compare(final SubscriptionBundle o1, final SubscriptionBundle o2) {
@@ -164,7 +171,7 @@ public class DefaultSubscriptionApi implements SubscriptionApi {
             }
             final SubscriptionBase subscriptionBase = subscriptionBaseInternalApi.getSubscriptionFromId(activeSubscriptionIdForKey, internalContext);
             return getSubscriptionBundle(subscriptionBase.getBundleId(), context);
-        } catch (SubscriptionBaseApiException e) {
+        } catch (final SubscriptionBaseApiException e) {
             throw new SubscriptionApiException(e);
         }
     }
@@ -188,12 +195,60 @@ public class DefaultSubscriptionApi implements SubscriptionApi {
         return getSubscriptionBundlesForAccount(accountId, context);
     }
 
+    @Override
+    public Pagination<SubscriptionBundle> getSubscriptionBundles(final Long offset, final Long limit, final TenantContext context) {
+        final InternalTenantContext internalContext = internalCallContextFactory.createInternalTenantContext(context);
+        return getEntityPaginationNoException(limit,
+                                              new SourcePaginationBuilder<SubscriptionBaseBundle, SubscriptionApiException>() {
+                                                  @Override
+                                                  public Pagination<SubscriptionBaseBundle> build() {
+                                                      return subscriptionBaseInternalApi.getBundles(offset, limit, internalContext);
+                                                  }
+                                              },
+                                              new Function<SubscriptionBaseBundle, SubscriptionBundle>() {
+                                                  @Override
+                                                  public SubscriptionBundle apply(final SubscriptionBaseBundle subscriptionBaseBundle) {
+                                                      try {
+                                                          return getSubscriptionBundle(subscriptionBaseBundle.getId(), context);
+                                                      } catch (final SubscriptionApiException e) {
+                                                          log.warn("Error retrieving subscription", e);
+                                                          return null;
+                                                      }
+                                                  }
+                                              }
+                                             );
+    }
+
+    @Override
+    public Pagination<SubscriptionBundle> searchSubscriptionBundles(final String searchKey, final Long offset, final Long limit, final TenantContext context) {
+        final InternalTenantContext internalContext = internalCallContextFactory.createInternalTenantContext(context);
+        return getEntityPaginationNoException(limit,
+                                              new SourcePaginationBuilder<SubscriptionBaseBundle, SubscriptionApiException>() {
+                                                  @Override
+                                                  public Pagination<SubscriptionBaseBundle> build() {
+                                                      return subscriptionBaseInternalApi.searchBundles(searchKey, offset, limit, internalContext);
+                                                  }
+                                              },
+                                              new Function<SubscriptionBaseBundle, SubscriptionBundle>() {
+                                                  @Override
+                                                  public SubscriptionBundle apply(final SubscriptionBaseBundle subscriptionBaseBundle) {
+                                                      try {
+                                                          return getSubscriptionBundle(subscriptionBaseBundle.getId(), context);
+                                                      } catch (final SubscriptionApiException e) {
+                                                          log.warn("Error retrieving subscription", e);
+                                                          return null;
+                                                      }
+                                                  }
+                                              }
+                                             );
+    }
+
     private List<SubscriptionBundle> getSubscriptionBundlesForAccount(final UUID accountId, final TenantContext tenantContext) throws SubscriptionApiException {
         // Retrieve entitlements
         final AccountEntitlements accountEntitlements;
         try {
             accountEntitlements = entitlementInternalApi.getAllEntitlementsForAccountId(accountId, tenantContext);
-        } catch (EntitlementApiException e) {
+        } catch (final EntitlementApiException e) {
             throw new SubscriptionApiException(e);
         }
 
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/BundleResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/BundleResource.java
index ef4a230..413559b 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/BundleResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/BundleResource.java
@@ -16,8 +16,12 @@
 
 package com.ning.billing.jaxrs.resources;
 
+import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -57,9 +61,13 @@ import com.ning.billing.util.api.CustomFieldUserApi;
 import com.ning.billing.util.api.TagApiException;
 import com.ning.billing.util.api.TagDefinitionApiException;
 import com.ning.billing.util.api.TagUserApi;
+import com.ning.billing.util.audit.AccountAuditLogs;
 import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.TenantContext;
+import com.ning.billing.util.entity.Pagination;
 
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 
 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
@@ -107,6 +115,58 @@ public class BundleResource extends JaxRsResourceBase {
         return Response.status(Status.OK).entity(json).build();
     }
 
+    @GET
+    @Path("/" + PAGINATION)
+    @Produces(APPLICATION_JSON)
+    public Response getBundles(@QueryParam(QUERY_SEARCH_OFFSET) @DefaultValue("0") final Long offset,
+                               @QueryParam(QUERY_SEARCH_LIMIT) @DefaultValue("100") final Long limit,
+                               @QueryParam(QUERY_AUDIT) @DefaultValue("NONE") final AuditMode auditMode,
+                               @javax.ws.rs.core.Context final HttpServletRequest request) throws SubscriptionApiException {
+        final TenantContext tenantContext = context.createContext(request);
+        final Pagination<SubscriptionBundle> bundles = subscriptionApi.getSubscriptionBundles(offset, limit, tenantContext);
+        final URI nextPageUri = uriBuilder.nextPage(BundleResource.class, "getBundles", bundles.getNextOffset(), limit, ImmutableMap.<String, String>of(QUERY_AUDIT, auditMode.getLevel().toString()));
+        final AtomicReference<Map<UUID, AccountAuditLogs>> accountsAuditLogs = new AtomicReference<Map<UUID, AccountAuditLogs>>(new HashMap<UUID, AccountAuditLogs>());
+        return buildStreamingPaginationResponse(bundles,
+                                                new Function<SubscriptionBundle, BundleJson>() {
+                                                    @Override
+                                                    public BundleJson apply(final SubscriptionBundle bundle) {
+                                                        // Cache audit logs per account
+                                                        if (accountsAuditLogs.get().get(bundle.getAccountId()) == null) {
+                                                            accountsAuditLogs.get().put(bundle.getAccountId(), auditUserApi.getAccountAuditLogs(bundle.getAccountId(), auditMode.getLevel(), tenantContext));
+                                                        }
+                                                        return new BundleJson(bundle, accountsAuditLogs.get().get(bundle.getAccountId()));
+                                                    }
+                                                },
+                                                nextPageUri);
+    }
+
+    @GET
+    @Path("/" + SEARCH + "/{searchKey:" + ANYTHING_PATTERN + "}")
+    @Produces(APPLICATION_JSON)
+    public Response searchBundles(@PathParam("searchKey") final String searchKey,
+                                  @QueryParam(QUERY_SEARCH_OFFSET) @DefaultValue("0") final Long offset,
+                                  @QueryParam(QUERY_SEARCH_LIMIT) @DefaultValue("100") final Long limit,
+                                  @QueryParam(QUERY_AUDIT) @DefaultValue("NONE") final AuditMode auditMode,
+                                  @javax.ws.rs.core.Context final HttpServletRequest request) throws SubscriptionApiException {
+        final TenantContext tenantContext = context.createContext(request);
+        final Pagination<SubscriptionBundle> bundles = subscriptionApi.searchSubscriptionBundles(searchKey, offset, limit, tenantContext);
+        final URI nextPageUri = uriBuilder.nextPage(BundleResource.class, "searchBundles", bundles.getNextOffset(), limit, ImmutableMap.<String, String>of("searchKey", searchKey,
+                                                                                                                                                           QUERY_AUDIT, auditMode.getLevel().toString()));
+        final AtomicReference<Map<UUID, AccountAuditLogs>> accountsAuditLogs = new AtomicReference<Map<UUID, AccountAuditLogs>>(new HashMap<UUID, AccountAuditLogs>());
+        return buildStreamingPaginationResponse(bundles,
+                                                new Function<SubscriptionBundle, BundleJson>() {
+                                                    @Override
+                                                    public BundleJson apply(final SubscriptionBundle bundle) {
+                                                        // Cache audit logs per account
+                                                        if (accountsAuditLogs.get().get(bundle.getAccountId()) == null) {
+                                                            accountsAuditLogs.get().put(bundle.getAccountId(), auditUserApi.getAccountAuditLogs(bundle.getAccountId(), auditMode.getLevel(), tenantContext));
+                                                        }
+                                                        return new BundleJson(bundle, accountsAuditLogs.get().get(bundle.getAccountId()));
+                                                    }
+                                                },
+                                                nextPageUri);
+    }
+
     @PUT
     @Path("/{bundleId:" + UUID_PATTERN + "}/" + PAUSE)
     @Consumes(APPLICATION_JSON)
diff --git a/server/src/test/java/com/ning/billing/jaxrs/TestBundle.java b/server/src/test/java/com/ning/billing/jaxrs/TestBundle.java
index 5f90877..77da877 100644
--- a/server/src/test/java/com/ning/billing/jaxrs/TestBundle.java
+++ b/server/src/test/java/com/ning/billing/jaxrs/TestBundle.java
@@ -27,6 +27,7 @@ import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.client.model.Account;
 import com.ning.billing.client.model.Bundle;
+import com.ning.billing.client.model.Bundles;
 import com.ning.billing.client.model.Subscription;
 
 import static org.testng.Assert.assertEquals;
@@ -99,4 +100,31 @@ public class TestBundle extends TestJaxrsBase {
         assertEquals(newBundle.getExternalKey(), originalBundle.getExternalKey());
         assertEquals(newBundle.getAccountId(), newAccount.getAccountId());
     }
+
+    @Test(groups = "slow", description = "Can paginate and search through all bundles")
+    public void testBundlesPagination() throws Exception {
+        final Account accountJson = createAccount();
+
+        for (int i = 0; i < 5; i++) {
+            createEntitlement(accountJson.getAccountId(), UUID.randomUUID().toString(), "Shotgun", ProductCategory.BASE, BillingPeriod.MONTHLY, true);
+        }
+
+        final Bundles allBundles = killBillClient.getBundles();
+        Assert.assertEquals(allBundles.size(), 5);
+
+        for (final Bundle bundle : allBundles) {
+            Assert.assertEquals(killBillClient.searchBundles(bundle.getBundleId().toString()).size(), 1);
+            Assert.assertEquals(killBillClient.searchBundles(bundle.getAccountId().toString()).size(), 5);
+            Assert.assertEquals(killBillClient.searchBundles(bundle.getExternalKey()).size(), 1);
+        }
+
+        Bundles page = killBillClient.getBundles(0L, 1L);
+        for (int i = 0; i < 5; i++) {
+            Assert.assertNotNull(page);
+            Assert.assertEquals(page.size(), 1);
+            Assert.assertEquals(page.get(0), allBundles.get(i));
+            page = page.getNext();
+        }
+        Assert.assertNull(page);
+    }
 }
diff --git a/subscription/src/main/java/com/ning/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java b/subscription/src/main/java/com/ning/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
index b19d8ed..36d6d61 100644
--- a/subscription/src/main/java/com/ning/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
+++ b/subscription/src/main/java/com/ning/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
+import com.ning.billing.callcontext.InternalCallContext;
+import com.ning.billing.callcontext.InternalTenantContext;
 import com.ning.billing.catalog.api.Catalog;
 import com.ning.billing.catalog.api.CatalogApiException;
 import com.ning.billing.catalog.api.CatalogService;
@@ -43,8 +45,10 @@ import com.ning.billing.clock.DefaultClock;
 import com.ning.billing.entitlement.api.Entitlement.EntitlementState;
 import com.ning.billing.entitlement.api.EntitlementAOStatusDryRun;
 import com.ning.billing.entitlement.api.EntitlementAOStatusDryRun.DryRunChangeReason;
+import com.ning.billing.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.subscription.api.SubscriptionApiBase;
 import com.ning.billing.subscription.api.SubscriptionBase;
+import com.ning.billing.subscription.api.SubscriptionBaseInternalApi;
 import com.ning.billing.subscription.api.user.DefaultEffectiveSubscriptionEvent;
 import com.ning.billing.subscription.api.user.DefaultSubscriptionBase;
 import com.ning.billing.subscription.api.user.DefaultSubscriptionBaseApiService;
@@ -57,18 +61,19 @@ import com.ning.billing.subscription.api.user.SubscriptionBaseTransitionData;
 import com.ning.billing.subscription.api.user.SubscriptionBuilder;
 import com.ning.billing.subscription.engine.addon.AddonUtils;
 import com.ning.billing.subscription.engine.dao.SubscriptionDao;
+import com.ning.billing.subscription.engine.dao.model.SubscriptionBundleModelDao;
 import com.ning.billing.subscription.exceptions.SubscriptionBaseError;
-import com.ning.billing.callcontext.InternalCallContext;
-import com.ning.billing.callcontext.InternalTenantContext;
-import com.ning.billing.events.EffectiveSubscriptionInternalEvent;
-import com.ning.billing.subscription.api.SubscriptionBaseInternalApi;
 import com.ning.billing.util.dao.NonEntityDao;
+import com.ning.billing.util.entity.Pagination;
+import com.ning.billing.util.entity.dao.DefaultPaginationHelper.SourcePaginationBuilder;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 
+import static com.ning.billing.util.entity.dao.DefaultPaginationHelper.getEntityPaginationNoException;
+
 public class DefaultSubscriptionInternalApi extends SubscriptionApiBase implements SubscriptionBaseInternalApi {
 
     private final Logger log = LoggerFactory.getLogger(DefaultSubscriptionInternalApi.class);
@@ -187,11 +192,48 @@ public class DefaultSubscriptionInternalApi extends SubscriptionApiBase implemen
     }
 
     @Override
+    public Pagination<SubscriptionBaseBundle> getBundles(final Long offset, final Long limit, final InternalTenantContext context) {
+        return getEntityPaginationNoException(limit,
+                                              new SourcePaginationBuilder<SubscriptionBundleModelDao, SubscriptionBaseApiException>() {
+                                                  @Override
+                                                  public Pagination<SubscriptionBundleModelDao> build() {
+                                                      return dao.get(offset, limit, context);
+                                                  }
+                                              },
+                                              new Function<SubscriptionBundleModelDao, SubscriptionBaseBundle>() {
+                                                  @Override
+                                                  public SubscriptionBaseBundle apply(final SubscriptionBundleModelDao bundleModelDao) {
+                                                      return SubscriptionBundleModelDao.toSubscriptionbundle(bundleModelDao);
+                                                  }
+                                              }
+                                             );
+    }
+
+    @Override
+    public Pagination<SubscriptionBaseBundle> searchBundles(final String searchKey, final Long offset, final Long limit, final InternalTenantContext context) {
+        return getEntityPaginationNoException(limit,
+                                              new SourcePaginationBuilder<SubscriptionBundleModelDao, SubscriptionBaseApiException>() {
+                                                  @Override
+                                                  public Pagination<SubscriptionBundleModelDao> build() {
+                                                      return dao.searchSubscriptionBundles(searchKey, offset, limit, context);
+                                                  }
+                                              },
+                                              new Function<SubscriptionBundleModelDao, SubscriptionBaseBundle>() {
+                                                  @Override
+                                                  public SubscriptionBaseBundle apply(final SubscriptionBundleModelDao bundleModelDao) {
+                                                      return SubscriptionBundleModelDao.toSubscriptionbundle(bundleModelDao);
+                                                  }
+                                              }
+                                             );
+
+    }
+
+    @Override
     public Iterable<UUID> getNonAOSubscriptionIdsForKey(final String bundleKey, final InternalTenantContext context) {
         return dao.getNonAOSubscriptionIdsForKey(bundleKey, context);
     }
 
-    public static SubscriptionBaseBundle getActiveBundleForKeyNotException(final List<SubscriptionBaseBundle> existingBundles, final SubscriptionDao dao, final Clock clock, final InternalTenantContext context)  {
+    public static SubscriptionBaseBundle getActiveBundleForKeyNotException(final List<SubscriptionBaseBundle> existingBundles, final SubscriptionDao dao, final Clock clock, final InternalTenantContext context) {
         for (SubscriptionBaseBundle cur : existingBundles) {
             final List<SubscriptionBase> subscriptions = dao.getSubscriptions(cur.getId(), context);
             for (SubscriptionBase s : subscriptions) {
@@ -330,10 +372,10 @@ public class DefaultSubscriptionInternalApi extends SubscriptionApiBase implemen
                 reason = DryRunChangeReason.AO_NOT_AVAILABLE_IN_NEW_PLAN;
             }
             final EntitlementAOStatusDryRun status = new DefaultSubscriptionStatusDryRun(cur.getId(),
-                                                                                        cur.getCurrentPlan().getProduct().getName(),
-                                                                                        cur.getCurrentPhase().getPhaseType(),
-                                                                                        cur.getCurrentPlan().getBillingPeriod(),
-                                                                                        cur.getCurrentPriceList().getName(), reason);
+                                                                                         cur.getCurrentPlan().getProduct().getName(),
+                                                                                         cur.getCurrentPhase().getPhaseType(),
+                                                                                         cur.getCurrentPlan().getBillingPeriod(),
+                                                                                         cur.getCurrentPriceList().getName(), reason);
             result.add(status);
         }
         return result;
diff --git a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/BundleSqlDao.java b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/BundleSqlDao.java
index 5fce8b0..7746e81 100644
--- a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/BundleSqlDao.java
+++ b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/BundleSqlDao.java
@@ -17,18 +17,20 @@
 package com.ning.billing.subscription.engine.dao;
 
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 
 import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
 import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.FetchSize;
 
-import com.ning.billing.subscription.engine.dao.model.SubscriptionBundleModelDao;
-import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
-import com.ning.billing.util.audit.ChangeType;
 import com.ning.billing.callcontext.InternalCallContext;
 import com.ning.billing.callcontext.InternalTenantContext;
+import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
+import com.ning.billing.subscription.engine.dao.model.SubscriptionBundleModelDao;
+import com.ning.billing.util.audit.ChangeType;
 import com.ning.billing.util.entity.dao.Audited;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoStringTemplate;
@@ -60,4 +62,13 @@ public interface BundleSqlDao extends EntitySqlDao<SubscriptionBundleModelDao, S
     @SqlQuery
     public List<SubscriptionBundleModelDao> getBundlesForKey(@Bind("externalKey") String externalKey,
                                                              @BindBean final InternalTenantContext context);
+
+    @SqlQuery
+    // Magic value to force MySQL to stream from the database
+    // See http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html (ResultSet)
+    @FetchSize(Integer.MIN_VALUE)
+    public Iterator<SubscriptionBundleModelDao> searchBundles(@Bind("searchKey") final String searchKey,
+                                                              @Bind("offset") final Long offset,
+                                                              @Bind("rowCount") final Long rowCount,
+                                                              @BindBean final InternalTenantContext context);
 }
diff --git a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/DefaultSubscriptionDao.java b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/DefaultSubscriptionDao.java
index a183a12..3c876f8 100644
--- a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/DefaultSubscriptionDao.java
+++ b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/DefaultSubscriptionDao.java
@@ -46,6 +46,7 @@ import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.clock.Clock;
+import com.ning.billing.entitlement.api.SubscriptionApiException;
 import com.ning.billing.entity.EntityPersistenceException;
 import com.ning.billing.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.events.RepairSubscriptionInternalEvent;
@@ -85,6 +86,9 @@ import com.ning.billing.subscription.events.user.ApiEventType;
 import com.ning.billing.subscription.exceptions.SubscriptionBaseError;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.dao.NonEntityDao;
+import com.ning.billing.util.entity.Pagination;
+import com.ning.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
+import com.ning.billing.util.entity.dao.EntityDaoBase;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
@@ -96,15 +100,13 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
-public class DefaultSubscriptionDao implements SubscriptionDao {
+public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleModelDao, SubscriptionBaseBundle, SubscriptionApiException> implements SubscriptionDao {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultSubscriptionDao.class);
 
     private final Clock clock;
-    private final EntitySqlDaoTransactionalJdbiWrapper transactionalSqlDao;
     private final NotificationQueueService notificationQueueService;
     private final AddonUtils addonUtils;
     private final PersistentBus eventBus;
@@ -114,8 +116,8 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
     public DefaultSubscriptionDao(final IDBI dbi, final Clock clock, final AddonUtils addonUtils,
                                   final NotificationQueueService notificationQueueService, final PersistentBus eventBus, final CatalogService catalogService,
                                   final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
+        super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao), BundleSqlDao.class);
         this.clock = clock;
-        this.transactionalSqlDao = new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao);
         this.notificationQueueService = notificationQueueService;
         this.addonUtils = addonUtils;
         this.eventBus = eventBus;
@@ -123,6 +125,11 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
     }
 
     @Override
+    protected SubscriptionApiException generateAlreadyExistsException(final SubscriptionBundleModelDao entity, final InternalCallContext context) {
+        return new SubscriptionApiException(ErrorCode.SUB_CREATE_ACTIVE_BUNDLE_KEY_EXISTS, entity.getExternalKey());
+    }
+
+    @Override
     public List<SubscriptionBaseBundle> getSubscriptionBundlesForAccountAndKey(final UUID accountId, final String bundleKey, final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<SubscriptionBaseBundle>>() {
             @Override
@@ -183,6 +190,20 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
     }
 
     @Override
+    public Pagination<SubscriptionBundleModelDao> searchSubscriptionBundles(final String searchKey, final Long offset, final Long limit, final InternalTenantContext context) {
+        return paginationHelper.getPagination(BundleSqlDao.class,
+                                              new PaginationIteratorBuilder<SubscriptionBundleModelDao, SubscriptionBaseBundle, BundleSqlDao>() {
+                                                  @Override
+                                                  public Iterator<SubscriptionBundleModelDao> build(final BundleSqlDao bundleSqlDao, final Long limit) {
+                                                      return bundleSqlDao.searchBundles(searchKey, offset, limit, context);
+                                                  }
+                                              },
+                                              offset,
+                                              limit,
+                                              context);
+    }
+
+    @Override
     public Iterable<UUID> getNonAOSubscriptionIdsForKey(final String bundleKey, final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Iterable<UUID>>() {
             @Override
diff --git a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/RepairSubscriptionDao.java b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/RepairSubscriptionDao.java
index 26f50cd..bffc74c 100644
--- a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/RepairSubscriptionDao.java
+++ b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/RepairSubscriptionDao.java
@@ -27,6 +27,16 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 
+import javax.inject.Inject;
+
+import org.skife.jdbi.v2.IDBI;
+
+import com.ning.billing.ErrorCode;
+import com.ning.billing.callcontext.InternalCallContext;
+import com.ning.billing.callcontext.InternalTenantContext;
+import com.ning.billing.clock.Clock;
+import com.ning.billing.entitlement.api.SubscriptionApiException;
+import com.ning.billing.subscription.api.SubscriptionBase;
 import com.ning.billing.subscription.api.migration.AccountMigrationData;
 import com.ning.billing.subscription.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.subscription.api.timeline.RepairSubscriptionLifecycleDao;
@@ -34,22 +44,35 @@ import com.ning.billing.subscription.api.timeline.SubscriptionDataRepair;
 import com.ning.billing.subscription.api.transfer.TransferCancelData;
 import com.ning.billing.subscription.api.user.DefaultSubscriptionBase;
 import com.ning.billing.subscription.api.user.DefaultSubscriptionBaseBundle;
+import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
+import com.ning.billing.subscription.engine.dao.model.SubscriptionBundleModelDao;
 import com.ning.billing.subscription.events.SubscriptionBaseEvent;
 import com.ning.billing.subscription.exceptions.SubscriptionBaseError;
-import com.ning.billing.subscription.api.SubscriptionBase;
-import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
-import com.ning.billing.callcontext.InternalCallContext;
-import com.ning.billing.callcontext.InternalTenantContext;
+import com.ning.billing.util.cache.CacheControllerDispatcher;
+import com.ning.billing.util.dao.NonEntityDao;
+import com.ning.billing.util.entity.Pagination;
+import com.ning.billing.util.entity.dao.EntityDaoBase;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 
-public class RepairSubscriptionDao implements SubscriptionDao, RepairSubscriptionLifecycleDao {
+public class RepairSubscriptionDao extends EntityDaoBase<SubscriptionBundleModelDao, SubscriptionBaseBundle, SubscriptionApiException> implements SubscriptionDao, RepairSubscriptionLifecycleDao {
 
     private static final String NOT_IMPLEMENTED = "Not implemented";
 
     private final ThreadLocal<Map<UUID, SubscriptionRepairEvent>> preThreadsInRepairSubscriptions = new ThreadLocal<Map<UUID, SubscriptionRepairEvent>>();
 
+    @Inject
+    public RepairSubscriptionDao(final IDBI dbi, final Clock clock, final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
+        super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao), BundleSqlDao.class);
+    }
+
+    @Override
+    protected SubscriptionApiException generateAlreadyExistsException(final SubscriptionBundleModelDao entity, final InternalCallContext context) {
+        return new SubscriptionApiException(ErrorCode.SUB_CREATE_ACTIVE_BUNDLE_KEY_EXISTS, entity.getExternalKey());
+    }
+
     private static final class SubscriptionEventWithOrderingId {
 
         private final SubscriptionBaseEvent event;
@@ -311,6 +334,11 @@ public class RepairSubscriptionDao implements SubscriptionDao, RepairSubscriptio
     }
 
     @Override
+    public Pagination<SubscriptionBundleModelDao> searchSubscriptionBundles(final String searchKey, final Long offset, final Long limit, final InternalTenantContext context) {
+        throw new SubscriptionBaseError(NOT_IMPLEMENTED);
+    }
+
+    @Override
     public List<UUID> getNonAOSubscriptionIdsForKey(final String bundleKey, final InternalTenantContext context) {
         throw new SubscriptionBaseError(NOT_IMPLEMENTED);
     }
diff --git a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/SubscriptionDao.java b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/SubscriptionDao.java
index 3caa123..165a868 100644
--- a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/SubscriptionDao.java
+++ b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/SubscriptionDao.java
@@ -20,6 +20,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.ning.billing.callcontext.InternalCallContext;
+import com.ning.billing.callcontext.InternalTenantContext;
+import com.ning.billing.entitlement.api.SubscriptionApiException;
+import com.ning.billing.subscription.api.SubscriptionBase;
 import com.ning.billing.subscription.api.migration.AccountMigrationData;
 import com.ning.billing.subscription.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.subscription.api.timeline.SubscriptionDataRepair;
@@ -27,19 +31,21 @@ import com.ning.billing.subscription.api.transfer.TransferCancelData;
 import com.ning.billing.subscription.api.user.DefaultSubscriptionBase;
 import com.ning.billing.subscription.api.user.DefaultSubscriptionBaseBundle;
 import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
+import com.ning.billing.subscription.engine.dao.model.SubscriptionBundleModelDao;
 import com.ning.billing.subscription.events.SubscriptionBaseEvent;
-import com.ning.billing.subscription.api.SubscriptionBase;
-import com.ning.billing.callcontext.InternalCallContext;
-import com.ning.billing.callcontext.InternalTenantContext;
+import com.ning.billing.util.entity.Pagination;
+import com.ning.billing.util.entity.dao.EntityDao;
 
-public interface SubscriptionDao {
+public interface SubscriptionDao extends EntityDao<SubscriptionBundleModelDao, SubscriptionBaseBundle, SubscriptionApiException> {
 
     // Bundle apis
     public List<SubscriptionBaseBundle> getSubscriptionBundleForAccount(UUID accountId, InternalTenantContext context);
 
     public List<SubscriptionBaseBundle> getSubscriptionBundlesForKey(String bundleKey, InternalTenantContext context);
 
-    public Iterable<UUID> getNonAOSubscriptionIdsForKey(final String bundleKey, final InternalTenantContext context);
+    public Pagination<SubscriptionBundleModelDao> searchSubscriptionBundles(String searchKey, Long offset, Long limit, InternalTenantContext context);
+
+    public Iterable<UUID> getNonAOSubscriptionIdsForKey(String bundleKey, InternalTenantContext context);
 
     public List<SubscriptionBaseBundle> getSubscriptionBundlesForAccountAndKey(UUID accountId, String bundleKey, InternalTenantContext context);
 
@@ -80,7 +86,7 @@ public interface SubscriptionDao {
 
     public void cancelSubscription(DefaultSubscriptionBase subscription, SubscriptionBaseEvent cancelEvent, InternalCallContext context, int cancelSeq);
 
-    public void cancelSubscriptions(final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context);
+    public void cancelSubscriptions(List<DefaultSubscriptionBase> subscriptions, List<SubscriptionBaseEvent> cancelEvents, InternalCallContext context);
 
     public void uncancelSubscription(DefaultSubscriptionBase subscription, List<SubscriptionBaseEvent> uncancelEvents, InternalCallContext context);
 
@@ -90,7 +96,7 @@ public interface SubscriptionDao {
 
     public void transfer(UUID srcAccountId, UUID destAccountId, BundleMigrationData data, List<TransferCancelData> transferCancelData, InternalCallContext fromContext, InternalCallContext toContext);
 
-    public void updateBundleExternalKey(UUID bundleId, String externalKey, final InternalCallContext context);
+    public void updateBundleExternalKey(UUID bundleId, String externalKey, InternalCallContext context);
 
     // Repair
     public void repair(UUID accountId, UUID bundleId, List<SubscriptionDataRepair> inRepair, InternalCallContext context);
diff --git a/subscription/src/main/resources/com/ning/billing/subscription/engine/dao/BundleSqlDao.sql.stg b/subscription/src/main/resources/com/ning/billing/subscription/engine/dao/BundleSqlDao.sql.stg
index 7238aee..f5f9ade 100644
--- a/subscription/src/main/resources/com/ning/billing/subscription/engine/dao/BundleSqlDao.sql.stg
+++ b/subscription/src/main/resources/com/ning/billing/subscription/engine/dao/BundleSqlDao.sql.stg
@@ -77,3 +77,19 @@ account_id = :accountId
 <defaultOrderBy()>
 ;
 >>
+
+searchBundles() ::= <<
+select SQL_CALC_FOUND_ROWS
+<allTableFields("t.")>
+from <tableName()> t
+where 1 = 1
+and (
+     <idField("t.")> = :searchKey
+  or t.external_key = :searchKey
+  or t.account_id = :searchKey
+)
+<AND_CHECK_TENANT("t.")>
+order by <recordIdField("t.")> ASC
+limit :offset, :rowCount
+;
+>>
diff --git a/subscription/src/test/java/com/ning/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java b/subscription/src/test/java/com/ning/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
index 79f9e47..3ffe068 100644
--- a/subscription/src/test/java/com/ning/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
+++ b/subscription/src/test/java/com/ning/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
@@ -36,6 +36,7 @@ import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.catalog.api.TimeUnit;
 import com.ning.billing.clock.Clock;
+import com.ning.billing.entitlement.api.SubscriptionApiException;
 import com.ning.billing.notificationq.api.NotificationEvent;
 import com.ning.billing.notificationq.api.NotificationQueue;
 import com.ning.billing.notificationq.api.NotificationQueueService;
@@ -52,16 +53,20 @@ import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
 import com.ning.billing.subscription.api.user.SubscriptionBuilder;
 import com.ning.billing.subscription.engine.core.DefaultSubscriptionBaseService;
 import com.ning.billing.subscription.engine.core.SubscriptionNotificationKey;
+import com.ning.billing.subscription.engine.dao.model.SubscriptionBundleModelDao;
 import com.ning.billing.subscription.events.SubscriptionBaseEvent;
 import com.ning.billing.subscription.events.SubscriptionBaseEvent.EventType;
 import com.ning.billing.subscription.events.user.ApiEvent;
 import com.ning.billing.subscription.events.user.ApiEventType;
+import com.ning.billing.util.entity.DefaultPagination;
+import com.ning.billing.util.entity.Pagination;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.entity.dao.MockEntityDaoBase;
 
 import com.google.inject.Inject;
 
-public class MockSubscriptionDaoMemory implements SubscriptionDao {
+public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBundleModelDao, SubscriptionBaseBundle, SubscriptionApiException> implements SubscriptionDao {
 
     protected static final Logger log = LoggerFactory.getLogger(SubscriptionDao.class);
 
@@ -114,6 +119,20 @@ public class MockSubscriptionDaoMemory implements SubscriptionDao {
     }
 
     @Override
+    public Pagination<SubscriptionBundleModelDao> searchSubscriptionBundles(final String searchKey, final Long offset, final Long limit, final InternalTenantContext context) {
+        final List<SubscriptionBundleModelDao> results = new LinkedList<SubscriptionBundleModelDao>();
+        for (final SubscriptionBundleModelDao bundleModelDao : getAll(context)) {
+            if (bundleModelDao.getId().toString().equals(searchKey) ||
+                bundleModelDao.getExternalKey().equals(searchKey) ||
+                bundleModelDao.getAccountId().toString().equals(searchKey)) {
+                results.add(bundleModelDao);
+            }
+        }
+
+        return DefaultPagination.<SubscriptionBundleModelDao>build(offset, limit, results);
+    }
+
+    @Override
     public List<UUID> getNonAOSubscriptionIdsForKey(final String bundleKey, final InternalTenantContext context) {
         throw new UnsupportedOperationException();
     }