killbill-memoizeit

Details

beatrix/pom.xml 5(+5 -0)

diff --git a/beatrix/pom.xml b/beatrix/pom.xml
index 01421db..b789b0e 100644
--- a/beatrix/pom.xml
+++ b/beatrix/pom.xml
@@ -195,6 +195,11 @@
             <artifactId>jdbi</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.skife.config</groupId>
             <artifactId>config-magic</artifactId>
         </dependency>
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
index e355fee..bfba1d1 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
@@ -27,18 +27,16 @@ import com.ning.billing.ObjectType;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.beatrix.bus.api.ExternalBus;
+import com.ning.billing.bus.BusPersistentEvent;
 import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
 import com.ning.billing.notification.plugin.api.ExtBusEventType;
-import com.ning.billing.tenant.api.TenantUserApi;
-import com.ning.billing.util.Hostname;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.events.AccountChangeInternalEvent;
 import com.ning.billing.util.events.AccountCreationInternalEvent;
-import com.ning.billing.util.events.BusExternalEvent;
 import com.ning.billing.util.events.BusInternalEvent;
 import com.ning.billing.util.events.BusInternalEvent.BusInternalEventType;
 import com.ning.billing.util.events.ControlTagCreationInternalEvent;
@@ -88,7 +86,7 @@ public class BeatrixListener {
 
         final InternalCallContext internalContext = internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(), "BeatrixListener", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
         try {
-            final BusExternalEvent externalEvent = computeExtBusEventEntryFromBusInternalEvent(event, internalContext);
+            final BusPersistentEvent externalEvent = computeExtBusEventEntryFromBusInternalEvent(event, internalContext);
             if (externalEvent != null) {
                 ((PersistentExternalBus) externalBus).post(externalEvent, internalContext);
             }
@@ -98,7 +96,7 @@ public class BeatrixListener {
     }
 
 
-    private BusExternalEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) {
+    private BusPersistentEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) {
 
         ObjectType objectType = null;
         UUID objectId = null;
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java
index 46717a7..5809695 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java
@@ -3,58 +3,64 @@ package com.ning.billing.beatrix.extbus;
 import java.util.UUID;
 
 import com.ning.billing.ObjectType;
+import com.ning.billing.bus.BusPersistentEvent;
+import com.ning.billing.notification.plugin.api.ExtBusEvent;
 import com.ning.billing.notification.plugin.api.ExtBusEventType;
 import com.ning.billing.util.events.BusEventBase;
-import com.ning.billing.util.events.BusExternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultBusExternalEvent extends BusEventBase implements BusExternalEvent {
+public class DefaultBusExternalEvent extends BusEventBase implements ExtBusEvent, BusPersistentEvent {
 
     private final UUID objectId;
     private final UUID accountId;
     private final UUID tenantId;
     private final ObjectType objectType;
-    private final ExtBusEventType busEventType;
+    private final ExtBusEventType eventType;
 
 
     @JsonCreator
     public DefaultBusExternalEvent(@JsonProperty("objectId") final UUID objectId,
                                    @JsonProperty("objectType") final ObjectType objectType,
-                                   @JsonProperty("busEventType") final ExtBusEventType busEventType,
+                                   @JsonProperty("eventType") final ExtBusEventType eventType,
                                    @JsonProperty("userToken") final UUID userToken,
                                    @JsonProperty("accountId") final UUID accountId,
                                    @JsonProperty("tenantId") final UUID tenantId) {
         super(userToken, null, null);
-        this.busEventType = busEventType;
+        this.eventType = eventType;
         this.objectType = objectType;
         this.objectId = objectId;
         this.accountId = accountId;
         this.tenantId = tenantId;
     }
 
+    @Override
     public UUID getObjectId() {
         return objectId;
     }
 
+    @Override
     public UUID getAccountId() {
         return accountId;
     }
 
+    @Override
     public UUID getTenantId() {
         return tenantId;
     }
 
-    public ObjectType getObjectType() {
-        return objectType;
+    @Override
+    public ExtBusEventType getEventType() {
+        return eventType;
     }
 
     @Override
-    public ExtBusEventType getBusEventType() {
-        return busEventType;
+    public ObjectType getObjectType() {
+        return objectType;
     }
 
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -69,7 +75,7 @@ public class DefaultBusExternalEvent extends BusEventBase implements BusExternal
         if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) {
             return false;
         }
-        if (busEventType != that.busEventType) {
+        if (eventType != that.eventType) {
             return false;
         }
         if (objectId != null ? !objectId.equals(that.objectId) : that.objectId != null) {
@@ -91,7 +97,7 @@ public class DefaultBusExternalEvent extends BusEventBase implements BusExternal
         result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
         result = 31 * result + (tenantId != null ? tenantId.hashCode() : 0);
         result = 31 * result + objectType.hashCode();
-        result = 31 * result + busEventType.hashCode();
+        result = 31 * result + eventType.hashCode();
         return result;
     }
 }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 9da5d7f..b27ced5 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -19,6 +19,7 @@ package com.ning.billing.beatrix.extbus;
 import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
@@ -26,12 +27,15 @@ import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.ObjectType;
 import com.ning.billing.beatrix.bus.api.ExternalBus;
 import com.ning.billing.bus.BusPersistentEvent;
 import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.bus.PersistentBusConfig;
 import com.ning.billing.bus.dao.BusEventEntry;
 import com.ning.billing.bus.dao.PersistentBusSqlDao;
+import com.ning.billing.notification.plugin.api.ExtBusEvent;
+import com.ning.billing.notification.plugin.api.ExtBusEventType;
 import com.ning.billing.queue.PersistentQueueBase;
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.bus.DefaultBusService;
@@ -49,6 +53,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
     private static final long DELTA_IN_PROCESSING_TIME_MS = 1000L * 60L * 5L; // 5 minutes
     private static final int MAX_BUS_EVENTS = 1;
 
+    final String bus_ext_events = "bus_ext_events";
+
     private static final Logger log = LoggerFactory.getLogger(PersistentExternalBus.class);
 
     private final PersistentBusSqlDao dao;
@@ -90,15 +96,10 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         for (final BusEventEntry cur : events) {
             final String jsonWithAccountAndTenantRecorId = tweakJsonToIncludeAccountAndTenantRecordId(cur.getBusEventJson(), cur.getAccountRecordId(), cur.getTenantRecordId());
             final BusPersistentEvent evt = deserializeEvent(cur.getBusEventClass(), objectMapper, jsonWithAccountAndTenantRecorId);
-
-            //TODO STEPH needs to be fixed with accountId and tenantId...
-
-            //final UUID accountId = getAccountIdFromRecordId(evt.getAccountRecordId());
-            //final BusPersistentEvent evtWithAccountAndTenantId = new BusPersistentEvent(evt, );
             result++;
             // STEPH exception handling is done by GUAVA-- logged a bug Issue-780
             eventBusDelegate.post(evt);
-            dao.clearBusEvent(cur.getId(), com.ning.billing.Hostname.get());
+            dao.clearBusEvent(cur.getId(), com.ning.billing.Hostname.get(), bus_ext_events);
         }
         return result;
     }
@@ -109,10 +110,10 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         final Date now = clock.getUTCNow().toDate();
         final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
 
-        final List<BusEventEntry> entries = dao.getNextBusEventEntries(config.getPrefetchAmount(), com.ning.billing.Hostname.get(), now);
+        final List<BusEventEntry> entries = dao.getNextBusEventEntries(config.getPrefetchAmount(), com.ning.billing.Hostname.get(), now, bus_ext_events);
         final List<BusEventEntry> claimedEntries = new LinkedList<BusEventEntry>();
         for (final BusEventEntry entry : entries) {
-            final boolean claimed = (dao.claimBusEvent(com.ning.billing.Hostname.get(), nextAvailable, entry.getId(), now) == 1);
+            final boolean claimed = (dao.claimBusEvent(com.ning.billing.Hostname.get(), nextAvailable, entry.getId(), now, bus_ext_events) == 1);
             if (claimed) {
                 dao.insertClaimedHistory(com.ning.billing.Hostname.get(), now, entry.getId(), entry.getAccountRecordId(), entry.getTenantRecordId());
                 claimedEntries.add(entry);
@@ -137,7 +138,7 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         try {
             json = objectMapper.writeValueAsString(event);
             final BusEventEntry entry = new BusEventEntry(com.ning.billing.Hostname.get(), event.getClass().getName(), json, event.getUserToken(), event.getAccountRecordId(), event.getTenantRecordId());
-            dao.insertBusEvent(entry);
+            dao.insertBusEvent(entry, bus_ext_events);
 
         } catch (JsonProcessingException e) {
             throw new EventBusException("Failed to serialize ext bus event", e);
diff --git a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
index 789fdf6..5a5d608 100644
--- a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
+++ b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
@@ -3,21 +3,22 @@
 DROP TABLE IF EXISTS bus_ext_events;
 CREATE TABLE bus_ext_events (
     record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
-    event_type varchar(32) NOT NULL,
-    object_id varchar(64) DEFAULT NULL,
-    object_type varchar(32) NOT NULL,
+    class_name varchar(128) NOT NULL,
+    event_json varchar(2048) NOT NULL,
     user_token char(36),
     created_date datetime NOT NULL,
     creating_owner char(50) NOT NULL,
     processing_owner char(50) DEFAULT NULL,
     processing_available_date datetime DEFAULT NULL,
     processing_state varchar(14) DEFAULT 'AVAILABLE',
-    account_record_id int(11) unsigned DEFAULT NULL,
-    tenant_record_id int(11) unsigned DEFAULT NULL,
+    account_record_id int(11) unsigned default null,
+    tenant_record_id int(11) unsigned default null,
     PRIMARY KEY(record_id)
 );
-CREATE INDEX  `idx_bus_ext_where` ON bus_ext_events (`processing_state`,`processing_owner`,`processing_available_date`);
-CREATE INDEX bus_ext_events_tenant_account_record_id ON bus_ext_events(tenant_record_id, account_record_id);
+CREATE INDEX  `idx_ext_bus_where` ON bus_ext_events (`processing_state`,`processing_owner`,`processing_available_date`);
+CREATE INDEX ext_bus_events_tenant_account_record_id ON bus_ext_events(tenant_record_id, account_record_id);
+
+
 
 DROP TABLE IF EXISTS claimed_bus_ext_events;
 CREATE TABLE claimed_bus_ext_events (
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java b/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java
index 97c93fd..a356a70 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java
@@ -6,11 +6,12 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.ning.billing.ObjectType;
+import com.ning.billing.beatrix.BeatrixTestSuite;
 import com.ning.billing.notification.plugin.api.ExtBusEventType;
-import com.ning.billing.payment.PaymentTestSuiteNoDB;
 import com.ning.billing.util.jackson.ObjectMapper;
 
-public class TestEventJson extends PaymentTestSuiteNoDB {
+
+public class TestEventJson extends BeatrixTestSuite {
 
     private final ObjectMapper mapper = new ObjectMapper();
 
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/osgi/TestBasicOSGIWithTestBundle.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/osgi/TestBasicOSGIWithTestBundle.java
index d02c542..744af92 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/osgi/TestBasicOSGIWithTestBundle.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/osgi/TestBasicOSGIWithTestBundle.java
@@ -152,7 +152,7 @@ public class TestBasicOSGIWithTestBundle extends TestOSGIBase {
 
         private void assertWithCallback(final AwaitCallback callback, final String error) {
             try {
-                await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+                await().atMost(5000000, TimeUnit.SECONDS).until(new Callable<Boolean>() {
                     @Override
                     public Boolean call() throws Exception {
                         return callback.isSuccess();
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBusModule.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBusModule.java
index 21034e5..19b6942 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBusModule.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBusModule.java
@@ -23,6 +23,6 @@ import com.ning.billing.util.glue.BusModule;
 public class InMemoryBusModule extends BusModule {
 
     public InMemoryBusModule(final ConfigSource configSource) {
-        super(BusType.MEMORY, configSource);
+        super(BusType.MEMORY, configSource, "foo");
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusModule.java b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
index aaeaecb..34afc53 100644
--- a/util/src/main/java/com/ning/billing/util/glue/BusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.glue;
 import org.skife.config.ConfigSource;
 import org.skife.config.ConfigurationObjectFactory;
 
+import com.ning.billing.bus.BusTableName;
 import com.ning.billing.bus.DefaultPersistentBus;
 import com.ning.billing.bus.InMemoryPersistentBus;
 import com.ning.billing.bus.PersistentBus;
@@ -27,19 +28,26 @@ import com.ning.billing.util.bus.DefaultBusService;
 import com.ning.billing.util.svcsapi.bus.BusService;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
 
 public class BusModule extends AbstractModule {
 
     private final BusType type;
     private final ConfigSource configSource;
+    private final String tableName;
 
     public BusModule(final ConfigSource configSource) {
-        this(BusType.PERSISTENT, configSource);
+        this(BusType.PERSISTENT, configSource, "bus_events");
     }
 
-    protected BusModule(final BusType type, final ConfigSource configSource) {
+    public BusModule(final ConfigSource configSource, final String tableName) {
+        this(BusType.PERSISTENT, configSource, tableName);
+    }
+
+    protected BusModule(final BusType type, final ConfigSource configSource, final String tableName) {
         this.type = type;
         this.configSource = configSource;
+        this.tableName = tableName;
     }
 
     public enum BusType {
@@ -70,6 +78,7 @@ public class BusModule extends AbstractModule {
 
     private void configurePersistentEventBus() {
         configurePersistentBusConfig();
+        bind(String.class).annotatedWith(BusTableName.class).toInstance(tableName);
         bind(PersistentBus.class).to(DefaultPersistentBus.class).asEagerSingleton();
     }