killbill-memoizeit

Add support for new busExtEvents TENANT_CONFIG* Code review

3/9/2015 2:22:29 PM

Changes

beatrix/pom.xml 4(+4 -0)

Details

diff --git a/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java b/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
index 00d615e..66f849e 100644
--- a/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
@@ -44,7 +44,9 @@ public interface BusInternalEvent extends BusEvent {
         USER_TAGDEFINITION_CREATION,
         USER_TAGDEFINITION_DELETION,
         USER_TAG_CREATION,
-        USER_TAG_DELETION
+        USER_TAG_DELETION,
+        TENANT_CONFIG_CHANGE,
+        TENANT_CONFIG_DELETION;
     }
 
     public BusInternalEventType getBusEventType();
diff --git a/api/src/main/java/org/killbill/billing/events/TenantConfigChangeInternalEvent.java b/api/src/main/java/org/killbill/billing/events/TenantConfigChangeInternalEvent.java
new file mode 100644
index 0000000..f7e19bf
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/events/TenantConfigChangeInternalEvent.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.events;
+
+import java.util.UUID;
+
+public interface TenantConfigChangeInternalEvent extends BusInternalEvent {
+    UUID getId();
+    String getKey();
+}
diff --git a/api/src/main/java/org/killbill/billing/events/TenantConfigDeletionInternalEvent.java b/api/src/main/java/org/killbill/billing/events/TenantConfigDeletionInternalEvent.java
new file mode 100644
index 0000000..2acfb0e
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/events/TenantConfigDeletionInternalEvent.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.events;
+
+public interface TenantConfigDeletionInternalEvent extends BusInternalEvent{
+    String getKey();
+}

beatrix/pom.xml 4(+4 -0)

diff --git a/beatrix/pom.xml b/beatrix/pom.xml
index f70c933..245b869 100644
--- a/beatrix/pom.xml
+++ b/beatrix/pom.xml
@@ -206,6 +206,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.kill-bill.billing.plugin</groupId>
+            <artifactId>killbill-plugin-api-notification</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.kill-bill.commons</groupId>
             <artifactId>killbill-clock</artifactId>
         </dependency>
diff --git a/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
index 84391b4..73123dd 100644
--- a/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
@@ -43,6 +43,8 @@ import org.killbill.billing.events.PaymentErrorInternalEvent;
 import org.killbill.billing.events.PaymentInfoInternalEvent;
 import org.killbill.billing.events.PaymentPluginErrorInternalEvent;
 import org.killbill.billing.events.SubscriptionInternalEvent;
+import org.killbill.billing.events.TenantConfigChangeInternalEvent;
+import org.killbill.billing.events.TenantConfigDeletionInternalEvent;
 import org.killbill.billing.events.UserTagCreationInternalEvent;
 import org.killbill.billing.events.UserTagDeletionInternalEvent;
 import org.killbill.billing.lifecycle.glue.BusModule;
@@ -101,6 +103,7 @@ public class BeatrixListener {
         ObjectType objectType = null;
         UUID objectId = null;
         ExtBusEventType eventBusType = null;
+        String metaData = null;
 
         UUID accountId = null;
         switch (event.getBusEventType()) {
@@ -221,19 +224,35 @@ public class BeatrixListener {
                 break;
 
             case CUSTOM_FIELD_CREATION:
-                final CustomFieldCreationEvent realCustomEveventCr = (CustomFieldCreationEvent) event;
+                final CustomFieldCreationEvent realCustomFieldEventCr = (CustomFieldCreationEvent) event;
                 objectType = ObjectType.CUSTOM_FIELD;
-                objectId = realCustomEveventCr.getCustomFieldId();
+                objectId = realCustomFieldEventCr.getCustomFieldId();
                 eventBusType = ExtBusEventType.CUSTOM_FIELD_CREATION;
                 break;
 
             case CUSTOM_FIELD_DELETION:
-                final CustomFieldDeletionEvent realCustomEveventDel = (CustomFieldDeletionEvent) event;
+                final CustomFieldDeletionEvent realCustomFieldEventDel = (CustomFieldDeletionEvent) event;
                 objectType = ObjectType.CUSTOM_FIELD;
-                objectId = realCustomEveventDel.getCustomFieldId();
+                objectId = realCustomFieldEventDel.getCustomFieldId();
                 eventBusType = ExtBusEventType.CUSTOM_FIELD_DELETION;
                 break;
 
+            case TENANT_CONFIG_CHANGE:
+                final TenantConfigChangeInternalEvent realTenantConfigEventChg = (TenantConfigChangeInternalEvent) event;
+                objectType = ObjectType.TENANT_KVS;
+                objectId = realTenantConfigEventChg.getId();
+                eventBusType = ExtBusEventType.TENANT_CONFIG_CHANGE;
+                metaData = realTenantConfigEventChg.getKey();
+                break;
+
+            case TENANT_CONFIG_DELETION:
+                final TenantConfigDeletionInternalEvent realTenantConfigEventDel = (TenantConfigDeletionInternalEvent) event;
+                objectType = ObjectType.TENANT_KVS;
+                objectId = null;
+                eventBusType = ExtBusEventType.TENANT_CONFIG_DELETION;
+                metaData = realTenantConfigEventDel.getKey();
+                break;
+
             default:
         }
 
@@ -244,7 +263,7 @@ public class BeatrixListener {
                     accountId;
 
         return eventBusType != null ?
-               new DefaultBusExternalEvent(objectId, objectType, eventBusType, accountId, tenantContext.getTenantId(), context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken()) :
+               new DefaultBusExternalEvent(objectId, objectType, eventBusType, accountId, tenantContext.getTenantId(), metaData, context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken()) :
                null;
     }
 
@@ -252,6 +271,8 @@ public class BeatrixListener {
         // accountRecord_id is not set for ACCOUNT_CREATE event as we are in the transaction and value is known yet
         if (eventType == BusInternalEventType.ACCOUNT_CREATE) {
             return objectId;
+        } else if (eventType == BusInternalEventType.TENANT_CONFIG_CHANGE || eventType == BusInternalEventType.TENANT_CONFIG_DELETION) {
+            return null;
         } else if (objectId == null) {
             return null;
         } else {
diff --git a/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/DefaultBusExternalEvent.java b/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/DefaultBusExternalEvent.java
index d02be0f..6d18c8d 100644
--- a/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/DefaultBusExternalEvent.java
+++ b/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/DefaultBusExternalEvent.java
@@ -25,6 +25,8 @@ import org.killbill.billing.notification.plugin.api.ExtBusEventType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class DefaultBusExternalEvent implements ExtBusEvent, BusEvent {
@@ -34,6 +36,7 @@ public class DefaultBusExternalEvent implements ExtBusEvent, BusEvent {
     private final UUID tenantId;
     private final ObjectType objectType;
     private final ExtBusEventType eventType;
+    private final String metaData;
     private final Long searchKey1;
     private final Long searchKey2;
     private final UUID userToken;
@@ -44,6 +47,7 @@ public class DefaultBusExternalEvent implements ExtBusEvent, BusEvent {
                                    @JsonProperty("eventType") final ExtBusEventType eventType,
                                    @JsonProperty("accountId") final UUID accountId,
                                    @JsonProperty("tenantId") final UUID tenantId,
+                                   @JsonProperty("metaData") final String metaData,
                                    @JsonProperty("searchKey1") final Long searchKey1,
                                    @JsonProperty("searchKey2") final Long searchKey2,
                                    @JsonProperty("userToken") final UUID userToken) {
@@ -52,6 +56,7 @@ public class DefaultBusExternalEvent implements ExtBusEvent, BusEvent {
         this.objectId = objectId;
         this.accountId = accountId;
         this.tenantId = tenantId;
+        this.metaData = metaData;
         this.searchKey1 = searchKey1;
         this.searchKey2 = searchKey2;
         this.userToken = userToken;
@@ -63,6 +68,11 @@ public class DefaultBusExternalEvent implements ExtBusEvent, BusEvent {
     }
 
     @Override
+    public String getMetaData() {
+        return metaData;
+    }
+
+    @Override
     public UUID getAccountId() {
         return accountId;
     }
@@ -117,15 +127,27 @@ public class DefaultBusExternalEvent implements ExtBusEvent, BusEvent {
         if (eventType != that.eventType) {
             return false;
         }
+        if (metaData != null ? !metaData.equals(that.metaData) : that.metaData != null) {
+            return false;
+        }
         if (objectId != null ? !objectId.equals(that.objectId) : that.objectId != null) {
             return false;
         }
         if (objectType != that.objectType) {
             return false;
         }
+        if (searchKey1 != null ? !searchKey1.equals(that.searchKey1) : that.searchKey1 != null) {
+            return false;
+        }
+        if (searchKey2 != null ? !searchKey2.equals(that.searchKey2) : that.searchKey2 != null) {
+            return false;
+        }
         if (tenantId != null ? !tenantId.equals(that.tenantId) : that.tenantId != null) {
             return false;
         }
+        if (userToken != null ? !userToken.equals(that.userToken) : that.userToken != null) {
+            return false;
+        }
 
         return true;
     }
@@ -135,8 +157,12 @@ public class DefaultBusExternalEvent implements ExtBusEvent, BusEvent {
         int result = objectId != null ? objectId.hashCode() : 0;
         result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
         result = 31 * result + (tenantId != null ? tenantId.hashCode() : 0);
-        result = 31 * result + objectType.hashCode();
-        result = 31 * result + eventType.hashCode();
+        result = 31 * result + (objectType != null ? objectType.hashCode() : 0);
+        result = 31 * result + (eventType != null ? eventType.hashCode() : 0);
+        result = 31 * result + (metaData != null ? metaData.hashCode() : 0);
+        result = 31 * result + (searchKey1 != null ? searchKey1.hashCode() : 0);
+        result = 31 * result + (searchKey2 != null ? searchKey2.hashCode() : 0);
+        result = 31 * result + (userToken != null ? userToken.hashCode() : 0);
         return result;
     }
 
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java b/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
index b4c1cf0..ac5be0f 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
@@ -18,12 +18,13 @@ package org.killbill.billing.beatrix.extbus;
 
 import java.util.UUID;
 
+import org.killbill.billing.notification.plugin.api.ExtBusEvent;
+import org.killbill.billing.notification.plugin.api.ExtBusEventType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import org.killbill.billing.ObjectType;
 import org.killbill.billing.beatrix.BeatrixTestSuite;
-import org.killbill.billing.notification.plugin.api.ExtBusEventType;
 import org.killbill.billing.util.jackson.ObjectMapper;
 
 public class TestEventJson extends BeatrixTestSuite {
@@ -33,17 +34,30 @@ public class TestEventJson extends BeatrixTestSuite {
     @Test(groups = "fast")
     public void testBusExternalEvent() throws Exception {
         final UUID objectId = UUID.randomUUID();
-        final UUID userToken = UUID.randomUUID();
         final UUID accountId = UUID.randomUUID();
         final UUID tenantId = UUID.randomUUID();
         final ObjectType objectType = ObjectType.ACCOUNT;
         final ExtBusEventType extBusEventType = ExtBusEventType.ACCOUNT_CREATION;
 
-        final DefaultBusExternalEvent e = new DefaultBusExternalEvent(objectId, objectType, extBusEventType, accountId, tenantId, 1L, 2L, UUID.randomUUID());
+        final DefaultBusExternalEvent e = new DefaultBusExternalEvent(objectId, objectType, extBusEventType, accountId, tenantId, null, 1L, 2L, UUID.randomUUID());
         final String json = mapper.writeValueAsString(e);
 
         final Class<?> claz = Class.forName(DefaultBusExternalEvent.class.getName());
-        final Object obj = mapper.readValue(json, claz);
-        Assert.assertTrue(obj.equals(e));
+        final ExtBusEvent obj = (ExtBusEvent) mapper.readValue(json, claz);
+        Assert.assertEquals(obj.getAccountId(), accountId);
+        Assert.assertEquals(obj.getObjectId(), objectId);
+        Assert.assertEquals(obj.getTenantId(), tenantId);
+        Assert.assertEquals(obj.getObjectType(), objectType);
+        Assert.assertEquals(obj.getEventType(), extBusEventType);
+    }
+
+    @Test(groups = "fast")
+    public void testBusExternalEventWithMissingMetadata() throws Exception {
+        final String jsonWithMetadata = "{\"objectId\":\"273ff2ed-5442-4d10-971f-3cc2414fe33b\",\"accountId\":\"c3b5b220-aaa1-406e-abd0-e8448b140082\",\"tenantId\":\"6962cf97-5fc2-4ef6-9099-3806acdb134d\",\"objectType\":\"ACCOUNT\",\"eventType\":\"ACCOUNT_CREATION\",\"metaData\":null}";
+        final String jsonWithoutMetadata = "{\"objectId\":\"273ff2ed-5442-4d10-971f-3cc2414fe33b\",\"accountId\":\"c3b5b220-aaa1-406e-abd0-e8448b140082\",\"tenantId\":\"6962cf97-5fc2-4ef6-9099-3806acdb134d\",\"objectType\":\"ACCOUNT\",\"eventType\":\"ACCOUNT_CREATION\"}";
+        final Class<?> claz = Class.forName(DefaultBusExternalEvent.class.getName());
+        final ExtBusEvent obj = (ExtBusEvent) mapper.readValue(jsonWithoutMetadata, claz);
+        Assert.assertTrue(obj.getObjectId().equals(UUID.fromString("273ff2ed-5442-4d10-971f-3cc2414fe33b")));
+        Assert.assertNull(obj.getMetaData());
     }
 }
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
index ee1d9be..a5c19e3 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
@@ -90,6 +90,7 @@ import org.killbill.billing.subscription.api.SubscriptionBaseService;
 import org.killbill.billing.subscription.api.timeline.SubscriptionBaseTimelineApi;
 import org.killbill.billing.subscription.api.transfer.SubscriptionBaseTransferApi;
 import org.killbill.billing.subscription.api.user.DefaultSubscriptionBase;
+import org.killbill.billing.tenant.api.TenantUserApi;
 import org.killbill.billing.usage.api.UsageUserApi;
 import org.killbill.billing.util.api.RecordIdApi;
 import org.killbill.billing.util.api.TagApiException;
@@ -252,6 +253,10 @@ public class TestIntegrationBase extends BeatrixTestSuiteWithEmbeddedDB {
     @Inject
     protected OverdueConfigCache overdueConfigCache;
 
+    @Inject
+    protected TenantUserApi tenantUserApi;
+
+
     protected void assertListenerStatus() {
         busHandler.assertListenerStatus();
     }
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 3ac628b..012fa68 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -23,13 +23,23 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.joda.time.DateTime;
+import org.killbill.billing.DBTestingHelper;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.catalog.api.BillingPeriod;
 import org.killbill.billing.catalog.api.PriceListSet;
 import org.killbill.billing.catalog.api.ProductCategory;
 import org.killbill.billing.entitlement.api.DefaultEntitlement;
 import org.killbill.billing.notification.plugin.api.ExtBusEvent;
+import org.killbill.billing.overdue.api.OverdueConfig;
+import org.killbill.billing.tenant.api.DefaultTenant;
+import org.killbill.billing.tenant.api.Tenant;
+import org.killbill.billing.tenant.api.TenantData;
+import org.killbill.billing.tenant.api.TenantKV.TenantKey;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.UserType;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -58,22 +68,46 @@ public class TestPublicBus extends TestIntegrationBase {
     @Override
     @BeforeMethod(groups = "slow")
     public void beforeMethod() throws Exception {
-        super.beforeMethod();
 
-        publicListener = new PublicListener();
+        /*
+        We copy the initialization instead of invoking the super method so we can add the registration
+        of the publicBus event;
+        TODO modify sequence to allow optional registration of publicListener
+         */
+        //super.beforeMethod();
+
+        try {
+            DBTestingHelper.get().getInstance().cleanupAllTables();
+        } catch (final Exception ignored) {
+        }
+
+        super.beforeMethod();
 
         log.debug("RESET TEST FRAMEWORK");
 
+        controlCacheDispatcher.clearAll();
+
+        overdueConfigCache.loadDefaultOverdueConfig((OverdueConfig) null);
+
         clock.resetDeltaFromReality();
         busHandler.reset();
 
         // Start services
+        publicListener = new PublicListener();
+
         lifecycle.fireStartupSequencePriorEventRegistration();
         busService.getBus().register(busHandler);
         externalBus.register(publicListener);
+
         lifecycle.fireStartupSequencePostEventRegistration();
 
+        paymentPlugin.clear();
+
         this.externalBusCount = new AtomicInteger(0);
+
+        // Make sure we start with a clean state
+        assertListenerStatus();
+
     }
 
     @Test(groups = "{slow}")
@@ -103,9 +137,30 @@ public class TestPublicBus extends TestIntegrationBase {
         await().atMost(10, SECONDS).until(new Callable<Boolean>() {
             @Override
             public Boolean call() throws Exception {
-                // expecting ACCOUNT_CREATION, ACCOUNT_CHANGE, SUBSCRIPTION_CREATION, INVOICE_CREATION
+                // expecting ACCOUNT_CREATE, ACCOUNT_CHANGE, SUBSCRIPTION_CREATION, INVOICE_CREATION
                 return externalBusCount.get() == 4;
             }
         });
     }
+
+    @Test(groups = "{slow}")
+    public void testTenantKVChange() throws Exception {
+
+        final TenantData tenantData = new DefaultTenant(null, clock.getUTCNow(), clock.getUTCNow(), "MY_TENANT", "key", "s3Cr3T");
+        final CallContext contextWithNoTenant = new DefaultCallContext(null, "loulou", CallOrigin.EXTERNAL, UserType.ADMIN, "no reason", "hum", UUID.randomUUID(), clock);
+        final Tenant tenant = tenantUserApi.createTenant(tenantData, contextWithNoTenant);
+
+        final CallContext contextWithTenant = new DefaultCallContext(tenant.getId(), "loulou", CallOrigin.EXTERNAL, UserType.ADMIN, "no reason", "hum", UUID.randomUUID(), clock);
+        final String tenantKey = TenantKey.PLUGIN_CONFIG_ + "FOO";
+        tenantUserApi.addTenantKeyValue(tenantKey, "FOO", contextWithTenant);
+
+        await().atMost(10, SECONDS).until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                // expecting  TENANT_CONFIG_CHANGE
+                return externalBusCount.get() == 1;
+            }
+        });
+    }
+
 }
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/api/TenantCacheInvalidation.java b/tenant/src/main/java/org/killbill/billing/tenant/api/TenantCacheInvalidation.java
index 1e61e4c..3e2a09f 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/api/TenantCacheInvalidation.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/api/TenantCacheInvalidation.java
@@ -28,12 +28,20 @@ import javax.inject.Inject;
 import javax.inject.Named;
 
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.BusInternalEvent;
 import org.killbill.billing.tenant.api.TenantInternalApi.CacheInvalidationCallback;
 import org.killbill.billing.tenant.api.TenantKV.TenantKey;
+import org.killbill.billing.tenant.api.user.DefaultTenantConfigChangeInternalEvent;
+import org.killbill.billing.tenant.api.user.DefaultTenantConfigDeletionInternalEvent;
 import org.killbill.billing.tenant.dao.TenantBroadcastDao;
 import org.killbill.billing.tenant.dao.TenantBroadcastModelDao;
+import org.killbill.billing.tenant.dao.TenantDao;
+import org.killbill.billing.tenant.dao.TenantKVModelDao;
+import org.killbill.billing.tenant.dao.TenantModelDao;
 import org.killbill.billing.tenant.glue.DefaultTenantModule;
 import org.killbill.billing.util.config.TenantConfig;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,17 +69,23 @@ public class TenantCacheInvalidation {
     private final TenantBroadcastDao broadcastDao;
     private final ScheduledExecutorService tenantExecutor;
     private final TenantConfig tenantConfig;
+    private final PersistentBus eventBus;
+    private final TenantDao tenantDao;
     private AtomicLong latestRecordIdProcessed;
     private volatile boolean isStopped;
 
     @Inject
     public TenantCacheInvalidation(@Named(DefaultTenantModule.NO_CACHING_TENANT) final TenantBroadcastDao broadcastDao,
                                    @Named(DefaultTenantModule.TENANT_EXECUTOR_NAMED) final ScheduledExecutorService tenantExecutor,
+                                   @Named(DefaultTenantModule.NO_CACHING_TENANT) final TenantDao tenantDao,
+                                   final PersistentBus eventBus,
                                    final TenantConfig tenantConfig) {
         this.cache = new HashMap<TenantKey, CacheInvalidationCallback>();
         this.broadcastDao = broadcastDao;
         this.tenantExecutor = tenantExecutor;
         this.tenantConfig = tenantConfig;
+        this.tenantDao = tenantDao;
+        this.eventBus = eventBus;
         this.isStopped = false;
     }
 
@@ -88,7 +102,7 @@ public class TenantCacheInvalidation {
         }
         final TimeUnit pendingRateUnit = tenantConfig.getTenantBroadcastServiceRunningRate().getUnit();
         final long pendingPeriod = tenantConfig.getTenantBroadcastServiceRunningRate().getPeriod();
-        tenantExecutor.scheduleAtFixedRate(new TenantCacheInvalidationRunnable(this, broadcastDao), pendingPeriod, pendingPeriod, pendingRateUnit);
+        tenantExecutor.scheduleAtFixedRate(new TenantCacheInvalidationRunnable(this, broadcastDao, tenantDao), pendingPeriod, pendingPeriod, pendingRateUnit);
 
     }
 
@@ -133,15 +147,22 @@ public class TenantCacheInvalidation {
         this.latestRecordIdProcessed.set(newProcessedRecordId);
     }
 
+    public PersistentBus getEventBus() {
+        return eventBus;
+    }
+
     public static class TenantCacheInvalidationRunnable implements Runnable {
 
         private final TenantCacheInvalidation parent;
         private final TenantBroadcastDao broadcastDao;
+        private final TenantDao tenantDao;
 
         public TenantCacheInvalidationRunnable(final TenantCacheInvalidation parent,
-                                               final TenantBroadcastDao broadcastDao) {
+                                               final TenantBroadcastDao broadcastDao,
+                                               final TenantDao tenantDao) {
             this.parent = parent;
             this.broadcastDao = broadcastDao;
+            this.tenantDao = tenantDao;
         }
 
         @Override
@@ -162,10 +183,26 @@ public class TenantCacheInvalidation {
                         final CacheInvalidationCallback callback = parent.getCacheInvalidation(tenantKeyAndCookie.getTenantKey());
                         if (callback != null) {
                             final InternalTenantContext tenantContext = new InternalTenantContext(cur.getTenantRecordId(), null);
-                            callback.invalidateCache(tenantKeyAndCookie. getTenantKey(), tenantKeyAndCookie.getCookie(), tenantContext);
-                        } else {
-                            logger.warn("Failed to find CacheInvalidationCallback for " + cur.getType());
+                            callback.invalidateCache(tenantKeyAndCookie.getTenantKey(), tenantKeyAndCookie.getCookie(), tenantContext);
+
+                            final Long tenantKvsTargetRecordId = cur.getTargetRecordId();
+                            final BusInternalEvent event;
+                            if (tenantKvsTargetRecordId != null) {
+                                final TenantKVModelDao tenantModelDao = tenantDao.getKeyByRecordId(tenantKvsTargetRecordId, tenantContext);
+                                event = new DefaultTenantConfigChangeInternalEvent(tenantModelDao.getId(), cur.getType(),
+                                                                                   null, tenantContext.getTenantRecordId(), cur.getUserToken());
+                            } else {
+                                event = new DefaultTenantConfigDeletionInternalEvent(cur.getType(),
+                                                                                     null, tenantContext.getTenantRecordId(), cur.getUserToken());
+                            }
+                            try {
+                                parent.getEventBus().post(event);
+                            } catch (final EventBusException e) {
+                                logger.warn("Failed post bus event " + event, e);
+                            }
                         }
+                    } else {
+                        logger.warn("Failed to find CacheInvalidationCallback for " + cur.getType());
                     }
                 } finally {
                     parent.setLatestRecordIdProcessed(cur.getRecordId());
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/api/user/DefaultTenantConfigChangeInternalEvent.java b/tenant/src/main/java/org/killbill/billing/tenant/api/user/DefaultTenantConfigChangeInternalEvent.java
new file mode 100644
index 0000000..f62e80d
--- /dev/null
+++ b/tenant/src/main/java/org/killbill/billing/tenant/api/user/DefaultTenantConfigChangeInternalEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.api.user;
+
+import java.util.UUID;
+
+import org.killbill.billing.events.BusEventBase;
+import org.killbill.billing.events.TenantConfigChangeInternalEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DefaultTenantConfigChangeInternalEvent extends BusEventBase implements TenantConfigChangeInternalEvent {
+
+    private final UUID id;
+    private final String key;
+
+    @JsonCreator
+    public DefaultTenantConfigChangeInternalEvent(@JsonProperty("id") final UUID id,
+                                                  @JsonProperty("key") final String key,
+                                                  @JsonProperty("searchKey1") final Long searchKey1,
+                                                  @JsonProperty("searchKey2") final Long searchKey2,
+                                                  @JsonProperty("userToken") final UUID userToken) {
+        super(searchKey1, searchKey2, userToken);
+        this.id = id;
+        this.key = key;
+    }
+
+    @Override
+    public UUID getId() {
+        return id;
+    }
+
+    @Override
+    public String getKey() {
+        return key;
+    }
+
+    @JsonIgnore
+    @Override
+    public BusInternalEventType getBusEventType() {
+        return BusInternalEventType.TENANT_CONFIG_CHANGE;
+    }
+}
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/api/user/DefaultTenantConfigDeletionInternalEvent.java b/tenant/src/main/java/org/killbill/billing/tenant/api/user/DefaultTenantConfigDeletionInternalEvent.java
new file mode 100644
index 0000000..191e610
--- /dev/null
+++ b/tenant/src/main/java/org/killbill/billing/tenant/api/user/DefaultTenantConfigDeletionInternalEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.api.user;
+
+import java.util.UUID;
+
+import org.killbill.billing.events.BusEventBase;
+import org.killbill.billing.events.BusInternalEvent.BusInternalEventType;
+import org.killbill.billing.events.TenantConfigChangeInternalEvent;
+import org.killbill.billing.events.TenantConfigDeletionInternalEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DefaultTenantConfigDeletionInternalEvent extends BusEventBase implements TenantConfigDeletionInternalEvent {
+
+    private final String key;
+
+    @JsonCreator
+    public DefaultTenantConfigDeletionInternalEvent(@JsonProperty("key") final String key,
+                                                  @JsonProperty("searchKey1") final Long searchKey1,
+                                                  @JsonProperty("searchKey2") final Long searchKey2,
+                                                  @JsonProperty("userToken") final UUID userToken) {
+        super(searchKey1, searchKey2, userToken);
+        this.key = key;
+    }
+
+    @Override
+    public String getKey() {
+        return key;
+    }
+
+    @JsonIgnore
+    @Override
+    public BusInternalEventType getBusEventType() {
+        return BusInternalEventType.TENANT_CONFIG_DELETION;
+    }
+}
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantDao.java
index 97d7721..fae6f7f 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantDao.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantDao.java
@@ -138,7 +138,8 @@ public class DefaultTenantDao extends EntityDaoBase<TenantModelDao, Tenant, Tena
                     deleteFromTransaction(key, entitySqlDaoWrapperFactory, context);
                 }
                 tenantKVSqlDao.create(tenantKVModelDao, context);
-                broadcastConfigurationChangeFromTransaction(key, entitySqlDaoWrapperFactory, context);
+                final TenantKVModelDao rehydrated = tenantKVSqlDao.getById(tenantKVModelDao.getId().toString(), context);
+                broadcastConfigurationChangeFromTransaction(rehydrated.getRecordId(), key, entitySqlDaoWrapperFactory, context);
                 return null;
             }
         });
@@ -151,12 +152,22 @@ public class DefaultTenantDao extends EntityDaoBase<TenantModelDao, Tenant, Tena
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
                 deleteFromTransaction(key, entitySqlDaoWrapperFactory, context);
-                broadcastConfigurationChangeFromTransaction(key, entitySqlDaoWrapperFactory, context);
+                broadcastConfigurationChangeFromTransaction(null, key, entitySqlDaoWrapperFactory, context);
                 return null;
             }
         });
     }
 
+    @Override
+    public TenantKVModelDao getKeyByRecordId(final Long recordId, final InternalTenantContext context) {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<TenantKVModelDao>() {
+            @Override
+            public TenantKVModelDao inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+                return entitySqlDaoWrapperFactory.become(TenantKVSqlDao.class).getByRecordId(recordId, context);
+            }
+        });
+    }
+
     private Void deleteFromTransaction(final String key, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalCallContext context) {
         final List<TenantKVModelDao> tenantKVs = entitySqlDaoWrapperFactory.become(TenantKVSqlDao.class).getTenantValueForKey(key, context);
         for (TenantKVModelDao cur : tenantKVs) {
@@ -167,10 +178,10 @@ public class DefaultTenantDao extends EntityDaoBase<TenantModelDao, Tenant, Tena
         return null;
     }
 
-    private void broadcastConfigurationChangeFromTransaction(final String key, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
+    private void broadcastConfigurationChangeFromTransaction(final Long kvRecordId, final String key, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
                                                              final InternalCallContext context) throws EntityPersistenceException {
         if (isSystemKey(key)) {
-            final TenantBroadcastModelDao broadcast = new TenantBroadcastModelDao(key);
+            final TenantBroadcastModelDao broadcast = new TenantBroadcastModelDao(kvRecordId, key, context.getUserToken());
             entitySqlDaoWrapperFactory.become(TenantBroadcastSqlDao.class).create(broadcast, context);
         }
     }
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/NoCachingTenantDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/NoCachingTenantDao.java
index 711886d..d7cb4d5 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/dao/NoCachingTenantDao.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/NoCachingTenantDao.java
@@ -77,6 +77,26 @@ public class NoCachingTenantDao extends EntityDaoBase<TenantModelDao, Tenant, Te
     }
 
     @Override
+    public TenantKVModelDao getKeyByRecordId(final Long recordId, final InternalTenantContext context) {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<TenantKVModelDao>() {
+            @Override
+            public TenantKVModelDao inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+                return entitySqlDaoWrapperFactory.become(TenantKVSqlDao.class).getByRecordId(recordId, context);
+            }
+        });
+    }
+
+    @Override
+    public TenantModelDao getByRecordId(final Long recordId, final InternalTenantContext context) {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<TenantModelDao>() {
+            @Override
+            public TenantModelDao inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+                return entitySqlDaoWrapperFactory.become(TenantSqlDao.class).getByRecordId(recordId, context);
+            }
+        });
+    }
+
+    @Override
     public void addTenantKeyValue(final String key, final String value, final boolean uniqueKey, final InternalCallContext context) {
         throw new IllegalStateException("Not implemented by NoCachingTenantDao");
     }
@@ -102,11 +122,6 @@ public class NoCachingTenantDao extends EntityDaoBase<TenantModelDao, Tenant, Te
     }
 
     @Override
-    public TenantModelDao getByRecordId(final Long recordId, final InternalTenantContext context) {
-        throw new IllegalStateException("Not implemented by NoCachingTenantDao");
-    }
-
-    @Override
     public TenantModelDao getById(final UUID id, final InternalTenantContext context) {
         throw new IllegalStateException("Not implemented by NoCachingTenantDao");
     }
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastModelDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastModelDao.java
index 55ad9f9..081220f 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastModelDao.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastModelDao.java
@@ -28,17 +28,23 @@ import org.killbill.billing.util.entity.dao.EntityModelDaoBase;
 public class TenantBroadcastModelDao extends EntityModelDaoBase implements EntityModelDao<Entity> {
 
     private String type;
+    private Long targetRecordId;
+    private TableName targetTableName;
+    private UUID userToken;
 
     public TenantBroadcastModelDao() { /* For the DAO mapper */ }
 
-    public TenantBroadcastModelDao(final String type) {
-        this(UUID.randomUUID(), null, null, type);
+    public TenantBroadcastModelDao(final Long targetRecordId, final String type, final UUID userToken) {
+        this(UUID.randomUUID(), null, null, type, userToken);
+        this.targetRecordId = targetRecordId;
     }
 
     public TenantBroadcastModelDao(final UUID id, final DateTime createdDate, final DateTime updatedDate,
-                                   final String type) {
+                                   final String type, final UUID userToken) {
         super(id, createdDate, updatedDate);
         this.type = type;
+        this.userToken = userToken;
+        this.targetTableName = TableName.TENANT_KVS; // Only one supported now
     }
 
     public String getType() {
@@ -49,6 +55,30 @@ public class TenantBroadcastModelDao extends EntityModelDaoBase implements Entit
         this.type = type;
     }
 
+    public Long getTargetRecordId() {
+        return targetRecordId;
+    }
+
+    public void setTargetRecordId(final Long targetRecordId) {
+        this.targetRecordId = targetRecordId;
+    }
+
+    public UUID getUserToken() {
+        return userToken;
+    }
+
+    public void setUserToken(final UUID userToken) {
+        this.userToken = userToken;
+    }
+
+    public TableName getTargetTableName() {
+        return targetTableName;
+    }
+
+    public void setTargetTableName(final TableName targetTableName) {
+        this.targetTableName = targetTableName;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -63,9 +93,18 @@ public class TenantBroadcastModelDao extends EntityModelDaoBase implements Entit
 
         final TenantBroadcastModelDao that = (TenantBroadcastModelDao) o;
 
+        if (targetTableName != that.targetTableName) {
+            return false;
+        }
+        if (targetRecordId != null ? !targetRecordId.equals(that.targetRecordId) : that.targetRecordId != null) {
+            return false;
+        }
         if (type != null ? !type.equals(that.type) : that.type != null) {
             return false;
         }
+        if (userToken != null ? !userToken.equals(that.userToken) : that.userToken != null) {
+            return false;
+        }
 
         return true;
     }
@@ -74,6 +113,9 @@ public class TenantBroadcastModelDao extends EntityModelDaoBase implements Entit
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (type != null ? type.hashCode() : 0);
+        result = 31 * result + (targetRecordId != null ? targetRecordId.hashCode() : 0);
+        result = 31 * result + (targetTableName != null ? targetTableName.hashCode() : 0);
+        result = 31 * result + (userToken != null ? userToken.hashCode() : 0);
         return result;
     }
 
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantDao.java
index 7b32c57..8f48adc 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantDao.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantDao.java
@@ -33,4 +33,7 @@ public interface TenantDao extends EntityDao<TenantModelDao, Tenant, TenantApiEx
     public void addTenantKeyValue(final String key, final String value, final boolean uniqueKey, final InternalCallContext context);
 
     public void deleteTenantKey(final String key, final InternalCallContext context);
+
+    public TenantKVModelDao getKeyByRecordId(Long recordId, InternalTenantContext context);
+
 }
diff --git a/tenant/src/main/resources/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.sql.stg b/tenant/src/main/resources/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.sql.stg
index 60def50..b0e5bc3 100644
--- a/tenant/src/main/resources/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.sql.stg
+++ b/tenant/src/main/resources/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.sql.stg
@@ -3,7 +3,10 @@ group TenantBroadcastSqlDao: EntitySqlDao;
 tableName() ::= "tenant_broadcasts"
 
 tableFields(prefix) ::= <<
-  <prefix>type
+  <prefix>target_record_id
+, <prefix>target_table_name
+, <prefix>type
+, <prefix>user_token
 , <prefix>created_date
 , <prefix>created_by
 , <prefix>updated_date
@@ -11,7 +14,10 @@ tableFields(prefix) ::= <<
 >>
 
 tableValues() ::= <<
- :type
+  :targetRecordId
+, :targetTableName
+, :type
+, :userToken
 , :createdDate
 , :createdBy
 , :updatedDate
diff --git a/tenant/src/main/resources/org/killbill/billing/tenant/ddl.sql b/tenant/src/main/resources/org/killbill/billing/tenant/ddl.sql
index 814ed17..d549318 100644
--- a/tenant/src/main/resources/org/killbill/billing/tenant/ddl.sql
+++ b/tenant/src/main/resources/org/killbill/billing/tenant/ddl.sql
@@ -39,8 +39,11 @@ DROP TABLE IF EXISTS tenant_broadcasts;
 CREATE TABLE tenant_broadcasts (
    record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
    id char(36) NOT NULL,
+   target_record_id int(11) unsigned,
+   target_table_name varchar(50) NOT NULL,
    tenant_record_id int(11) unsigned NOT NULL,
    type varchar(64) NOT NULL,
+   user_token char(36),
    created_date datetime NOT NULL,
    created_by varchar(50) NOT NULL,
    updated_date datetime DEFAULT NULL,
diff --git a/tenant/src/test/java/org/killbill/billing/tenant/dao/TestNoCachingTenantBroadcastDao.java b/tenant/src/test/java/org/killbill/billing/tenant/dao/TestNoCachingTenantBroadcastDao.java
index abab8da..4d6a43a 100644
--- a/tenant/src/test/java/org/killbill/billing/tenant/dao/TestNoCachingTenantBroadcastDao.java
+++ b/tenant/src/test/java/org/killbill/billing/tenant/dao/TestNoCachingTenantBroadcastDao.java
@@ -31,7 +31,7 @@ public class TestNoCachingTenantBroadcastDao extends TenantTestSuiteWithEmbedded
 
     @Test(groups = "slow")
     public void testBasic() throws Exception {
-        final TenantBroadcastModelDao model = new TenantBroadcastModelDao("foo");
+        final TenantBroadcastModelDao model = new TenantBroadcastModelDao(0L, "foo", UUID.randomUUID());
 
         final InternalCallContext context79 = createContext(79L);
         tenantBroadcastDao.create(model, context79);
@@ -54,7 +54,7 @@ public class TestNoCachingTenantBroadcastDao extends TenantTestSuiteWithEmbedded
         final InternalCallContext context79 = createContext(81L);
         TenantBroadcastModelDao latestInsert = null;
         for (int i = 0; i < 100; i++) {
-            final TenantBroadcastModelDao model = new TenantBroadcastModelDao("foo-" + i);
+            final TenantBroadcastModelDao model = new TenantBroadcastModelDao(0L, "foo-" + i, UUID.randomUUID());
             tenantBroadcastDao.create(model, context79);
             latestInsert = model;
         }
diff --git a/util/src/test/java/org/killbill/billing/mock/api/MockExtBusEvent.java b/util/src/test/java/org/killbill/billing/mock/api/MockExtBusEvent.java
index 206ff36..f11209e 100644
--- a/util/src/test/java/org/killbill/billing/mock/api/MockExtBusEvent.java
+++ b/util/src/test/java/org/killbill/billing/mock/api/MockExtBusEvent.java
@@ -63,6 +63,11 @@ public class MockExtBusEvent implements ExtBusEvent {
     }
 
     @Override
+    public String getMetaData() {
+        return null;
+    }
+
+    @Override
     public UUID getAccountId() {
         return accountId;
     }