killbill-aplcache

Changes

beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java 133(+0 -133)

beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java 127(+0 -127)

Details

diff --git a/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountChangeEvent.java b/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountChangeEvent.java
index 45f7372..32a3884 100644
--- a/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountChangeEvent.java
+++ b/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountChangeEvent.java
@@ -23,17 +23,16 @@ import java.util.UUID;
 import com.ning.billing.account.api.DefaultChangedField;
 import com.ning.billing.account.dao.AccountModelDao;
 import com.ning.billing.util.events.AccountChangeInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.ChangedField;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 
-public class DefaultAccountChangeEvent extends DefaultBusInternalEvent implements AccountChangeInternalEvent {
+public class DefaultAccountChangeEvent extends BusEventBase implements AccountChangeInternalEvent {
 
-    private final UUID userToken;
     private final List<ChangedField> changedFields;
     private final UUID accountId;
 
@@ -44,7 +43,6 @@ public class DefaultAccountChangeEvent extends DefaultBusInternalEvent implement
                                      @JsonProperty("accountRecordId") final Long accountRecordId,
                                      @JsonProperty("tenantRecordId") final Long tenantRecordId) {
         super(userToken, accountRecordId, tenantRecordId);
-        this.userToken = userToken;
         this.accountId = accountId;
         this.changedFields = changedFields;
     }
@@ -53,7 +51,6 @@ public class DefaultAccountChangeEvent extends DefaultBusInternalEvent implement
                                      final Long accountRecordId, final Long tenantRecordId) {
         super(userToken, accountRecordId, tenantRecordId);
         this.accountId = id;
-        this.userToken = userToken;
         this.changedFields = calculateChangedFields(oldData, newData);
     }
 
@@ -64,11 +61,6 @@ public class DefaultAccountChangeEvent extends DefaultBusInternalEvent implement
     }
 
     @Override
-    public UUID getUserToken() {
-        return userToken;
-    }
-
-    @Override
     public UUID getAccountId() {
         return accountId;
     }
@@ -93,8 +85,6 @@ public class DefaultAccountChangeEvent extends DefaultBusInternalEvent implement
                  + ((accountId == null) ? 0 : accountId.hashCode());
         result = prime * result
                  + ((changedFields == null) ? 0 : changedFields.hashCode());
-        result = prime * result
-                 + ((userToken == null) ? 0 : userToken.hashCode());
         return result;
     }
 
@@ -124,13 +114,6 @@ public class DefaultAccountChangeEvent extends DefaultBusInternalEvent implement
         } else if (!changedFields.equals(other.changedFields)) {
             return false;
         }
-        if (userToken == null) {
-            if (other.userToken != null) {
-                return false;
-            }
-        } else if (!userToken.equals(other.userToken)) {
-            return false;
-        }
         return true;
     }
 
diff --git a/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountCreationEvent.java b/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountCreationEvent.java
index 417eefa..0a1fd13 100644
--- a/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountCreationEvent.java
+++ b/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountCreationEvent.java
@@ -24,13 +24,13 @@ import com.ning.billing.account.api.AccountData;
 import com.ning.billing.account.dao.AccountModelDao;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.util.events.AccountCreationInternalEvent;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultAccountCreationEvent extends DefaultBusInternalEvent implements AccountCreationInternalEvent {
+public class DefaultAccountCreationEvent extends BusEventBase implements AccountCreationInternalEvent {
 
     private final UUID id;
     private final AccountData data;
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
index 19d3852..e355fee 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
@@ -13,6 +13,7 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+
 package com.ning.billing.beatrix.extbus;
 
 import java.util.UUID;
@@ -23,11 +24,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ObjectType;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.beatrix.bus.api.ExternalBus;
-import com.ning.billing.beatrix.extbus.dao.ExtBusEventEntry;
 import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
 import com.ning.billing.notification.plugin.api.ExtBusEventType;
+import com.ning.billing.tenant.api.TenantUserApi;
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
@@ -35,7 +38,9 @@ import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.events.AccountChangeInternalEvent;
 import com.ning.billing.util.events.AccountCreationInternalEvent;
+import com.ning.billing.util.events.BusExternalEvent;
 import com.ning.billing.util.events.BusInternalEvent;
+import com.ning.billing.util.events.BusInternalEvent.BusInternalEventType;
 import com.ning.billing.util.events.ControlTagCreationInternalEvent;
 import com.ning.billing.util.events.ControlTagDeletionInternalEvent;
 import com.ning.billing.util.events.CustomFieldCreationEvent;
@@ -48,7 +53,11 @@ import com.ning.billing.util.events.PaymentInfoInternalEvent;
 import com.ning.billing.util.events.SubscriptionInternalEvent;
 import com.ning.billing.util.events.UserTagCreationInternalEvent;
 import com.ning.billing.util.events.UserTagDeletionInternalEvent;
+import com.ning.billing.util.svcapi.account.AccountInternalApi;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
 import com.google.common.eventbus.Subscribe;
 
 public class BeatrixListener {
@@ -57,154 +66,183 @@ public class BeatrixListener {
 
     private final ExternalBus externalBus;
     private final InternalCallContextFactory internalCallContextFactory;
-    private final String hostname;
+    private final AccountInternalApi accountApi;
+
+
+    protected final ObjectMapper objectMapper;
 
     @Inject
     public BeatrixListener(final ExternalBus externalBus,
-            final InternalCallContextFactory internalCallContextFactory) {
+                           final InternalCallContextFactory internalCallContextFactory,
+                           final AccountInternalApi accountApi) {
         this.externalBus = externalBus;
         this.internalCallContextFactory = internalCallContextFactory;
-        this.hostname = Hostname.get();
+        this.accountApi = accountApi;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.registerModule(new JodaModule());
+        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
     }
 
     @Subscribe
     public void handleAllInternalKillbillEvents(final BusInternalEvent event) {
-        final ExtBusEventEntry externalEvent = computeExtBusEventEntryFromBusInternalEvent(event);
 
-        if (externalEvent != null) {
-            final InternalCallContext internalContext =  internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(), "BeatrixListener", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            try {
+        final InternalCallContext internalContext = internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(), "BeatrixListener", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+        try {
+            final BusExternalEvent externalEvent = computeExtBusEventEntryFromBusInternalEvent(event, internalContext);
+            if (externalEvent != null) {
                 ((PersistentExternalBus) externalBus).post(externalEvent, internalContext);
-            } catch (EventBusException e) {
-               log.warn("Failed to dispatch external bus events", e);
             }
+        } catch (EventBusException e) {
+            log.warn("Failed to dispatch external bus events", e);
         }
     }
 
 
-    private ExtBusEventEntry computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event) {
+    private BusExternalEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) {
 
-        ObjectType objectType  = null;
+        ObjectType objectType = null;
         UUID objectId = null;
         ExtBusEventType eventBusType = null;
 
-        switch(event.getBusEventType()) {
-        case ACCOUNT_CREATE:
-            AccountCreationInternalEvent realEventACR = (AccountCreationInternalEvent) event;
-            objectType = ObjectType.ACCOUNT;
-            objectId = realEventACR.getId();
-            eventBusType = ExtBusEventType.ACCOUNT_CREATION;
-            break;
-
-        case ACCOUNT_CHANGE:
-            AccountChangeInternalEvent realEventACH = (AccountChangeInternalEvent) event;
-            objectType = ObjectType.ACCOUNT;
-            objectId = realEventACH.getAccountId();
-            eventBusType = ExtBusEventType.ACCOUNT_CHANGE;
-            break;
-
-        case SUBSCRIPTION_TRANSITION:
-            SubscriptionInternalEvent realEventST  = (SubscriptionInternalEvent) event;
-            objectType = ObjectType.SUBSCRIPTION;
-            objectId = realEventST.getSubscriptionId();
-            if (realEventST.getTransitionType() == SubscriptionTransitionType.CREATE ||
+
+        switch (event.getBusEventType()) {
+            case ACCOUNT_CREATE:
+                AccountCreationInternalEvent realEventACR = (AccountCreationInternalEvent) event;
+                objectType = ObjectType.ACCOUNT;
+                objectId = realEventACR.getId();
+                eventBusType = ExtBusEventType.ACCOUNT_CREATION;
+                break;
+
+            case ACCOUNT_CHANGE:
+                AccountChangeInternalEvent realEventACH = (AccountChangeInternalEvent) event;
+                objectType = ObjectType.ACCOUNT;
+                objectId = realEventACH.getAccountId();
+                eventBusType = ExtBusEventType.ACCOUNT_CHANGE;
+                break;
+
+            case SUBSCRIPTION_TRANSITION:
+                SubscriptionInternalEvent realEventST = (SubscriptionInternalEvent) event;
+                objectType = ObjectType.SUBSCRIPTION;
+                objectId = realEventST.getSubscriptionId();
+                if (realEventST.getTransitionType() == SubscriptionTransitionType.CREATE ||
                     realEventST.getTransitionType() == SubscriptionTransitionType.RE_CREATE ||
                     realEventST.getTransitionType() == SubscriptionTransitionType.TRANSFER ||
                     realEventST.getTransitionType() == SubscriptionTransitionType.MIGRATE_ENTITLEMENT) {
-                eventBusType = ExtBusEventType.SUBSCRIPTION_CREATION;
-            } else if (realEventST.getTransitionType() == SubscriptionTransitionType.CANCEL) {
-                eventBusType = ExtBusEventType.SUBSCRIPTION_CANCEL;
-            } else if (realEventST.getTransitionType() == SubscriptionTransitionType.PHASE) {
-                eventBusType = ExtBusEventType.SUBSCRIPTION_PHASE;
-            } else if (realEventST.getTransitionType() == SubscriptionTransitionType.CHANGE) {
-                eventBusType = ExtBusEventType.SUBSCRIPTION_CHANGE;
-            } else if (realEventST.getTransitionType() == SubscriptionTransitionType.UNCANCEL) {
-                eventBusType = ExtBusEventType.SUBSCRIPTION_UNCANCEL;
-            }
-            break;
-
-        case INVOICE_CREATION:
-            InvoiceCreationInternalEvent realEventInv = (InvoiceCreationInternalEvent) event;
-            objectType = ObjectType.INVOICE;
-            objectId = realEventInv.getInvoiceId();
-            eventBusType = ExtBusEventType.INVOICE_CREATION;
-            break;
-
-        case INVOICE_ADJUSTMENT:
-            InvoiceAdjustmentInternalEvent realEventInvAdj = (InvoiceAdjustmentInternalEvent) event;
-            objectType = ObjectType.INVOICE;
-            objectId = realEventInvAdj.getInvoiceId();
-            eventBusType = ExtBusEventType.INVOICE_ADJUSTMENT;
-            break;
-
-        case PAYMENT_INFO:
-            PaymentInfoInternalEvent realEventPay = (PaymentInfoInternalEvent) event;
-            objectType = ObjectType.PAYMENT;
-            objectId = realEventPay.getPaymentId();
-            eventBusType = ExtBusEventType.PAYMENT_SUCCESS;
-            break;
-
-        case PAYMENT_ERROR:
-            PaymentErrorInternalEvent realEventPayErr = (PaymentErrorInternalEvent) event;
-            objectType = ObjectType.PAYMENT;
-            objectId = realEventPayErr.getPaymentId();
-            eventBusType = ExtBusEventType.PAYMENT_FAILED;
-            break;
-
-        case OVERDUE_CHANGE:
-            OverdueChangeInternalEvent realEventOC = (OverdueChangeInternalEvent) event;
-            // TODO When Killbil supports more than overdue for bundle, this will break...
-            objectType = ObjectType.BUNDLE;
-            objectId = realEventOC.getOverdueObjectId();
-            eventBusType = ExtBusEventType.OVERDUE_CHANGE;
-            break;
-
-       case USER_TAG_CREATION:
-           UserTagCreationInternalEvent realUserTagEventCr = (UserTagCreationInternalEvent) event;
-           objectType = ObjectType.TAG;
-           objectId = realUserTagEventCr.getTagId();
-           eventBusType = ExtBusEventType.TAG_CREATION;
-           break;
-
-       case CONTROL_TAG_CREATION:
-           ControlTagCreationInternalEvent realTagEventCr = (ControlTagCreationInternalEvent) event;
-           objectType = ObjectType.TAG;
-           objectId = realTagEventCr.getTagId();
-           eventBusType = ExtBusEventType.TAG_CREATION;
-           break;
-
-       case USER_TAG_DELETION:
-           UserTagDeletionInternalEvent realUserTagEventDel = (UserTagDeletionInternalEvent) event;
-           objectType = ObjectType.TAG;
-           objectId = realUserTagEventDel.getObjectId();
-           eventBusType = ExtBusEventType.TAG_DELETION;
-           break;
-
-       case CONTROL_TAG_DELETION:
-           ControlTagDeletionInternalEvent realTagEventDel = (ControlTagDeletionInternalEvent) event;
-           objectType = ObjectType.TAG;
-           objectId = realTagEventDel.getTagId();
-           eventBusType = ExtBusEventType.TAG_DELETION;
-           break;
-
-       case CUSTOM_FIELD_CREATION:
-           CustomFieldCreationEvent realCustomEveventCr = (CustomFieldCreationEvent) event;
-           objectType = ObjectType.CUSTOM_FIELD;
-           objectId = realCustomEveventCr.getCustomFieldId();
-           eventBusType = ExtBusEventType.CUSTOM_FIELD_CREATION;
-           break;
-
-       case CUSTOM_FIELD_DELETION:
-           CustomFieldDeletionEvent realCustomEveventDel = (CustomFieldDeletionEvent) event;
-           objectType = ObjectType.CUSTOM_FIELD;
-           objectId = realCustomEveventDel.getCustomFieldId();
-           eventBusType = ExtBusEventType.CUSTOM_FIELD_DELETION;
-           break;
-
-        default:
+                    eventBusType = ExtBusEventType.SUBSCRIPTION_CREATION;
+                } else if (realEventST.getTransitionType() == SubscriptionTransitionType.CANCEL) {
+                    eventBusType = ExtBusEventType.SUBSCRIPTION_CANCEL;
+                } else if (realEventST.getTransitionType() == SubscriptionTransitionType.PHASE) {
+                    eventBusType = ExtBusEventType.SUBSCRIPTION_PHASE;
+                } else if (realEventST.getTransitionType() == SubscriptionTransitionType.CHANGE) {
+                    eventBusType = ExtBusEventType.SUBSCRIPTION_CHANGE;
+                } else if (realEventST.getTransitionType() == SubscriptionTransitionType.UNCANCEL) {
+                    eventBusType = ExtBusEventType.SUBSCRIPTION_UNCANCEL;
+                }
+                break;
+
+            case INVOICE_CREATION:
+                InvoiceCreationInternalEvent realEventInv = (InvoiceCreationInternalEvent) event;
+                objectType = ObjectType.INVOICE;
+                objectId = realEventInv.getInvoiceId();
+                eventBusType = ExtBusEventType.INVOICE_CREATION;
+                break;
+
+            case INVOICE_ADJUSTMENT:
+                InvoiceAdjustmentInternalEvent realEventInvAdj = (InvoiceAdjustmentInternalEvent) event;
+                objectType = ObjectType.INVOICE;
+                objectId = realEventInvAdj.getInvoiceId();
+                eventBusType = ExtBusEventType.INVOICE_ADJUSTMENT;
+                break;
+
+            case PAYMENT_INFO:
+                PaymentInfoInternalEvent realEventPay = (PaymentInfoInternalEvent) event;
+                objectType = ObjectType.PAYMENT;
+                objectId = realEventPay.getPaymentId();
+                eventBusType = ExtBusEventType.PAYMENT_SUCCESS;
+                break;
+
+            case PAYMENT_ERROR:
+                PaymentErrorInternalEvent realEventPayErr = (PaymentErrorInternalEvent) event;
+                objectType = ObjectType.PAYMENT;
+                objectId = realEventPayErr.getPaymentId();
+                eventBusType = ExtBusEventType.PAYMENT_FAILED;
+                break;
+
+            case OVERDUE_CHANGE:
+                OverdueChangeInternalEvent realEventOC = (OverdueChangeInternalEvent) event;
+                // TODO When Killbil supports more than overdue for bundle, this will break...
+                objectType = ObjectType.BUNDLE;
+                objectId = realEventOC.getOverdueObjectId();
+                eventBusType = ExtBusEventType.OVERDUE_CHANGE;
+                break;
+
+            case USER_TAG_CREATION:
+                UserTagCreationInternalEvent realUserTagEventCr = (UserTagCreationInternalEvent) event;
+                objectType = ObjectType.TAG;
+                objectId = realUserTagEventCr.getTagId();
+                eventBusType = ExtBusEventType.TAG_CREATION;
+                break;
+
+            case CONTROL_TAG_CREATION:
+                ControlTagCreationInternalEvent realTagEventCr = (ControlTagCreationInternalEvent) event;
+                objectType = ObjectType.TAG;
+                objectId = realTagEventCr.getTagId();
+                eventBusType = ExtBusEventType.TAG_CREATION;
+                break;
+
+            case USER_TAG_DELETION:
+                UserTagDeletionInternalEvent realUserTagEventDel = (UserTagDeletionInternalEvent) event;
+                objectType = ObjectType.TAG;
+                objectId = realUserTagEventDel.getObjectId();
+                eventBusType = ExtBusEventType.TAG_DELETION;
+                break;
+
+            case CONTROL_TAG_DELETION:
+                ControlTagDeletionInternalEvent realTagEventDel = (ControlTagDeletionInternalEvent) event;
+                objectType = ObjectType.TAG;
+                objectId = realTagEventDel.getTagId();
+                eventBusType = ExtBusEventType.TAG_DELETION;
+                break;
+
+            case CUSTOM_FIELD_CREATION:
+                CustomFieldCreationEvent realCustomEveventCr = (CustomFieldCreationEvent) event;
+                objectType = ObjectType.CUSTOM_FIELD;
+                objectId = realCustomEveventCr.getCustomFieldId();
+                eventBusType = ExtBusEventType.CUSTOM_FIELD_CREATION;
+                break;
+
+            case CUSTOM_FIELD_DELETION:
+                CustomFieldDeletionEvent realCustomEveventDel = (CustomFieldDeletionEvent) event;
+                objectType = ObjectType.CUSTOM_FIELD;
+                objectId = realCustomEveventDel.getCustomFieldId();
+                eventBusType = ExtBusEventType.CUSTOM_FIELD_DELETION;
+                break;
+
+            default:
         }
+
+        final UUID accountId = getAccountIdFromRecordId(event.getBusEventType(), objectId, event.getAccountRecordId(), context);
+        final UUID tenantId = context.toTenantContext().getTenantId();
+
         return eventBusType != null ?
-                new ExtBusEventEntry(hostname, objectType, objectId, event.getUserToken(), eventBusType, event.getAccountRecordId(), event.getAccountRecordId()) : null;
+               new DefaultBusExternalEvent(objectId, objectType, eventBusType, event.getUserToken(), accountId, tenantId) :
+               null;
     }
 
+    private final UUID getAccountIdFromRecordId(final BusInternalEventType eventType, final UUID objectId, final Long recordId, final InternalCallContext context) {
+
+        // accountRecord_id is not set for ACCOUNT_CREATE event as we are in the transaction and value is known yet
+        if (eventType == BusInternalEventType.ACCOUNT_CREATE) {
+            return objectId;
+        }
+        try {
+            final Account account = accountApi.getAccountByRecordId(recordId, context);
+            return account.getId();
+        } catch (final AccountApiException e) {
+            log.warn("Failed to retrieve acount from recordId {}", recordId);
+            return null;
+        }
+    }
+
+
 }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java
new file mode 100644
index 0000000..46717a7
--- /dev/null
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/DefaultBusExternalEvent.java
@@ -0,0 +1,97 @@
+package com.ning.billing.beatrix.extbus;
+
+import java.util.UUID;
+
+import com.ning.billing.ObjectType;
+import com.ning.billing.notification.plugin.api.ExtBusEventType;
+import com.ning.billing.util.events.BusEventBase;
+import com.ning.billing.util.events.BusExternalEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DefaultBusExternalEvent extends BusEventBase implements BusExternalEvent {
+
+    private final UUID objectId;
+    private final UUID accountId;
+    private final UUID tenantId;
+    private final ObjectType objectType;
+    private final ExtBusEventType busEventType;
+
+
+    @JsonCreator
+    public DefaultBusExternalEvent(@JsonProperty("objectId") final UUID objectId,
+                                   @JsonProperty("objectType") final ObjectType objectType,
+                                   @JsonProperty("busEventType") final ExtBusEventType busEventType,
+                                   @JsonProperty("userToken") final UUID userToken,
+                                   @JsonProperty("accountId") final UUID accountId,
+                                   @JsonProperty("tenantId") final UUID tenantId) {
+        super(userToken, null, null);
+        this.busEventType = busEventType;
+        this.objectType = objectType;
+        this.objectId = objectId;
+        this.accountId = accountId;
+        this.tenantId = tenantId;
+    }
+
+    public UUID getObjectId() {
+        return objectId;
+    }
+
+    public UUID getAccountId() {
+        return accountId;
+    }
+
+    public UUID getTenantId() {
+        return tenantId;
+    }
+
+    public ObjectType getObjectType() {
+        return objectType;
+    }
+
+    @Override
+    public ExtBusEventType getBusEventType() {
+        return busEventType;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof DefaultBusExternalEvent)) {
+            return false;
+        }
+
+        final DefaultBusExternalEvent that = (DefaultBusExternalEvent) o;
+
+        if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) {
+            return false;
+        }
+        if (busEventType != that.busEventType) {
+            return false;
+        }
+        if (objectId != null ? !objectId.equals(that.objectId) : that.objectId != null) {
+            return false;
+        }
+        if (objectType != that.objectType) {
+            return false;
+        }
+        if (tenantId != null ? !tenantId.equals(that.tenantId) : that.tenantId != null) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = objectId != null ? objectId.hashCode() : 0;
+        result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
+        result = 31 * result + (tenantId != null ? tenantId.hashCode() : 0);
+        result = 31 * result + objectType.hashCode();
+        result = 31 * result + busEventType.hashCode();
+        return result;
+    }
+}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index b01518d..9da5d7f 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -16,10 +16,9 @@
 
 package com.ning.billing.beatrix.extbus;
 
-import java.util.Collections;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
@@ -27,26 +26,21 @@ import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.ObjectType;
-import com.ning.billing.account.api.Account;
-import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.beatrix.bus.api.ExternalBus;
-import com.ning.billing.beatrix.extbus.dao.ExtBusEventEntry;
-import com.ning.billing.beatrix.extbus.dao.ExtBusSqlDao;
+import com.ning.billing.bus.BusPersistentEvent;
 import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.bus.PersistentBusConfig;
-import com.ning.billing.notification.plugin.api.ExtBusEvent;
-import com.ning.billing.notification.plugin.api.ExtBusEventType;
+import com.ning.billing.bus.dao.BusEventEntry;
+import com.ning.billing.bus.dao.PersistentBusSqlDao;
 import com.ning.billing.queue.PersistentQueueBase;
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.bus.DefaultBusService;
-import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Inject;
 
@@ -57,13 +51,10 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
 
     private static final Logger log = LoggerFactory.getLogger(PersistentExternalBus.class);
 
-    private final ExtBusSqlDao dao;
+    private final PersistentBusSqlDao dao;
 
     private final EventBusDelegate eventBusDelegate;
     private final Clock clock;
-    private final String hostname;
-    private final InternalCallContextFactory internalCallContextFactory;
-    private final AccountInternalApi accountApi;
 
     private static final class EventBusDelegate extends EventBus {
 
@@ -82,70 +73,52 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
                                   DefaultBusService.EVENT_BUS_TH_NAME);
             }
         }), config.getNbThreads(), config);
-        this.dao = dbi.onDemand(ExtBusSqlDao.class);
+        this.dao = dbi.onDemand(PersistentBusSqlDao.class);
         this.clock = clock;
         this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
-        this.hostname = Hostname.get();
-        this.internalCallContextFactory = internalCallContextFactory;
-        this.accountApi = accountApi;
     }
 
     @Override
     public int doProcessEvents() {
 
-        // TODO API_FIX Retrieving and clearing bus events is not done per tenant so pass default INTERNAL_TENANT_RECORD_ID; not sure this is something we want to do anyway ?
-        final InternalCallContext context = internalCallContextFactory.createInternalCallContext(InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID, null, "ExtPersistentBus", CallOrigin.INTERNAL, UserType.SYSTEM, null);
-        final List<ExtBusEventEntry> events = getNextBusEvent(context);
+        final List<BusEventEntry> events = getNextBusEvent();
         if (events.size() == 0) {
             return 0;
         }
 
         int result = 0;
-        for (final ExtBusEventEntry cur : events) {
-            // The accountRecordId for a newly created account is not set
-            final UUID accountId;
-            if (cur.getObjectType() == ObjectType.ACCOUNT && cur.getExtBusType() == ExtBusEventType.ACCOUNT_CREATION) {
-                accountId = cur.getObjectId();
-            } else {
-                accountId = getAccountIdFromRecordId(cur.getAccountRecordId(), context);
-            }
-            final ExtBusEvent event = new DefaultBusEvent(cur.getExtBusType(), cur.getObjectType(), cur.getObjectId(), accountId, null);
+        for (final BusEventEntry cur : events) {
+            final String jsonWithAccountAndTenantRecorId = tweakJsonToIncludeAccountAndTenantRecordId(cur.getBusEventJson(), cur.getAccountRecordId(), cur.getTenantRecordId());
+            final BusPersistentEvent evt = deserializeEvent(cur.getBusEventClass(), objectMapper, jsonWithAccountAndTenantRecorId);
+
+            //TODO STEPH needs to be fixed with accountId and tenantId...
+
+            //final UUID accountId = getAccountIdFromRecordId(evt.getAccountRecordId());
+            //final BusPersistentEvent evtWithAccountAndTenantId = new BusPersistentEvent(evt, );
             result++;
             // STEPH exception handling is done by GUAVA-- logged a bug Issue-780
-            eventBusDelegate.post(event);
-            final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(cur.getTenantRecordId(), cur.getAccountRecordId(), context);
-            dao.clearBusExtEvent(cur.getId(), hostname, rehydratedContext);
+            eventBusDelegate.post(evt);
+            dao.clearBusEvent(cur.getId(), com.ning.billing.Hostname.get());
         }
         return result;
     }
 
-    private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
-        try {
-            final Account account = accountApi.getAccountByRecordId(recordId, context);
-            return account.getId();
-        } catch (final AccountApiException e) {
-            log.warn("Failed to retrieve acount from recordId {}", recordId);
-            return null;
-        }
-    }
 
-    private List<ExtBusEventEntry> getNextBusEvent(final InternalCallContext context) {
+    private List<BusEventEntry> getNextBusEvent() {
+
         final Date now = clock.getUTCNow().toDate();
         final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
 
-        final ExtBusEventEntry input = dao.getNextBusExtEventEntry(MAX_BUS_EVENTS, hostname, now, context);
-        if (input == null) {
-            return Collections.emptyList();
-        }
-
-        // We need to re-hydrate the context with the record ids from the ExtBusEventEntry
-        final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(input.getTenantRecordId(), input.getAccountRecordId(), context);
-        final boolean claimed = (dao.claimBusExtEvent(hostname, nextAvailable, input.getId(), now, rehydratedContext) == 1);
-        if (claimed) {
-            dao.insertClaimedExtHistory(hostname, now, input.getId(), rehydratedContext);
-            return Collections.singletonList(input);
+        final List<BusEventEntry> entries = dao.getNextBusEventEntries(config.getPrefetchAmount(), com.ning.billing.Hostname.get(), now);
+        final List<BusEventEntry> claimedEntries = new LinkedList<BusEventEntry>();
+        for (final BusEventEntry entry : entries) {
+            final boolean claimed = (dao.claimBusEvent(com.ning.billing.Hostname.get(), nextAvailable, entry.getId(), now) == 1);
+            if (claimed) {
+                dao.insertClaimedHistory(com.ning.billing.Hostname.get(), now, entry.getId(), entry.getAccountRecordId(), entry.getTenantRecordId());
+                claimedEntries.add(entry);
+            }
         }
-        return Collections.emptyList();
+        return claimedEntries;
     }
 
     @Override
@@ -158,7 +131,27 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         eventBusDelegate.unregister(handlerInstance);
     }
 
-    public void post(final ExtBusEventEntry event, final InternalCallContext context) throws EventBusException {
-        dao.insertBusExtEvent(event, context);
+    public void post(final BusPersistentEvent event, final InternalCallContext context) throws EventBusException {
+
+        final String json;
+        try {
+            json = objectMapper.writeValueAsString(event);
+            final BusEventEntry entry = new BusEventEntry(com.ning.billing.Hostname.get(), event.getClass().getName(), json, event.getUserToken(), event.getAccountRecordId(), event.getTenantRecordId());
+            dao.insertBusEvent(entry);
+
+        } catch (JsonProcessingException e) {
+            throw new EventBusException("Failed to serialize ext bus event", e);
+        }
+    }
+
+    private String tweakJsonToIncludeAccountAndTenantRecordId(final String input, final Long accountRecordId, final Long tenantRecordId) {
+        final int lastIndexPriorFinalBracket = input.lastIndexOf("}");
+        final StringBuilder tmp = new StringBuilder(input.substring(0, lastIndexPriorFinalBracket));
+        tmp.append(",\"accountRecordId\":");
+        tmp.append(accountRecordId);
+        tmp.append(",\"tenantRecordId\":");
+        tmp.append(tenantRecordId);
+        tmp.append("}");
+        return tmp.toString();
     }
 }
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java b/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java
new file mode 100644
index 0000000..97c93fd
--- /dev/null
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/extbus/TestEventJson.java
@@ -0,0 +1,34 @@
+package com.ning.billing.beatrix.extbus;
+
+import java.util.UUID;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.ObjectType;
+import com.ning.billing.notification.plugin.api.ExtBusEventType;
+import com.ning.billing.payment.PaymentTestSuiteNoDB;
+import com.ning.billing.util.jackson.ObjectMapper;
+
+public class TestEventJson extends PaymentTestSuiteNoDB {
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Test(groups = "fast")
+    public void testBusExternalEvent() throws Exception {
+        final UUID objectId = UUID.randomUUID();
+        final UUID userToken = UUID.randomUUID();
+        final UUID accountId = UUID.randomUUID();
+        final UUID tenantId = UUID.randomUUID();
+        final ObjectType objectType = ObjectType.ACCOUNT;
+        final ExtBusEventType extBusEventType = ExtBusEventType.ACCOUNT_CREATION;
+
+        final DefaultBusExternalEvent e = new DefaultBusExternalEvent(objectId, objectType, extBusEventType, userToken, accountId, tenantId);
+        final String json = mapper.writeValueAsString(e);
+
+        final Class<?> claz = Class.forName(DefaultBusExternalEvent.class.getName());
+        final Object obj = mapper.readValue(json, claz);
+        Assert.assertTrue(obj.equals(e));
+    }
+
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultRepairEntitlementEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultRepairEntitlementEvent.java
index 3da90ec..9084606 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultRepairEntitlementEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultRepairEntitlementEvent.java
@@ -19,14 +19,14 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.RepairEntitlementInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultRepairEntitlementEvent extends DefaultBusInternalEvent implements RepairEntitlementInternalEvent {
+public class DefaultRepairEntitlementEvent extends BusEventBase implements RepairEntitlementInternalEvent {
 
     private final UUID bundleId;
     private final UUID accountId;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionEvent.java
index 4a980f2..9c740c2 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionEvent.java
@@ -21,14 +21,14 @@ import java.util.UUID;
 import org.joda.time.DateTime;
 
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.SubscriptionInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public abstract class DefaultSubscriptionEvent extends DefaultBusInternalEvent implements SubscriptionInternalEvent {
+public abstract class DefaultSubscriptionEvent extends BusEventBase implements SubscriptionInternalEvent {
 
     private final Long totalOrdering;
     private final UUID subscriptionId;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceAdjustmentEvent.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceAdjustmentEvent.java
index a5b6c69..0c780ad 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceAdjustmentEvent.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceAdjustmentEvent.java
@@ -18,14 +18,14 @@ package com.ning.billing.invoice.api.user;
 
 import java.util.UUID;
 
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.InvoiceAdjustmentInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultInvoiceAdjustmentEvent extends DefaultBusInternalEvent implements InvoiceAdjustmentInternalEvent {
+public class DefaultInvoiceAdjustmentEvent extends BusEventBase implements InvoiceAdjustmentInternalEvent {
 
     private final UUID invoiceId;
     private final UUID accountId;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceCreationEvent.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceCreationEvent.java
index e7f8cd0..caa08af 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceCreationEvent.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceCreationEvent.java
@@ -20,14 +20,14 @@ import java.math.BigDecimal;
 import java.util.UUID;
 
 import com.ning.billing.catalog.api.Currency;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.InvoiceCreationInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultInvoiceCreationEvent extends DefaultBusInternalEvent implements InvoiceCreationInternalEvent {
+public class DefaultInvoiceCreationEvent extends BusEventBase implements InvoiceCreationInternalEvent {
 
     private final UUID invoiceId;
     private final UUID accountId;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultNullInvoiceEvent.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultNullInvoiceEvent.java
index 3fea8d4..9d646e8 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultNullInvoiceEvent.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultNullInvoiceEvent.java
@@ -20,14 +20,14 @@ import java.util.UUID;
 
 import org.joda.time.LocalDate;
 
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.NullInvoiceInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultNullInvoiceEvent extends DefaultBusInternalEvent implements NullInvoiceInternalEvent {
+public class DefaultNullInvoiceEvent extends BusEventBase implements NullInvoiceInternalEvent {
 
     private final UUID accountId;
     private final LocalDate processingDate;
diff --git a/overdue/src/main/java/com/ning/billing/overdue/applicator/DefaultOverdueChangeEvent.java b/overdue/src/main/java/com/ning/billing/overdue/applicator/DefaultOverdueChangeEvent.java
index b589cae..2c98826 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/applicator/DefaultOverdueChangeEvent.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/applicator/DefaultOverdueChangeEvent.java
@@ -19,14 +19,14 @@ package com.ning.billing.overdue.applicator;
 import java.util.UUID;
 
 import com.ning.billing.junction.api.Type;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.OverdueChangeInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultOverdueChangeEvent extends DefaultBusInternalEvent implements OverdueChangeInternalEvent {
+public class DefaultOverdueChangeEvent extends BusEventBase implements OverdueChangeInternalEvent {
 
     private final UUID overdueObjectId;
     private final Type overdueObjectType;
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentErrorEvent.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentErrorEvent.java
index bbe9d39..af7ea49 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentErrorEvent.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentErrorEvent.java
@@ -18,7 +18,7 @@ package com.ning.billing.payment.api;
 
 import java.util.UUID;
 
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.PaymentErrorInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "error")
-public class DefaultPaymentErrorEvent extends DefaultBusInternalEvent implements PaymentErrorInternalEvent {
+public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErrorInternalEvent {
 
     private final UUID id;
     private final String message;
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentInfoEvent.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentInfoEvent.java
index f78eb2d..d0c7212 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentInfoEvent.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentInfoEvent.java
@@ -21,14 +21,14 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.PaymentInfoInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultPaymentInfoEvent extends DefaultBusInternalEvent implements PaymentInfoInternalEvent {
+public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfoInternalEvent {
 
     private final UUID accountId;
     private final UUID invoiceId;
diff --git a/util/src/main/java/com/ning/billing/util/customfield/api/DefaultCustomFieldCreationEvent.java b/util/src/main/java/com/ning/billing/util/customfield/api/DefaultCustomFieldCreationEvent.java
index fe085d6..9346ef9 100644
--- a/util/src/main/java/com/ning/billing/util/customfield/api/DefaultCustomFieldCreationEvent.java
+++ b/util/src/main/java/com/ning/billing/util/customfield/api/DefaultCustomFieldCreationEvent.java
@@ -19,15 +19,14 @@ package com.ning.billing.util.customfield.api;
 import java.util.UUID;
 
 import com.ning.billing.ObjectType;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.CustomFieldCreationEvent;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
-import com.ning.billing.util.tag.TagDefinition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultCustomFieldCreationEvent extends DefaultBusInternalEvent implements CustomFieldCreationEvent {
+public class DefaultCustomFieldCreationEvent extends BusEventBase implements CustomFieldCreationEvent {
 
     private final UUID customFieldId;
     private final UUID objectId;
diff --git a/util/src/main/java/com/ning/billing/util/customfield/api/DefaultCustomFieldDeletionEvent.java b/util/src/main/java/com/ning/billing/util/customfield/api/DefaultCustomFieldDeletionEvent.java
index b17adb4..fe3bf32 100644
--- a/util/src/main/java/com/ning/billing/util/customfield/api/DefaultCustomFieldDeletionEvent.java
+++ b/util/src/main/java/com/ning/billing/util/customfield/api/DefaultCustomFieldDeletionEvent.java
@@ -19,14 +19,14 @@ package com.ning.billing.util.customfield.api;
 import java.util.UUID;
 
 import com.ning.billing.ObjectType;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.CustomFieldDeletionEvent;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultCustomFieldDeletionEvent extends DefaultBusInternalEvent implements CustomFieldDeletionEvent {
+public class DefaultCustomFieldDeletionEvent extends BusEventBase implements CustomFieldDeletionEvent {
 
     private final UUID customFieldId;
     private final UUID objectId;
diff --git a/util/src/main/java/com/ning/billing/util/events/BusExternalEvent.java b/util/src/main/java/com/ning/billing/util/events/BusExternalEvent.java
new file mode 100644
index 0000000..400f52d
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/events/BusExternalEvent.java
@@ -0,0 +1,9 @@
+package com.ning.billing.util.events;
+
+import com.ning.billing.bus.BusPersistentEvent;
+import com.ning.billing.notification.plugin.api.ExtBusEventType;
+
+public interface BusExternalEvent extends BusPersistentEvent {
+
+    public ExtBusEventType getBusEventType();
+}
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagCreationEvent.java b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagCreationEvent.java
index 16191e7..19d53f6 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagCreationEvent.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagCreationEvent.java
@@ -19,15 +19,15 @@ package com.ning.billing.util.tag.api.user;
 import java.util.UUID;
 
 import com.ning.billing.ObjectType;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.ControlTagCreationInternalEvent;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
 import com.ning.billing.util.tag.TagDefinition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultControlTagCreationEvent extends DefaultBusInternalEvent implements ControlTagCreationInternalEvent {
+public class DefaultControlTagCreationEvent extends BusEventBase implements ControlTagCreationInternalEvent {
 
     private final UUID tagId;
     private final UUID objectId;
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDefinitionCreationEvent.java b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDefinitionCreationEvent.java
index 3a83d5e..cdaaa31 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDefinitionCreationEvent.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDefinitionCreationEvent.java
@@ -18,15 +18,15 @@ package com.ning.billing.util.tag.api.user;
 
 import java.util.UUID;
 
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.ControlTagDefinitionCreationInternalEvent;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
 import com.ning.billing.util.tag.TagDefinition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultControlTagDefinitionCreationEvent extends DefaultBusInternalEvent implements ControlTagDefinitionCreationInternalEvent {
+public class DefaultControlTagDefinitionCreationEvent extends BusEventBase implements ControlTagDefinitionCreationInternalEvent {
 
     private final UUID tagDefinitionId;
     private final TagDefinition tagDefinition;
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDefinitionDeletionEvent.java b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDefinitionDeletionEvent.java
index fa93c78..186666b 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDefinitionDeletionEvent.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDefinitionDeletionEvent.java
@@ -18,15 +18,15 @@ package com.ning.billing.util.tag.api.user;
 
 import java.util.UUID;
 
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.ControlTagDefinitionDeletionInternalEvent;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
 import com.ning.billing.util.tag.TagDefinition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultControlTagDefinitionDeletionEvent extends DefaultBusInternalEvent implements ControlTagDefinitionDeletionInternalEvent {
+public class DefaultControlTagDefinitionDeletionEvent extends BusEventBase implements ControlTagDefinitionDeletionInternalEvent {
 
     private final UUID tagDefinitionId;
     private final TagDefinition tagDefinition;
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDeletionEvent.java b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDeletionEvent.java
index 74d6f40..320f03b 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDeletionEvent.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultControlTagDeletionEvent.java
@@ -19,15 +19,15 @@ package com.ning.billing.util.tag.api.user;
 import java.util.UUID;
 
 import com.ning.billing.ObjectType;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.ControlTagDeletionInternalEvent;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
 import com.ning.billing.util.tag.TagDefinition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultControlTagDeletionEvent extends DefaultBusInternalEvent implements ControlTagDeletionInternalEvent {
+public class DefaultControlTagDeletionEvent extends BusEventBase implements ControlTagDeletionInternalEvent {
     private final UUID tagId;
     final UUID objectId;
     final ObjectType objectType;
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagCreationEvent.java b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagCreationEvent.java
index 5ab8721..f80d7cb 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagCreationEvent.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagCreationEvent.java
@@ -19,7 +19,7 @@ package com.ning.billing.util.tag.api.user;
 import java.util.UUID;
 
 import com.ning.billing.ObjectType;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.UserTagCreationInternalEvent;
 import com.ning.billing.util.tag.TagDefinition;
 
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultUserTagCreationEvent extends DefaultBusInternalEvent implements UserTagCreationInternalEvent {
+public class DefaultUserTagCreationEvent extends BusEventBase implements UserTagCreationInternalEvent {
     private final UUID tagId;
     private final UUID objectId;
     private final ObjectType objectType;
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDefinitionCreationEvent.java b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDefinitionCreationEvent.java
index 7287e56..869cb89 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDefinitionCreationEvent.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDefinitionCreationEvent.java
@@ -18,7 +18,7 @@ package com.ning.billing.util.tag.api.user;
 
 import java.util.UUID;
 
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.UserTagDefinitionCreationInternalEvent;
 import com.ning.billing.util.tag.TagDefinition;
 
@@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultUserTagDefinitionCreationEvent extends DefaultBusInternalEvent implements UserTagDefinitionCreationInternalEvent {
+public class DefaultUserTagDefinitionCreationEvent extends BusEventBase implements UserTagDefinitionCreationInternalEvent {
 
     private final UUID tagDefinitionId;
     private final TagDefinition tagDefinition;
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDefinitionDeletionEvent.java b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDefinitionDeletionEvent.java
index ad6b60e..66043ec 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDefinitionDeletionEvent.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDefinitionDeletionEvent.java
@@ -18,7 +18,7 @@ package com.ning.billing.util.tag.api.user;
 
 import java.util.UUID;
 
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.UserTagDefinitionDeletionInternalEvent;
 import com.ning.billing.util.tag.TagDefinition;
 
@@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultUserTagDefinitionDeletionEvent extends DefaultBusInternalEvent implements UserTagDefinitionDeletionInternalEvent {
+public class DefaultUserTagDefinitionDeletionEvent extends BusEventBase implements UserTagDefinitionDeletionInternalEvent {
 
     private final UUID tagDefinitionId;
     private final TagDefinition tagDefinition;
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDeletionEvent.java b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDeletionEvent.java
index 31d8b8f..7b006a5 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDeletionEvent.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/user/DefaultUserTagDeletionEvent.java
@@ -19,7 +19,7 @@ package com.ning.billing.util.tag.api.user;
 import java.util.UUID;
 
 import com.ning.billing.ObjectType;
-import com.ning.billing.util.events.DefaultBusInternalEvent;
+import com.ning.billing.util.events.BusEventBase;
 import com.ning.billing.util.events.UserTagDeletionInternalEvent;
 import com.ning.billing.util.tag.TagDefinition;
 
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultUserTagDeletionEvent extends DefaultBusInternalEvent implements UserTagDeletionInternalEvent {
+public class DefaultUserTagDeletionEvent extends BusEventBase implements UserTagDeletionInternalEvent {
     private final UUID tagId;
     private final UUID objectId;
     private final ObjectType objectType;