killbill-uncached
Changes
entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java 79(+71 -8)
entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.sql.stg 13(+13 -0)
Details
diff --git a/account/src/main/java/com/ning/billing/account/glue/AccountModule.java b/account/src/main/java/com/ning/billing/account/glue/AccountModule.java
index 24ca0a3..b4583ec 100644
--- a/account/src/main/java/com/ning/billing/account/glue/AccountModule.java
+++ b/account/src/main/java/com/ning/billing/account/glue/AccountModule.java
@@ -25,6 +25,7 @@ import com.ning.billing.account.dao.AccountDao;
import com.ning.billing.account.dao.FieldStoreDao;
import com.ning.billing.account.dao.IAccountDao;
import com.ning.billing.account.dao.IFieldStoreDao;
+
import org.skife.config.ConfigurationObjectFactory;
public class AccountModule extends AbstractModule {
@@ -51,6 +52,11 @@ public class AccountModule extends AbstractModule {
bind(IFieldStoreDao.class).to(FieldStoreDao.class).asEagerSingleton();
}
+ protected void installInjectorMagic() {
+ bind(InjectorMagic.class).asEagerSingleton();
+ }
+
+
@Override
protected void configure() {
installConfig();
@@ -58,6 +64,7 @@ public class AccountModule extends AbstractModule {
installAccountUserApi();
installAccountService();
installFieldStore();
+ installInjectorMagic();
}
}
diff --git a/analytics/src/test/java/com/ning/billing/analytics/MockIEntitlementUserApi.java b/analytics/src/test/java/com/ning/billing/analytics/MockIEntitlementUserApi.java
index 4ed2e8c..1aabb67 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/MockIEntitlementUserApi.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/MockIEntitlementUserApi.java
@@ -115,4 +115,9 @@ public class MockIEntitlementUserApi implements IEntitlementUserApi
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public List<ISubscription> getSubscriptionsForKey(String bundleKey) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/api/src/main/java/com/ning/billing/entitlement/api/test/IEntitlementTestApi.java b/api/src/main/java/com/ning/billing/entitlement/api/test/IEntitlementTestApi.java
index fbaa68e..0473cd9 100644
--- a/api/src/main/java/com/ning/billing/entitlement/api/test/IEntitlementTestApi.java
+++ b/api/src/main/java/com/ning/billing/entitlement/api/test/IEntitlementTestApi.java
@@ -16,8 +16,11 @@
package com.ning.billing.entitlement.api.test;
+import java.util.UUID;
+
+
public interface IEntitlementTestApi {
- public void doProcessReadyEvents();
+ public void doProcessReadyEvents(UUID [] subscriptionsIds, Boolean recursive, Boolean oneEventOnly);
}
diff --git a/api/src/main/java/com/ning/billing/entitlement/api/user/IEntitlementUserApi.java b/api/src/main/java/com/ning/billing/entitlement/api/user/IEntitlementUserApi.java
index f45f450..258e76d 100644
--- a/api/src/main/java/com/ning/billing/entitlement/api/user/IEntitlementUserApi.java
+++ b/api/src/main/java/com/ning/billing/entitlement/api/user/IEntitlementUserApi.java
@@ -35,6 +35,8 @@ public interface IEntitlementUserApi {
public List<ISubscription> getSubscriptionsForBundle(UUID bundleId);
+ public List<ISubscription> getSubscriptionsForKey(String bundleKey);
+
public ISubscriptionBundle createBundleForAccount(IAccount account, String bundleKey)
throws EntitlementUserApiException;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
index 01ec2c0..5a4305f 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
@@ -16,6 +16,7 @@
package com.ning.billing.entitlement.alignment;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -152,6 +153,11 @@ public class PlanAligner implements IPlanAligner {
private List<TimedPhase> getPhaseAlignments(Subscription subscription, IPlan plan,
DateTime effectiveDate, DateTime planStartDate) {
+ // The plan can be null with the nasty endpoint from test API.
+ if (plan == null) {
+ return Collections.emptyList();
+ }
+
List<TimedPhase> result = new LinkedList<IPlanAligner.TimedPhase>();
DateTime curPhaseStart = planStartDate;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/EntitlementTestApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/EntitlementTestApi.java
index 61cee03..703281b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/EntitlementTestApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/EntitlementTestApi.java
@@ -16,11 +16,15 @@
package com.ning.billing.entitlement.api.test;
+import java.util.List;
+import java.util.UUID;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.ning.billing.config.IEntitlementConfig;
+import com.ning.billing.entitlement.api.user.ISubscription;
import com.ning.billing.entitlement.engine.core.IApiEventProcessor;
public class EntitlementTestApi implements IEntitlementTestApi {
@@ -37,10 +41,10 @@ public class EntitlementTestApi implements IEntitlementTestApi {
}
@Override
- public void doProcessReadyEvents() {
+ public void doProcessReadyEvents(UUID [] subscriptionsIds, Boolean recursive, Boolean oneEventOnly) {
if (config.isEventProcessingOff()) {
log.warn("Running event processing loop");
- apiEventProcessor.processAllReadyEvents();
+ apiEventProcessor.processAllReadyEvents(subscriptionsIds, recursive, oneEventOnly);
}
}
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
index 93fe561..127eedb 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
@@ -76,6 +76,12 @@ public class EntitlementUserApi implements IEntitlementUserApi {
}
@Override
+ public List<ISubscription> getSubscriptionsForKey(String bundleKey) {
+ return dao.getSubscriptionsForKey(bundleKey);
+ }
+
+
+ @Override
public List<ISubscription> getSubscriptionsForBundle(UUID bundleId) {
return dao.getSubscriptions(bundleId);
}
@@ -158,6 +164,4 @@ public class EntitlementUserApi implements IEntitlementUserApi {
// STEPH Also update startDate for bundle ?
return dao.createSubscription(subscription, events);
}
-
-
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessor.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessor.java
index 541de37..73eb863 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessor.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessor.java
@@ -16,6 +16,7 @@
package com.ning.billing.entitlement.engine.core;
+import java.util.Collection;
import java.util.List;
import com.google.inject.Inject;
@@ -34,13 +35,17 @@ public class ApiEventProcessor extends ApiEventProcessorBase {
@Override
protected boolean doProcessEvents(int sequenceId) {
- long prev = nbProcessedEvents;
List<IEvent> claimedEvents = dao.getEventsReady(apiProcessorId, sequenceId);
if (claimedEvents.size() == 0) {
return false;
}
log.debug(String.format("ApiEventProcessor got %d events", claimedEvents.size()));
+ return doProcessEventsFromList(sequenceId, claimedEvents);
+ }
+
+ protected boolean doProcessEventsFromList(int sequenceId, Collection<IEvent> claimedEvents) {
+ long prev = nbProcessedEvents;
for (IEvent cur : claimedEvents) {
log.debug(String.format("ApiEventProcessor seq = %d got event %s", sequenceId, cur.getId()));
listener.processEventReady(cur);
@@ -52,6 +57,6 @@ public class ApiEventProcessor extends ApiEventProcessorBase {
log.debug(String.format("ApiEventProcessor cleared events %d", nbProcessedEvents - prev));
//log.debug(String.format("ApiEventProcessor seq = %d cleared events %s", sequenceId, claimedEvents.get(0).getId()));
return true;
- }
+ }
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
index f3faf8e..ff98ed8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
@@ -17,6 +17,10 @@
package com.ning.billing.entitlement.engine.core;
import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@@ -28,8 +32,11 @@ import org.skife.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
import com.google.inject.Inject;
import com.ning.billing.config.IEntitlementConfig;
+import com.ning.billing.entitlement.api.user.ISubscription;
import com.ning.billing.entitlement.engine.dao.IEntitlementDao;
import com.ning.billing.entitlement.events.IEvent;
import com.ning.billing.util.clock.IClock;
@@ -158,6 +165,11 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
@Override
public void stopNotifications() {
+
+ if (config.isEventProcessingOff()) {
+ return;
+ }
+
synchronized(this) {
isProcessingEvents = false;
try {
@@ -172,20 +184,71 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
}
+ //
// Used for system test purpose only when event processing has been disabled.
+ // This is not necessarily pretty
+ //
@Override
- public void processAllReadyEvents() {
+ public void processAllReadyEvents(final UUID [] subscriptionsIds, final Boolean recursive, final Boolean oneEventOnly) {
+ processAllReadyEventsRecursively(subscriptionsIds, recursive, oneEventOnly);
+ }
+ private boolean processAllReadyEventsRecursively(final UUID [] subscriptionsIds,
+ final Boolean recursive,
+ final Boolean oneEventOnly) {
- boolean keepProcessing = false;
- /*
+ int curSequenceId = sequenceId.getAndIncrement();
+
+ //Get all current ready events
+ List<IEvent> claimedEvents = new LinkedList<IEvent>();
do {
- */
- keepProcessing = doProcessEvents(sequenceId.incrementAndGet());
- /*
- } while (keepProcessing);
- */
+ List<IEvent> tmpEvents = dao.getEventsReady(apiProcessorId, curSequenceId);
+ if (tmpEvents.size() == 0) {
+ break;
+ }
+ claimedEvents.addAll(tmpEvents);
+ if (oneEventOnly) {
+ break;
+ }
+ } while(true);
+ if (claimedEvents.size() == 0) {
+ return false;
+ }
+
+ // Filter for specific subscriptions if needed
+ Collection<IEvent> claimedEventsFiltered = null;
+ if (subscriptionsIds == null) {
+ claimedEventsFiltered = claimedEvents;
+ } else {
+
+ claimedEventsFiltered = Collections2.filter(claimedEvents, new Predicate<IEvent>() {
+ @Override
+ public boolean apply(IEvent input) {
+ for (UUID cur : subscriptionsIds) {
+ if (cur.equals(input.getId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ }
+ // If only one event is requested extract it
+ if (oneEventOnly) {
+ List<IEvent> oneEventList = new ArrayList<IEvent>(1);
+ oneEventList.add(claimedEventsFiltered.iterator().next());
+ claimedEventsFiltered = oneEventList;
+ }
+
+ // Call processing method
+ doProcessEventsFromList(curSequenceId, claimedEventsFiltered);
+ // Keep going is recursive
+ if (recursive && !oneEventOnly) {
+ processAllReadyEventsRecursively(subscriptionsIds, recursive, oneEventOnly);
+ }
+ return true;
}
protected abstract boolean doProcessEvents(int sequenceId);
+ protected abstract boolean doProcessEventsFromList(int sequenceId, Collection<IEvent> events);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IApiEventProcessor.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IApiEventProcessor.java
index f51ae7e..dc04996 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IApiEventProcessor.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IApiEventProcessor.java
@@ -16,9 +16,11 @@
package com.ning.billing.entitlement.engine.core;
+import java.util.UUID;
+
public interface IApiEventProcessor extends IEventNotifier {
- public void processAllReadyEvents();
+ public void processAllReadyEvents(UUID [] subscriptionsIds, Boolean recursive, Boolean oneEventOnly);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index 197e4bd..b490fcd 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -107,6 +107,15 @@ public class EntitlementDao implements IEntitlementDao {
}
@Override
+ public List<ISubscription> getSubscriptionsForKey(String bundleKey) {
+ ISubscriptionBundle bundle = bundlesDao.getBundleFromKey(bundleKey);
+ if (bundle == null) {
+ return Collections.emptyList();
+ }
+ return subscriptionsDao.getSubscriptionsFromBundleId(bundle.getId().toString());
+ }
+
+ @Override
public void updateSubscription(Subscription subscription) {
Date ctd = (subscription.getChargedThroughDate() != null) ? subscription.getChargedThroughDate().toDate() : null;
Date ptd = (subscription.getPaidThroughDate() != null) ? subscription.getPaidThroughDate().toDate() : null;
@@ -178,7 +187,7 @@ public class EntitlementDao implements IEntitlementDao {
}
@Override
- public void clearEventsReady(final UUID ownerId, final List<IEvent> cleared) {
+ public void clearEventsReady(final UUID ownerId, final Collection<IEvent> cleared) {
log.debug(String.format("EntitlementDao clearEventsReady START cleared size = %d", cleared.size()));
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.java
index 737bf52..e77c83c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.java
@@ -53,8 +53,13 @@ public interface IBundleSqlDao extends Transactional<IEventSqlDao>, CloseMe, Tra
@SqlQuery
@Mapper(ISubscriptionBundleSqlMapper.class)
+ public ISubscriptionBundle getBundleFromKey(@Bind("name") String name);
+
+ @SqlQuery
+ @Mapper(ISubscriptionBundleSqlMapper.class)
public List<ISubscriptionBundle> getBundleFromAccount(@Bind("account_id") String accountId);
+
public static class SubscriptionBundleBinder implements Binder<Bind, SubscriptionBundle> {
private Date getDate(DateTime dateTime) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/IEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/IEntitlementDao.java
index 1f35fa0..2d46439 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/IEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/IEntitlementDao.java
@@ -16,6 +16,7 @@
package com.ning.billing.entitlement.engine.dao;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
@@ -43,6 +44,8 @@ public interface IEntitlementDao {
public List<ISubscription> getSubscriptions(UUID bundleId);
+ public List<ISubscription> getSubscriptionsForKey(String bundleKey);
+
// Update
public void updateSubscription(Subscription subscription);
@@ -55,7 +58,7 @@ public interface IEntitlementDao {
public List<IEvent> getEventsReady(UUID ownerId, int sequenceId);
- public void clearEventsReady(UUID ownerId, List<IEvent> cleared);
+ public void clearEventsReady(UUID ownerId, Collection<IEvent> cleared);
// Subscription creation, cancellation, changePlan apis
public ISubscription createSubscription(Subscription subscription, List<IEvent> initialEvents);
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.sql.stg b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.sql.stg
index 22a85f8..045a241 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.sql.stg
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/IBundleSqlDao.sql.stg
@@ -27,6 +27,18 @@ getBundleFromId(id) ::= <<
;
>>
+getBundleFromKey(name) ::= <<
+ select
+ id
+ , start_dt
+ , name
+ , account_id
+ from bundles
+ where
+ name = :name
+ ;
+>>
+
getBundleFromAccount(account_id) ::= <<
select
@@ -39,3 +51,4 @@ getBundleFromAccount(account_id) ::= <<
account_id = :account_id
;
>>
+
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorMemoryMock.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorMemoryMock.java
index 79aef55..b5848ce 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorMemoryMock.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorMemoryMock.java
@@ -16,12 +16,14 @@
package com.ning.billing.entitlement.engine.core;
+import java.util.Collection;
import java.util.List;
import com.google.inject.Inject;
import com.ning.billing.config.IEntitlementConfig;
import com.ning.billing.entitlement.engine.dao.IEntitlementDao;
import com.ning.billing.entitlement.events.IEvent;
+import com.ning.billing.entitlement.exceptions.EntitlementError;
import com.ning.billing.util.clock.IClock;
public class ApiEventProcessorMemoryMock extends ApiEventProcessorBase {
@@ -51,4 +53,11 @@ public class ApiEventProcessorMemoryMock extends ApiEventProcessorBase {
log.info(String.format("doProcessEvents : clearEvents"));
return true;
}
+
+
+ @Override
+ protected boolean doProcessEventsFromList(int sequenceId,
+ Collection<IEvent> events) {
+ throw new EntitlementError("Method not implemented");
+ }
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/EntitlementDaoMemoryMock.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/EntitlementDaoMemoryMock.java
index 1e043eb..c6fff2f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/EntitlementDaoMemoryMock.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/EntitlementDaoMemoryMock.java
@@ -17,6 +17,7 @@
package com.ning.billing.entitlement.engine.dao;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
@@ -53,6 +54,8 @@ public class EntitlementDaoMemoryMock implements IEntitlementDao, IEntitlementDa
private final IClock clock;
private final IEntitlementConfig config;
+
+
@Inject
public EntitlementDaoMemoryMock(IClock clock, IEntitlementConfig config) {
super();
@@ -108,6 +111,18 @@ public class EntitlementDaoMemoryMock implements IEntitlementDao, IEntitlementDa
}
@Override
+ public List<ISubscription> getSubscriptionsForKey(String bundleKey) {
+
+ for (ISubscriptionBundle cur : bundles) {
+ if (cur.getKey().equals(bundleKey)) {
+ return getSubscriptions(cur.getId());
+ }
+ }
+ return Collections.emptyList();
+ }
+
+
+ @Override
public ISubscription createSubscription(Subscription subscription, List<IEvent> initalEvents) {
synchronized(events) {
@@ -199,7 +214,7 @@ public class EntitlementDaoMemoryMock implements IEntitlementDao, IEntitlementDa
}
@Override
- public void clearEventsReady(UUID ownerId, List<IEvent> cleared) {
+ public void clearEventsReady(UUID ownerId, Collection<IEvent> cleared) {
synchronized(events) {
for (IEvent cur : cleared) {
if (cur.getOwner().equals(ownerId)) {
@@ -328,4 +343,5 @@ public class EntitlementDaoMemoryMock implements IEntitlementDao, IEntitlementDa
}
}
}
+
}