killbill-memoizeit
Changes
beatrix/pom.xml 5(+5 -0)
Details
beatrix/pom.xml 5(+5 -0)
diff --git a/beatrix/pom.xml b/beatrix/pom.xml
index 01421db..b789b0e 100644
--- a/beatrix/pom.xml
+++ b/beatrix/pom.xml
@@ -195,6 +195,11 @@
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
index e355fee..bfba1d1 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
@@ -27,18 +27,16 @@ import com.ning.billing.ObjectType;
import com.ning.billing.account.api.Account;
import com.ning.billing.account.api.AccountApiException;
import com.ning.billing.beatrix.bus.api.ExternalBus;
+import com.ning.billing.bus.BusPersistentEvent;
import com.ning.billing.bus.PersistentBus.EventBusException;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
import com.ning.billing.notification.plugin.api.ExtBusEventType;
-import com.ning.billing.tenant.api.TenantUserApi;
-import com.ning.billing.util.Hostname;
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.events.AccountChangeInternalEvent;
import com.ning.billing.util.events.AccountCreationInternalEvent;
-import com.ning.billing.util.events.BusExternalEvent;
import com.ning.billing.util.events.BusInternalEvent;
import com.ning.billing.util.events.BusInternalEvent.BusInternalEventType;
import com.ning.billing.util.events.ControlTagCreationInternalEvent;
@@ -88,7 +86,7 @@ public class BeatrixListener {
final InternalCallContext internalContext = internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(), "BeatrixListener", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
try {
- final BusExternalEvent externalEvent = computeExtBusEventEntryFromBusInternalEvent(event, internalContext);
+ final BusPersistentEvent externalEvent = computeExtBusEventEntryFromBusInternalEvent(event, internalContext);
if (externalEvent != null) {
((PersistentExternalBus) externalBus).post(externalEvent, internalContext);
}
@@ -98,7 +96,7 @@ public class BeatrixListener {
}
- private BusExternalEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) {
+ private BusPersistentEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) {
ObjectType objectType = null;
UUID objectId = null;
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java
index 46717a7..5809695 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java
@@ -3,58 +3,64 @@ package com.ning.billing.beatrix.extbus;
import java.util.UUID;
import com.ning.billing.ObjectType;
+import com.ning.billing.bus.BusPersistentEvent;
+import com.ning.billing.notification.plugin.api.ExtBusEvent;
import com.ning.billing.notification.plugin.api.ExtBusEventType;
import com.ning.billing.util.events.BusEventBase;
-import com.ning.billing.util.events.BusExternalEvent;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-public class DefaultBusExternalEvent extends BusEventBase implements BusExternalEvent {
+public class DefaultBusExternalEvent extends BusEventBase implements ExtBusEvent, BusPersistentEvent {
private final UUID objectId;
private final UUID accountId;
private final UUID tenantId;
private final ObjectType objectType;
- private final ExtBusEventType busEventType;
+ private final ExtBusEventType eventType;
@JsonCreator
public DefaultBusExternalEvent(@JsonProperty("objectId") final UUID objectId,
@JsonProperty("objectType") final ObjectType objectType,
- @JsonProperty("busEventType") final ExtBusEventType busEventType,
+ @JsonProperty("eventType") final ExtBusEventType eventType,
@JsonProperty("userToken") final UUID userToken,
@JsonProperty("accountId") final UUID accountId,
@JsonProperty("tenantId") final UUID tenantId) {
super(userToken, null, null);
- this.busEventType = busEventType;
+ this.eventType = eventType;
this.objectType = objectType;
this.objectId = objectId;
this.accountId = accountId;
this.tenantId = tenantId;
}
+ @Override
public UUID getObjectId() {
return objectId;
}
+ @Override
public UUID getAccountId() {
return accountId;
}
+ @Override
public UUID getTenantId() {
return tenantId;
}
- public ObjectType getObjectType() {
- return objectType;
+ @Override
+ public ExtBusEventType getEventType() {
+ return eventType;
}
@Override
- public ExtBusEventType getBusEventType() {
- return busEventType;
+ public ObjectType getObjectType() {
+ return objectType;
}
+
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -69,7 +75,7 @@ public class DefaultBusExternalEvent extends BusEventBase implements BusExternal
if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) {
return false;
}
- if (busEventType != that.busEventType) {
+ if (eventType != that.eventType) {
return false;
}
if (objectId != null ? !objectId.equals(that.objectId) : that.objectId != null) {
@@ -91,7 +97,7 @@ public class DefaultBusExternalEvent extends BusEventBase implements BusExternal
result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
result = 31 * result + (tenantId != null ? tenantId.hashCode() : 0);
result = 31 * result + objectType.hashCode();
- result = 31 * result + busEventType.hashCode();
+ result = 31 * result + eventType.hashCode();
return result;
}
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 9da5d7f..b27ced5 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -19,6 +19,7 @@ package com.ning.billing.beatrix.extbus;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -26,12 +27,15 @@ import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.billing.ObjectType;
import com.ning.billing.beatrix.bus.api.ExternalBus;
import com.ning.billing.bus.BusPersistentEvent;
import com.ning.billing.bus.PersistentBus.EventBusException;
import com.ning.billing.bus.PersistentBusConfig;
import com.ning.billing.bus.dao.BusEventEntry;
import com.ning.billing.bus.dao.PersistentBusSqlDao;
+import com.ning.billing.notification.plugin.api.ExtBusEvent;
+import com.ning.billing.notification.plugin.api.ExtBusEventType;
import com.ning.billing.queue.PersistentQueueBase;
import com.ning.billing.util.Hostname;
import com.ning.billing.util.bus.DefaultBusService;
@@ -49,6 +53,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
private static final long DELTA_IN_PROCESSING_TIME_MS = 1000L * 60L * 5L; // 5 minutes
private static final int MAX_BUS_EVENTS = 1;
+ final String bus_ext_events = "bus_ext_events";
+
private static final Logger log = LoggerFactory.getLogger(PersistentExternalBus.class);
private final PersistentBusSqlDao dao;
@@ -90,15 +96,10 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
for (final BusEventEntry cur : events) {
final String jsonWithAccountAndTenantRecorId = tweakJsonToIncludeAccountAndTenantRecordId(cur.getBusEventJson(), cur.getAccountRecordId(), cur.getTenantRecordId());
final BusPersistentEvent evt = deserializeEvent(cur.getBusEventClass(), objectMapper, jsonWithAccountAndTenantRecorId);
-
- //TODO STEPH needs to be fixed with accountId and tenantId...
-
- //final UUID accountId = getAccountIdFromRecordId(evt.getAccountRecordId());
- //final BusPersistentEvent evtWithAccountAndTenantId = new BusPersistentEvent(evt, );
result++;
// STEPH exception handling is done by GUAVA-- logged a bug Issue-780
eventBusDelegate.post(evt);
- dao.clearBusEvent(cur.getId(), com.ning.billing.Hostname.get());
+ dao.clearBusEvent(cur.getId(), com.ning.billing.Hostname.get(), bus_ext_events);
}
return result;
}
@@ -109,10 +110,10 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
final Date now = clock.getUTCNow().toDate();
final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
- final List<BusEventEntry> entries = dao.getNextBusEventEntries(config.getPrefetchAmount(), com.ning.billing.Hostname.get(), now);
+ final List<BusEventEntry> entries = dao.getNextBusEventEntries(config.getPrefetchAmount(), com.ning.billing.Hostname.get(), now, bus_ext_events);
final List<BusEventEntry> claimedEntries = new LinkedList<BusEventEntry>();
for (final BusEventEntry entry : entries) {
- final boolean claimed = (dao.claimBusEvent(com.ning.billing.Hostname.get(), nextAvailable, entry.getId(), now) == 1);
+ final boolean claimed = (dao.claimBusEvent(com.ning.billing.Hostname.get(), nextAvailable, entry.getId(), now, bus_ext_events) == 1);
if (claimed) {
dao.insertClaimedHistory(com.ning.billing.Hostname.get(), now, entry.getId(), entry.getAccountRecordId(), entry.getTenantRecordId());
claimedEntries.add(entry);
@@ -137,7 +138,7 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
try {
json = objectMapper.writeValueAsString(event);
final BusEventEntry entry = new BusEventEntry(com.ning.billing.Hostname.get(), event.getClass().getName(), json, event.getUserToken(), event.getAccountRecordId(), event.getTenantRecordId());
- dao.insertBusEvent(entry);
+ dao.insertBusEvent(entry, bus_ext_events);
} catch (JsonProcessingException e) {
throw new EventBusException("Failed to serialize ext bus event", e);
diff --git a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
index 789fdf6..5a5d608 100644
--- a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
+++ b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
@@ -3,21 +3,22 @@
DROP TABLE IF EXISTS bus_ext_events;
CREATE TABLE bus_ext_events (
record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
- event_type varchar(32) NOT NULL,
- object_id varchar(64) DEFAULT NULL,
- object_type varchar(32) NOT NULL,
+ class_name varchar(128) NOT NULL,
+ event_json varchar(2048) NOT NULL,
user_token char(36),
created_date datetime NOT NULL,
creating_owner char(50) NOT NULL,
processing_owner char(50) DEFAULT NULL,
processing_available_date datetime DEFAULT NULL,
processing_state varchar(14) DEFAULT 'AVAILABLE',
- account_record_id int(11) unsigned DEFAULT NULL,
- tenant_record_id int(11) unsigned DEFAULT NULL,
+ account_record_id int(11) unsigned default null,
+ tenant_record_id int(11) unsigned default null,
PRIMARY KEY(record_id)
);
-CREATE INDEX `idx_bus_ext_where` ON bus_ext_events (`processing_state`,`processing_owner`,`processing_available_date`);
-CREATE INDEX bus_ext_events_tenant_account_record_id ON bus_ext_events(tenant_record_id, account_record_id);
+CREATE INDEX `idx_ext_bus_where` ON bus_ext_events (`processing_state`,`processing_owner`,`processing_available_date`);
+CREATE INDEX ext_bus_events_tenant_account_record_id ON bus_ext_events(tenant_record_id, account_record_id);
+
+
DROP TABLE IF EXISTS claimed_bus_ext_events;
CREATE TABLE claimed_bus_ext_events (
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java b/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java
index 97c93fd..a356a70 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java
@@ -6,11 +6,12 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.ning.billing.ObjectType;
+import com.ning.billing.beatrix.BeatrixTestSuite;
import com.ning.billing.notification.plugin.api.ExtBusEventType;
-import com.ning.billing.payment.PaymentTestSuiteNoDB;
import com.ning.billing.util.jackson.ObjectMapper;
-public class TestEventJson extends PaymentTestSuiteNoDB {
+
+public class TestEventJson extends BeatrixTestSuite {
private final ObjectMapper mapper = new ObjectMapper();
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/osgi/TestBasicOSGIWithTestBundle.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/osgi/TestBasicOSGIWithTestBundle.java
index d02c542..744af92 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/osgi/TestBasicOSGIWithTestBundle.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/osgi/TestBasicOSGIWithTestBundle.java
@@ -152,7 +152,7 @@ public class TestBasicOSGIWithTestBundle extends TestOSGIBase {
private void assertWithCallback(final AwaitCallback callback, final String error) {
try {
- await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ await().atMost(5000000, TimeUnit.SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return callback.isSuccess();
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBusModule.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBusModule.java
index 21034e5..19b6942 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBusModule.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBusModule.java
@@ -23,6 +23,6 @@ import com.ning.billing.util.glue.BusModule;
public class InMemoryBusModule extends BusModule {
public InMemoryBusModule(final ConfigSource configSource) {
- super(BusType.MEMORY, configSource);
+ super(BusType.MEMORY, configSource, "foo");
}
}
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusModule.java b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
index aaeaecb..34afc53 100644
--- a/util/src/main/java/com/ning/billing/util/glue/BusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.glue;
import org.skife.config.ConfigSource;
import org.skife.config.ConfigurationObjectFactory;
+import com.ning.billing.bus.BusTableName;
import com.ning.billing.bus.DefaultPersistentBus;
import com.ning.billing.bus.InMemoryPersistentBus;
import com.ning.billing.bus.PersistentBus;
@@ -27,19 +28,26 @@ import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.svcsapi.bus.BusService;
import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
public class BusModule extends AbstractModule {
private final BusType type;
private final ConfigSource configSource;
+ private final String tableName;
public BusModule(final ConfigSource configSource) {
- this(BusType.PERSISTENT, configSource);
+ this(BusType.PERSISTENT, configSource, "bus_events");
}
- protected BusModule(final BusType type, final ConfigSource configSource) {
+ public BusModule(final ConfigSource configSource, final String tableName) {
+ this(BusType.PERSISTENT, configSource, tableName);
+ }
+
+ protected BusModule(final BusType type, final ConfigSource configSource, final String tableName) {
this.type = type;
this.configSource = configSource;
+ this.tableName = tableName;
}
public enum BusType {
@@ -70,6 +78,7 @@ public class BusModule extends AbstractModule {
private void configurePersistentEventBus() {
configurePersistentBusConfig();
+ bind(String.class).annotatedWith(BusTableName.class).toInstance(tableName);
bind(PersistentBus.class).to(DefaultPersistentBus.class).asEagerSingleton();
}