killbill-memoizeit

Rework logic to upload catalog/overdue config so that cache

12/23/2014 8:47:45 PM

Changes

Details

diff --git a/api/src/main/java/org/killbill/billing/catalog/api/CatalogService.java b/api/src/main/java/org/killbill/billing/catalog/api/CatalogService.java
index a5f7e6c..f461328 100644
--- a/api/src/main/java/org/killbill/billing/catalog/api/CatalogService.java
+++ b/api/src/main/java/org/killbill/billing/catalog/api/CatalogService.java
@@ -27,8 +27,7 @@ import org.killbill.billing.util.callcontext.TenantContext;
  */
 public interface CatalogService extends KillbillService {
 
-    public abstract Catalog getFullCatalog(InternalTenantContext context) throws CatalogApiException;
-
-    public abstract StaticCatalog getCurrentCatalog(InternalTenantContext context) throws CatalogApiException;
+    public Catalog getFullCatalog(InternalTenantContext context) throws CatalogApiException;
 
+    public StaticCatalog getCurrentCatalog(InternalTenantContext context) throws CatalogApiException;
 }
diff --git a/api/src/main/java/org/killbill/billing/tenant/api/TenantInternalApi.java b/api/src/main/java/org/killbill/billing/tenant/api/TenantInternalApi.java
index b5adc7c..071f8ed 100644
--- a/api/src/main/java/org/killbill/billing/tenant/api/TenantInternalApi.java
+++ b/api/src/main/java/org/killbill/billing/tenant/api/TenantInternalApi.java
@@ -24,9 +24,13 @@ import org.killbill.billing.callcontext.InternalTenantContext;
 
 public interface TenantInternalApi {
 
-    public List<String> getTenantCatalogs(InternalTenantContext tenantContext);
+    public interface CacheInvalidationCallback {
+        public void invalidateCache(InternalTenantContext tenantContext);
+    }
 
-    public String getTenantOverdueConfig(InternalTenantContext tenantContext);
+    public List<String> getTenantCatalogs(InternalTenantContext tenantContext, CacheInvalidationCallback cacheInvalidationCallback);
+
+    public String getTenantOverdueConfig(InternalTenantContext tenantContext, CacheInvalidationCallback cacheInvalidationCallback);
 
     public String getInvoiceTemplate(Locale locale, InternalTenantContext tenantContext);
 
diff --git a/catalog/src/main/java/org/killbill/billing/catalog/api/user/DefaultCatalogUserApi.java b/catalog/src/main/java/org/killbill/billing/catalog/api/user/DefaultCatalogUserApi.java
index 41a1638..9d81dd1 100644
--- a/catalog/src/main/java/org/killbill/billing/catalog/api/user/DefaultCatalogUserApi.java
+++ b/catalog/src/main/java/org/killbill/billing/catalog/api/user/DefaultCatalogUserApi.java
@@ -18,12 +18,18 @@ package org.killbill.billing.catalog.api.user;
 
 import javax.inject.Inject;
 
+import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Catalog;
 import org.killbill.billing.catalog.api.CatalogApiException;
 import org.killbill.billing.catalog.api.CatalogService;
 import org.killbill.billing.catalog.api.CatalogUserApi;
 import org.killbill.billing.catalog.api.StaticCatalog;
+import org.killbill.billing.catalog.caching.CatalogCache;
+import org.killbill.billing.tenant.api.TenantApiException;
+import org.killbill.billing.tenant.api.TenantKV.TenantKey;
+import org.killbill.billing.tenant.api.TenantUserApi;
+import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 
@@ -31,10 +37,17 @@ public class DefaultCatalogUserApi implements CatalogUserApi {
 
     private final CatalogService catalogService;
     private final InternalCallContextFactory internalCallContextFactory;
+    private final TenantUserApi tenantApi;
+    private final CatalogCache catalogCache;
 
     @Inject
-    public DefaultCatalogUserApi(final CatalogService catalogService, final InternalCallContextFactory internalCallContextFactory) {
+    public DefaultCatalogUserApi(final CatalogService catalogService,
+                                 final TenantUserApi tenantApi,
+                                 final CatalogCache catalogCache,
+                                 final InternalCallContextFactory internalCallContextFactory) {
         this.catalogService = catalogService;
+        this.tenantApi = tenantApi;
+        this.catalogCache = catalogCache;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
@@ -46,7 +59,24 @@ public class DefaultCatalogUserApi implements CatalogUserApi {
 
     @Override
     public StaticCatalog getCurrentCatalog(final String catalogName, final TenantContext tenantContext) throws CatalogApiException {
-        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantContext);
+        final InternalTenantContext internalTenantContext = createInternalTenantContext(tenantContext);
         return catalogService.getCurrentCatalog(internalTenantContext);
     }
+
+    @Override
+    public void uploadCatalog(final String catalogXML, final CallContext callContext) throws CatalogApiException {
+        try {
+            final InternalTenantContext internalTenantContext = createInternalTenantContext(callContext);
+            catalogCache.clearCatalog(internalTenantContext);
+            tenantApi.addTenantKeyValue(TenantKey.CATALOG.toString(), catalogXML, callContext);
+        } catch (TenantApiException e) {
+            throw new CatalogApiException(e);
+        }
+    }
+
+    private InternalTenantContext createInternalTenantContext(final TenantContext tenantContext) {
+        // Only tenantRecordId will be populated-- this is important to always create the (ehcache) key the same way
+        return internalCallContextFactory.createInternalTenantContext(tenantContext);
+    }
+
 }
diff --git a/catalog/src/main/java/org/killbill/billing/catalog/caching/CatalogCache.java b/catalog/src/main/java/org/killbill/billing/catalog/caching/CatalogCache.java
index b7bf7d9..38984c0 100644
--- a/catalog/src/main/java/org/killbill/billing/catalog/caching/CatalogCache.java
+++ b/catalog/src/main/java/org/killbill/billing/catalog/caching/CatalogCache.java
@@ -22,6 +22,11 @@ import org.killbill.billing.catalog.VersionedCatalog;
 import org.killbill.billing.catalog.api.CatalogApiException;
 
 public interface CatalogCache {
+
     public void loadDefaultCatalog(final String url) throws CatalogApiException;
+
     public VersionedCatalog getCatalog(InternalTenantContext tenantContext) throws CatalogApiException;
+
+    public void clearCatalog(InternalTenantContext tenantContext);
+
 }
diff --git a/catalog/src/main/java/org/killbill/billing/catalog/caching/EhCacheCatalogCache.java b/catalog/src/main/java/org/killbill/billing/catalog/caching/EhCacheCatalogCache.java
index dd0769f..256b8e2 100644
--- a/catalog/src/main/java/org/killbill/billing/catalog/caching/EhCacheCatalogCache.java
+++ b/catalog/src/main/java/org/killbill/billing/catalog/caching/EhCacheCatalogCache.java
@@ -19,13 +19,17 @@ package org.killbill.billing.catalog.caching;
 
 import java.util.List;
 
+import javax.inject.Inject;
+
 import org.killbill.billing.ErrorCode;
 import org.killbill.billing.ObjectType;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.VersionedCatalog;
 import org.killbill.billing.catalog.api.CatalogApiException;
 import org.killbill.billing.catalog.io.VersionedCatalogLoader;
+import org.killbill.billing.util.cache.Cachable.CacheType;
 import org.killbill.billing.util.cache.CacheController;
+import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.cache.CacheLoaderArgument;
 import org.killbill.billing.util.cache.TenantCatalogCacheLoader.LoaderCallback;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
@@ -38,15 +42,16 @@ public class EhCacheCatalogCache implements CatalogCache {
 
     private VersionedCatalog defaultCatalog;
 
-    public EhCacheCatalogCache(final CacheController cacheController, final VersionedCatalogLoader loader) {
-        this.cacheController = cacheController;
+    @Inject
+    public EhCacheCatalogCache(final CacheControllerDispatcher cacheControllerDispatcher, final VersionedCatalogLoader loader) {
+        this.cacheController = cacheControllerDispatcher.getCacheController(CacheType.TENANT_CATALOG);
         this.loader = loader;
-        this.cacheLoaderArgument = initializeCacheLoaderArgument();
+        this.cacheLoaderArgument = initializeCacheLoaderArgument(this);
     }
 
     @Override
     public void loadDefaultCatalog(final String url) throws CatalogApiException {
-        defaultCatalog = loader.load(url);
+        defaultCatalog = (url != null) ? loader.load(url) : null;
     }
 
     @Override
@@ -68,17 +73,27 @@ public class EhCacheCatalogCache implements CatalogCache {
         }
     }
 
+    @Override
+    public void clearCatalog(final InternalTenantContext tenantContext) {
+        cacheController.remove(tenantContext);
+    }
+
     //
     // Build the LoaderCallback that is required to build the catalog from the xml from a module that knows
     // nothing about catalog.
     //
     // This is a contract between the TenantCatalogCacheLoader and the EhCacheCatalogCache
-    private CacheLoaderArgument initializeCacheLoaderArgument() {
+    private CacheLoaderArgument initializeCacheLoaderArgument(final EhCacheCatalogCache parentCache) {
         final LoaderCallback loaderCallback = new LoaderCallback() {
             @Override
             public Object loadCatalog(final List<String> catalogXMLs) throws CatalogApiException {
                 return loader.load(catalogXMLs);
             }
+
+            @Override
+            public void invalidateCache(final InternalTenantContext tenantContext) {
+                parentCache.clearCatalog(tenantContext);
+            }
         };
         final Object[] args = new Object[1];
         args[0]= loaderCallback;
diff --git a/catalog/src/main/java/org/killbill/billing/catalog/DefaultCatalogService.java b/catalog/src/main/java/org/killbill/billing/catalog/DefaultCatalogService.java
index 6665ac8..0497e9d 100644
--- a/catalog/src/main/java/org/killbill/billing/catalog/DefaultCatalogService.java
+++ b/catalog/src/main/java/org/killbill/billing/catalog/DefaultCatalogService.java
@@ -24,13 +24,9 @@ import org.killbill.billing.catalog.api.CatalogApiException;
 import org.killbill.billing.catalog.api.CatalogService;
 import org.killbill.billing.catalog.api.StaticCatalog;
 import org.killbill.billing.catalog.caching.CatalogCache;
-import org.killbill.billing.catalog.caching.EhCacheCatalogCache;
-import org.killbill.billing.catalog.io.VersionedCatalogLoader;
 import org.killbill.billing.platform.api.KillbillService;
 import org.killbill.billing.platform.api.LifecycleHandlerType;
 import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
-import org.killbill.billing.util.cache.Cachable.CacheType;
-import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.config.CatalogConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +45,10 @@ public class DefaultCatalogService implements KillbillService, CatalogService {
 
     @Inject
     public DefaultCatalogService(final CatalogConfig config,
-                                 final VersionedCatalogLoader loader,
-                                 final CacheControllerDispatcher cacheControllerDispatcher) {
+                                 final CatalogCache catalogCache) {
         this.config = config;
         this.isInitialized = false;
-        this.catalogCache = new EhCacheCatalogCache(cacheControllerDispatcher.getCacheController(CacheType.TENANT_CATALOG), loader);
+        this.catalogCache = catalogCache;
     }
 
     @LifecycleHandlerType(LifecycleLevel.LOAD_CATALOG)
diff --git a/catalog/src/main/java/org/killbill/billing/catalog/glue/CatalogModule.java b/catalog/src/main/java/org/killbill/billing/catalog/glue/CatalogModule.java
index 033b855..2198ce6 100644
--- a/catalog/src/main/java/org/killbill/billing/catalog/glue/CatalogModule.java
+++ b/catalog/src/main/java/org/killbill/billing/catalog/glue/CatalogModule.java
@@ -22,6 +22,8 @@ import org.killbill.billing.catalog.DefaultCatalogService;
 import org.killbill.billing.catalog.api.CatalogService;
 import org.killbill.billing.catalog.api.CatalogUserApi;
 import org.killbill.billing.catalog.api.user.DefaultCatalogUserApi;
+import org.killbill.billing.catalog.caching.CatalogCache;
+import org.killbill.billing.catalog.caching.EhCacheCatalogCache;
 import org.killbill.billing.catalog.io.CatalogLoader;
 import org.killbill.billing.catalog.io.VersionedCatalogLoader;
 import org.killbill.billing.platform.api.KillbillConfigSource;
@@ -49,10 +51,15 @@ public class CatalogModule extends KillBillModule {
         bind(CatalogUserApi.class).to(DefaultCatalogUserApi.class).asEagerSingleton();
     }
 
+    public void installOverdueConfigCache() {
+        bind(CatalogCache.class).to(EhCacheCatalogCache.class).asEagerSingleton();
+    }
+
     @Override
     protected void configure() {
         installConfig();
         installCatalog();
         installCatalogUserApi();
+        installOverdueConfigCache();
     }
 }
diff --git a/catalog/src/test/java/org/killbill/billing/catalog/caching/TestEhCacheCatalogCache.java b/catalog/src/test/java/org/killbill/billing/catalog/caching/TestEhCacheCatalogCache.java
index 9109dde..5e0706e 100644
--- a/catalog/src/test/java/org/killbill/billing/catalog/caching/TestEhCacheCatalogCache.java
+++ b/catalog/src/test/java/org/killbill/billing/catalog/caching/TestEhCacheCatalogCache.java
@@ -28,7 +28,7 @@ import org.killbill.billing.catalog.CatalogTestSuiteNoDB;
 import org.killbill.billing.catalog.DefaultProduct;
 import org.killbill.billing.catalog.VersionedCatalog;
 import org.killbill.billing.catalog.api.CatalogApiException;
-import org.killbill.billing.util.cache.Cachable.CacheType;
+import org.killbill.billing.tenant.api.TenantInternalApi.CacheInvalidationCallback;
 import org.killbill.xmlloader.UriAccessor;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -41,13 +41,11 @@ import com.google.common.io.Resources;
 
 public class TestEhCacheCatalogCache extends CatalogTestSuiteNoDB {
 
-    private CatalogCache catalogCache;
+    final CacheInvalidationCallback cacheInvalidationCallback = Mockito.mock(CacheInvalidationCallback.class);
 
     @BeforeMethod(groups = "fast")
     protected void beforeMethod() throws Exception {
-        this.catalogCache = new EhCacheCatalogCache(cacheControllerDispatcher.getCacheController(CacheType.TENANT_CATALOG), loader);
         cacheControllerDispatcher.clearAll();
-
     }
 
     //
@@ -55,7 +53,11 @@ public class TestEhCacheCatalogCache extends CatalogTestSuiteNoDB {
     //
     @Test(groups = "fast", expectedExceptions = CatalogApiException.class)
     public void testMissingDefaultCatalog() throws CatalogApiException {
-        Mockito.when(tenantInternalApi.getTenantCatalogs((Mockito.<InternalTenantContext>any()))).thenReturn(ImmutableList.<String>of());
+
+        final InternalTenantContext tenantContext = Mockito.mock(InternalTenantContext.class);
+        Mockito.when(tenantContext.getTenantRecordId()).thenReturn(0L);
+        catalogCache.loadDefaultCatalog(null);
+        Mockito.when(tenantInternalApi.getTenantCatalogs(tenantContext, cacheInvalidationCallback)).thenReturn(ImmutableList.<String>of());
         catalogCache.getCatalog(internalCallContext);
     }
 
@@ -64,8 +66,12 @@ public class TestEhCacheCatalogCache extends CatalogTestSuiteNoDB {
     //
     @Test(groups = "fast")
     public void testDefaultCatalog() throws CatalogApiException {
+
+        final InternalTenantContext tenantContext = Mockito.mock(InternalTenantContext.class);
+        Mockito.when(tenantContext.getTenantRecordId()).thenReturn(0L);
+
         catalogCache.loadDefaultCatalog(Resources.getResource("SpyCarBasic.xml").toExternalForm());
-        Mockito.when(tenantInternalApi.getTenantCatalogs((Mockito.<InternalTenantContext>any()))).thenReturn(ImmutableList.<String>of());
+        Mockito.when(tenantInternalApi.getTenantCatalogs(tenantContext, cacheInvalidationCallback)).thenReturn(ImmutableList.<String>of());
         VersionedCatalog result = catalogCache.getCatalog(internalCallContext);
         Assert.assertNotNull(result);
         final DefaultProduct[] products = result.getProducts(clock.getUTCNow());
@@ -83,7 +89,7 @@ public class TestEhCacheCatalogCache extends CatalogTestSuiteNoDB {
         final InternalTenantContext tenantContext = Mockito.mock(InternalTenantContext.class);
         Mockito.when(tenantContext.getTenantRecordId()).thenReturn(99L);
 
-        Mockito.when(tenantInternalApi.getTenantCatalogs((Mockito.<InternalTenantContext>any()))).thenReturn(ImmutableList.<String>of());
+        Mockito.when(tenantInternalApi.getTenantCatalogs(Mockito.any(InternalTenantContext.class), Mockito.any(CacheInvalidationCallback.class))).thenReturn(ImmutableList.<String>of());
         VersionedCatalog result = catalogCache.getCatalog(tenantContext);
         Assert.assertNotNull(result);
         final DefaultProduct[] products = result.getProducts(clock.getUTCNow());
@@ -107,13 +113,13 @@ public class TestEhCacheCatalogCache extends CatalogTestSuiteNoDB {
         final InternalTenantContext tenantContext = Mockito.mock(InternalTenantContext.class);
         Mockito.when(tenantContext.getTenantRecordId()).thenReturn(156L);
 
-        Mockito.when(tenantInternalApi.getTenantCatalogs((Mockito.<InternalTenantContext>any()))).thenReturn(ImmutableList.<String>of(catalogXML));
+        Mockito.when(tenantInternalApi.getTenantCatalogs(Mockito.any(InternalTenantContext.class), Mockito.any(CacheInvalidationCallback.class))).thenReturn(ImmutableList.<String>of(catalogXML));
         VersionedCatalog result = catalogCache.getCatalog(tenantContext);
         Assert.assertNotNull(result);
         final DefaultProduct[] products = result.getProducts(clock.getUTCNow());
         Assert.assertEquals(products.length, 6);
 
-        Mockito.when(tenantInternalApi.getTenantCatalogs(tenantContext)).thenThrow(RuntimeException.class);
+        Mockito.when(tenantInternalApi.getTenantCatalogs(tenantContext, cacheInvalidationCallback)).thenThrow(RuntimeException.class);
 
         VersionedCatalog result2 = catalogCache.getCatalog(tenantContext);
         Assert.assertNotNull(result2);
diff --git a/catalog/src/test/java/org/killbill/billing/catalog/CatalogTestSuiteNoDB.java b/catalog/src/test/java/org/killbill/billing/catalog/CatalogTestSuiteNoDB.java
index d121d91..00d82b5 100644
--- a/catalog/src/test/java/org/killbill/billing/catalog/CatalogTestSuiteNoDB.java
+++ b/catalog/src/test/java/org/killbill/billing/catalog/CatalogTestSuiteNoDB.java
@@ -39,6 +39,9 @@ public abstract class CatalogTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
     @Inject
     protected CacheControllerDispatcher cacheControllerDispatcher;
 
+    @Inject
+    protected CatalogCache catalogCache;
+
     @BeforeClass(groups = "fast")
     protected void beforeClass() throws Exception {
         final Injector injector = Guice.createInjector(new TestCatalogModuleNoDB(configSource));
diff --git a/catalog/src/test/java/org/killbill/billing/catalog/MockCatalogService.java b/catalog/src/test/java/org/killbill/billing/catalog/MockCatalogService.java
index 17f65f1..59a632c 100644
--- a/catalog/src/test/java/org/killbill/billing/catalog/MockCatalogService.java
+++ b/catalog/src/test/java/org/killbill/billing/catalog/MockCatalogService.java
@@ -26,7 +26,7 @@ public class MockCatalogService extends DefaultCatalogService {
     private final MockCatalog catalog;
 
     public MockCatalogService(final MockCatalog catalog, final CacheControllerDispatcher cacheControllerDispatcher) {
-        super(null, null, cacheControllerDispatcher);
+        super(null, null);
         this.catalog = catalog;
     }
 
diff --git a/catalog/src/test/java/org/killbill/billing/catalog/TestCatalogService.java b/catalog/src/test/java/org/killbill/billing/catalog/TestCatalogService.java
index f84c056..47e7b7d 100644
--- a/catalog/src/test/java/org/killbill/billing/catalog/TestCatalogService.java
+++ b/catalog/src/test/java/org/killbill/billing/catalog/TestCatalogService.java
@@ -21,7 +21,6 @@ package org.killbill.billing.catalog;
 import org.killbill.billing.catalog.api.CatalogApiException;
 import org.killbill.billing.catalog.io.VersionedCatalogLoader;
 import org.killbill.billing.platform.api.KillbillService.ServiceException;
-import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.config.CatalogConfig;
 import org.killbill.clock.DefaultClock;
 import org.testng.Assert;
@@ -37,7 +36,7 @@ public class TestCatalogService extends CatalogTestSuiteNoDB {
                 return "file:src/test/resources/versionedCatalog";
             }
 
-        }, new VersionedCatalogLoader(new DefaultClock()), cacheControllerDispatcher);
+        }, catalogCache);
         service.loadCatalog();
         Assert.assertNotNull(service.getFullCatalog(internalCallContext));
         Assert.assertEquals(service.getFullCatalog(internalCallContext).getCatalogName(), "WeaponsHireSmall");
@@ -51,7 +50,7 @@ public class TestCatalogService extends CatalogTestSuiteNoDB {
                 return "file:src/test/resources/WeaponsHire.xml";
             }
 
-        },  new VersionedCatalogLoader(new DefaultClock()), cacheControllerDispatcher);
+        },  catalogCache);
         service.loadCatalog();
         Assert.assertNotNull(service.getFullCatalog(internalCallContext));
         Assert.assertEquals(service.getFullCatalog(internalCallContext).getCatalogName(), "Firearms");
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/CatalogResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/CatalogResource.java
index a079b44..1d16343 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/CatalogResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/CatalogResource.java
@@ -74,7 +74,6 @@ import static javax.ws.rs.core.MediaType.APPLICATION_XML;
 public class CatalogResource extends JaxRsResourceBase {
 
     private final CatalogUserApi catalogUserApi;
-    private final TenantUserApi tenantApi;
 
     // Catalog API don't quite support multiple catalogs per tenant
     private static final String catalogName = "unused";
@@ -86,13 +85,11 @@ public class CatalogResource extends JaxRsResourceBase {
                            final AuditUserApi auditUserApi,
                            final AccountUserApi accountUserApi,
                            final PaymentApi paymentApi,
-                           final TenantUserApi tenantApi,
                            final CatalogUserApi catalogUserApi,
                            final Clock clock,
                            final Context context) {
         super(uriBuilder, tagUserApi, customFieldUserApi, auditUserApi, accountUserApi, paymentApi, clock, context);
         this.catalogUserApi = catalogUserApi;
-        this.tenantApi = tenantApi;
     }
 
     @Timed
@@ -122,7 +119,7 @@ public class CatalogResource extends JaxRsResourceBase {
         XMLLoader.getObjectFromStream(new URI(JaxrsResource.CATALOG_PATH), stream, StandaloneCatalog.class);
 
         final CallContext callContext = context.createContext(createdBy, reason, comment, request);
-        tenantApi.addTenantKeyValue(TenantKey.CATALOG.toString(), catalogXML, callContext);
+        catalogUserApi.uploadCatalog(catalogXML, callContext);
         return uriBuilder.buildResponse(uriInfo, CatalogResource.class, null, null);
     }
 
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/OverdueResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/OverdueResource.java
index 18df218..03d248e 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/OverdueResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/OverdueResource.java
@@ -67,7 +67,6 @@ import static javax.ws.rs.core.MediaType.APPLICATION_XML;
 public class OverdueResource extends JaxRsResourceBase {
 
     private final OverdueApi overdueApi;
-    private final TenantUserApi tenantApi;
 
     @Inject
     public OverdueResource(final JaxrsUriBuilder uriBuilder,
@@ -77,12 +76,10 @@ public class OverdueResource extends JaxRsResourceBase {
                            final AccountUserApi accountUserApi,
                            final PaymentApi paymentApi,
                            final OverdueApi overdueApi,
-                           final TenantUserApi tenantUserApi,
                            final Clock clock,
                            final Context context) {
         super(uriBuilder, tagUserApi, customFieldUserApi, auditUserApi, accountUserApi, paymentApi, clock, context);
         this.overdueApi = overdueApi;
-        this.tenantApi = tenantUserApi;
     }
 
     @Timed
@@ -112,7 +109,7 @@ public class OverdueResource extends JaxRsResourceBase {
         XMLLoader.getObjectFromStream(new URI(JaxrsResource.OVERDUE_PATH), stream, DefaultOverdueConfig.class);
 
         final CallContext callContext = context.createContext(createdBy, reason, comment, request);
-        tenantApi.addTenantKeyValue(TenantKey.OVERDUE_CONFIG.toString(), overdueXML, callContext);
+        overdueApi.uploadOverdueConfig(overdueXML, callContext);
         return uriBuilder.buildResponse(uriInfo, OverdueResource.class, null, null);
     }
 }
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/api/DefaultOverdueApi.java b/overdue/src/main/java/org/killbill/billing/overdue/api/DefaultOverdueApi.java
index 59d5c2b..3fd75a9 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/api/DefaultOverdueApi.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/api/DefaultOverdueApi.java
@@ -22,6 +22,10 @@ import javax.inject.Inject;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.CatalogApiException;
 import org.killbill.billing.overdue.caching.OverdueConfigCache;
+import org.killbill.billing.tenant.api.TenantApiException;
+import org.killbill.billing.tenant.api.TenantKV.TenantKey;
+import org.killbill.billing.tenant.api.TenantUserApi;
+import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 
@@ -29,17 +33,36 @@ public class DefaultOverdueApi implements OverdueApi {
 
     private final OverdueConfigCache overdueConfigCache;
     private final InternalCallContextFactory internalCallContextFactory;
+    private final TenantUserApi tenantApi;
 
     @Inject
     public DefaultOverdueApi(final OverdueConfigCache overdueConfigCache,
+                             final TenantUserApi tenantApi,
                              final InternalCallContextFactory internalCallContextFactory) {
         this.overdueConfigCache = overdueConfigCache;
+        this.tenantApi = tenantApi;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
     @Override
     public OverdueConfig getOverdueConfig(final TenantContext tenantContext) throws OverdueApiException {
-        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantContext);
+        final InternalTenantContext internalTenantContext = createInternalTenantContext(tenantContext);
         return overdueConfigCache.getOverdueConfig(internalTenantContext);
     }
+
+    @Override
+    public void uploadOverdueConfig(final String overdueXML, final CallContext callContext) throws OverdueApiException {
+        try {
+            final InternalTenantContext internalTenantContext = createInternalTenantContext(callContext);
+            overdueConfigCache.clearOverdueConfig(internalTenantContext);
+            tenantApi.addTenantKeyValue(TenantKey.OVERDUE_CONFIG.toString(), overdueXML, callContext);
+        } catch (TenantApiException e) {
+            throw new OverdueApiException(e);
+        }
+    }
+
+    private InternalTenantContext createInternalTenantContext(final TenantContext tenantContext) {
+        // Only tenantRecordId will be populated-- this is important to always create the (ehcache) key the same way
+        return internalCallContextFactory.createInternalTenantContext(tenantContext);
+    }
 }
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/caching/EhCacheOverdueConfigCache.java b/overdue/src/main/java/org/killbill/billing/overdue/caching/EhCacheOverdueConfigCache.java
index 8a15187..5c7542d 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/caching/EhCacheOverdueConfigCache.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/caching/EhCacheOverdueConfigCache.java
@@ -45,15 +45,15 @@ public class EhCacheOverdueConfigCache implements OverdueConfigCache {
 
     private static final Logger log = LoggerFactory.getLogger(EhCacheOverdueConfigCache.class);
 
-    private OverdueConfig defaultOverdueConfig;
-
     private final CacheController cacheController;
     private final CacheLoaderArgument cacheLoaderArgument;
 
+    private OverdueConfig defaultOverdueConfig;
+
     @Inject
     public EhCacheOverdueConfigCache(final CacheControllerDispatcher cacheControllerDispatcher) {
         this.cacheController = cacheControllerDispatcher.getCacheController(CacheType.TENANT_OVERDUE_CONFIG);
-        this.cacheLoaderArgument = initializeCacheLoaderArgument();
+        this.cacheLoaderArgument = initializeCacheLoaderArgument(this);
     }
 
     @Override
@@ -98,7 +98,16 @@ public class EhCacheOverdueConfigCache implements OverdueConfigCache {
         }
     }
 
-    private CacheLoaderArgument initializeCacheLoaderArgument() {
+    @Override
+    public void clearOverdueConfig(final InternalTenantContext tenantContext) {
+
+        if (tenantContext.getTenantRecordId() == InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID) {
+            return;
+        }
+        cacheController.remove(tenantContext);
+    }
+
+    private CacheLoaderArgument initializeCacheLoaderArgument(final EhCacheOverdueConfigCache parentCache) {
         final LoaderCallback loaderCallback = new LoaderCallback() {
             @Override
             public Object loadCatalog(final String catalogXMLs) throws OverdueApiException {
@@ -112,6 +121,11 @@ public class EhCacheOverdueConfigCache implements OverdueConfigCache {
                     throw new OverdueApiException(ErrorCode.OVERDUE_INVALID_FOR_TENANT, "Problem encountered loading overdue config ", e);
                 }
             }
+
+            @Override
+            public void invalidateCache(final InternalTenantContext tenantContext) {
+                parentCache.clearOverdueConfig(tenantContext);
+            }
         };
         final Object[] args = new Object[1];
         args[0] = loaderCallback;
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/caching/OverdueConfigCache.java b/overdue/src/main/java/org/killbill/billing/overdue/caching/OverdueConfigCache.java
index 0e1a65e..1c601c9 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/caching/OverdueConfigCache.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/caching/OverdueConfigCache.java
@@ -28,4 +28,6 @@ public interface OverdueConfigCache {
     public void loadDefaultOverdueConfig(OverdueConfig config) throws OverdueApiException;
 
     public OverdueConfig getOverdueConfig(InternalTenantContext tenantContext) throws OverdueApiException;
+
+    public void clearOverdueConfig(InternalTenantContext tenantContext);
 }
diff --git a/profiles/killbill/src/main/resources/killbill-server.properties b/profiles/killbill/src/main/resources/killbill-server.properties
index c9fd394..bbcbcc7 100644
--- a/profiles/killbill/src/main/resources/killbill-server.properties
+++ b/profiles/killbill/src/main/resources/killbill-server.properties
@@ -70,3 +70,8 @@ org.killbill.server.test.mode=true
 org.killbill.payment.plugin.timeout=5s
 
 org.killbill.payment.retry.days=
+
+org.killbill.catalog.bundlePath=CatalogTranslation
+org.killbill.template.bundlePath=InvoiceTranslation
+org.killbill.template.name=HtmlInvoiceTemplate.mustache
+
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/api/DefaultTenantInternalApi.java b/tenant/src/main/java/org/killbill/billing/tenant/api/DefaultTenantInternalApi.java
index 6fed6c5..579b153 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/api/DefaultTenantInternalApi.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/api/DefaultTenantInternalApi.java
@@ -24,27 +24,46 @@ import javax.inject.Inject;
 import javax.inject.Named;
 
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.tenant.api.TenantCacheInvalidation.CacheInvalidationKey;
 import org.killbill.billing.tenant.api.TenantKV.TenantKey;
 import org.killbill.billing.tenant.dao.TenantDao;
 import org.killbill.billing.tenant.glue.DefaultTenantModule;
 import org.killbill.billing.util.LocaleUtils;
 
+/**
+ * This is the private API which is used to extract per tenant objects (catalog, overdue, invoice templates, ..)
+ * <p>
+ * Some of these per tenant objects are cached at a higher level in their respective modules (catalog, overdue) to
+ * avoid reconstructing the object state from the xml definition each time. As a result, the module also registers
+ * a callback which is used for the cache invalidation when the state changes and the operation occurred on a remote node.
+ * For those objects, the private api is called from the module.
+ * <p>
+ * Some others (invoice templates,...) are not cached (yet) and so the logic is simpler.
+ * <p>
+ * The api can only be used to retrieve objects where no caching is required.
+ */
 public class DefaultTenantInternalApi implements TenantInternalApi {
 
     private final TenantDao tenantDao;
+    private final TenantCacheInvalidation tenantCacheInvalidation;
+
 
     @Inject
-    public DefaultTenantInternalApi(@Named(DefaultTenantModule.NO_CACHING_TENANT) final TenantDao tenantDao) {
+    public DefaultTenantInternalApi(@Named(DefaultTenantModule.NO_CACHING_TENANT) final TenantDao tenantDao,
+                                    final TenantCacheInvalidation tenantCacheInvalidation) {
         this.tenantDao = tenantDao;
+        this.tenantCacheInvalidation = tenantCacheInvalidation;
     }
 
     @Override
-    public List<String> getTenantCatalogs(final InternalTenantContext tenantContext) {
+    public List<String> getTenantCatalogs(final InternalTenantContext tenantContext, final CacheInvalidationCallback cacheInvalidationCallback) {
+        tenantCacheInvalidation.registerCallback(new CacheInvalidationKey(tenantContext.getTenantRecordId(), TenantKey.CATALOG), cacheInvalidationCallback);
         return tenantDao.getTenantValueForKey(TenantKey.CATALOG.toString(), tenantContext);
     }
 
     @Override
-    public String getTenantOverdueConfig(final InternalTenantContext tenantContext) {
+    public String getTenantOverdueConfig(final InternalTenantContext tenantContext, final CacheInvalidationCallback cacheInvalidationCallback) {
+        tenantCacheInvalidation.registerCallback(new CacheInvalidationKey(tenantContext.getTenantRecordId(), TenantKey.OVERDUE_CONFIG), cacheInvalidationCallback);
         final List<String> values = tenantDao.getTenantValueForKey(TenantKey.OVERDUE_CONFIG.toString(), tenantContext);
         return getUniqueValue(values, "overdue config", tenantContext);
     }
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/api/DefaultTenantService.java b/tenant/src/main/java/org/killbill/billing/tenant/api/DefaultTenantService.java
index 88578f8..0077fb5 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/api/DefaultTenantService.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/api/DefaultTenantService.java
@@ -16,12 +16,39 @@
 
 package org.killbill.billing.tenant.api;
 
+import javax.inject.Inject;
+
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
+
 public class DefaultTenantService implements TenantService {
 
     private static final String TENANT_SERVICE_NAME = "tenant-service";
 
+    private final TenantCacheInvalidation tenantCacheInvalidation;
+
+    @Inject
+    public DefaultTenantService(final TenantCacheInvalidation tenantCacheInvalidation) {
+        this.tenantCacheInvalidation = tenantCacheInvalidation;
+    }
+
     @Override
     public String getName() {
         return TENANT_SERVICE_NAME;
     }
+
+    @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() {
+        tenantCacheInvalidation.initialize();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+        tenantCacheInvalidation.start();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+    public void stop()  {
+        tenantCacheInvalidation.stop();
+    }
 }
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/api/TenantCacheInvalidation.java b/tenant/src/main/java/org/killbill/billing/tenant/api/TenantCacheInvalidation.java
new file mode 100644
index 0000000..ec4ef07
--- /dev/null
+++ b/tenant/src/main/java/org/killbill/billing/tenant/api/TenantCacheInvalidation.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.api;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.tenant.api.TenantInternalApi.CacheInvalidationCallback;
+import org.killbill.billing.tenant.api.TenantKV.TenantKey;
+import org.killbill.billing.tenant.dao.TenantBroadcastDao;
+import org.killbill.billing.tenant.dao.TenantBroadcastModelDao;
+import org.killbill.billing.tenant.glue.DefaultTenantModule;
+import org.killbill.billing.util.config.TenantConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the callbacks that have been registered when per tenant objects have been inserted into the
+ * tenant_kvs store; the flow is the following (for e.g catalog):
+ * 1. CatalogUserApi is invoked to retrieve per tenant catalog
+ * 2. If cache is empty, TenantCacheLoader is invoked and uses TenantInternalApi is load the data; at that time, the invalidation callback
+ * is registered
+ * <p/>
+ * When this class initializes, it reads the current entry in the tenant_broadcasts table and from then on, keeps polling for new entries; when new
+ * entries are found, it invokes the callback to invalidate the current caching and force the TenantCacheLoader to be invoked again.
+ */
+public class TenantCacheInvalidation {
+
+    private final static int TERMINATION_TIMEOUT_SEC = 5;
+    private static final Logger logger = LoggerFactory.getLogger(TenantCacheInvalidation.class);
+
+    private final Map<CacheInvalidationKey, CacheInvalidationCallback> cache;
+    private final TenantBroadcastDao broadcastDao;
+    private final ScheduledExecutorService tenantExecutor;
+    private final TenantConfig tenantConfig;
+    private AtomicLong latestRecordIdProcessed;
+    private volatile boolean isStopped;
+
+    @Inject
+    public TenantCacheInvalidation(@Named(DefaultTenantModule.NO_CACHING_TENANT) final TenantBroadcastDao broadcastDao,
+                                   @Named(DefaultTenantModule.TENANT_EXECUTOR_NAMED) final ScheduledExecutorService tenantExecutor,
+                                   final TenantConfig tenantConfig) {
+        this.cache = new HashMap<CacheInvalidationKey, CacheInvalidationCallback>();
+        this.broadcastDao = broadcastDao;
+        this.tenantExecutor = tenantExecutor;
+        this.tenantConfig = tenantConfig;
+        this.isStopped = false;
+    }
+
+    public void initialize() {
+        final TenantBroadcastModelDao entry = broadcastDao.getLatestEntry();
+        this.latestRecordIdProcessed = entry != null ? new AtomicLong(entry.getRecordId()) : new AtomicLong(0L);
+
+    }
+
+    public void start() {
+        if (isStopped) {
+            logger.warn("TenantExecutor is in a stopped state, abort start sequence");
+            return;
+        }
+        final TimeUnit pendingRateUnit = tenantConfig.getTenantBroadcastServiceRunningRate().getUnit();
+        final long pendingPeriod = tenantConfig.getTenantBroadcastServiceRunningRate().getPeriod();
+        tenantExecutor.scheduleAtFixedRate(new TenantCacheInvalidationRunnable(this, broadcastDao), pendingPeriod, pendingPeriod, pendingRateUnit);
+
+    }
+
+    public void stop() {
+        if (isStopped) {
+            logger.warn("TenantExecutor is already in a stopped state");
+            return;
+        }
+        try {
+            tenantExecutor.shutdown();
+            boolean success = tenantExecutor.awaitTermination(TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS);
+            if (!success) {
+                logger.warn("TenantExecutor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("TenantExecutor stop sequence got interrupted");
+        } finally {
+            isStopped = true;
+        }
+    }
+
+    public void registerCallback(final CacheInvalidationKey key, final CacheInvalidationCallback value) {
+        if (!cache.containsKey(key)) {
+            cache.put(key, value);
+        }
+    }
+
+    public CacheInvalidationCallback getCacheInvalidation(final CacheInvalidationKey key) {
+        return cache.get(key);
+    }
+
+    public AtomicLong getLatestRecordIdProcessed() {
+        return latestRecordIdProcessed;
+    }
+
+    public boolean isStopped() {
+        return isStopped;
+    }
+
+    public void setLatestRecordIdProcessed(final Long newProcessedRecordId) {
+        this.latestRecordIdProcessed.set(newProcessedRecordId);
+    }
+
+    public static class TenantCacheInvalidationRunnable implements Runnable {
+
+        private final TenantCacheInvalidation parent;
+        private final TenantBroadcastDao broadcastDao;
+
+        public TenantCacheInvalidationRunnable(final TenantCacheInvalidation parent,
+                                               final TenantBroadcastDao broadcastDao) {
+            this.parent = parent;
+            this.broadcastDao = broadcastDao;
+        }
+
+        @Override
+        public void run() {
+            if (parent.isStopped) {
+                return;
+            }
+            final List<TenantBroadcastModelDao> entries = broadcastDao.getLatestEntriesFrom(parent.getLatestRecordIdProcessed().get());
+            for (TenantBroadcastModelDao cur : entries) {
+                if (parent.isStopped()) {
+                    return;
+                }
+
+                final CacheInvalidationKey key = new CacheInvalidationKey(cur.getTenantRecordId(), TenantKey.valueOf(cur.getType()));
+                final CacheInvalidationCallback callback = parent.getCacheInvalidation(key);
+                if (callback != null) {
+                    final InternalTenantContext tenantContext = new InternalTenantContext(cur.getTenantRecordId(), null);
+                    callback.invalidateCache(tenantContext);
+                }
+                parent.setLatestRecordIdProcessed(cur.getRecordId());
+            }
+        }
+    }
+
+    public static final class CacheInvalidationKey {
+
+        private final Long tenantRecordId;
+        private final TenantKey type;
+
+        public CacheInvalidationKey(final Long tenantRecordId, final TenantKey type) {
+            this.tenantRecordId = tenantRecordId;
+            this.type = type;
+        }
+
+        public Long getTenantRecordId() {
+            return tenantRecordId;
+        }
+
+        public TenantKey getType() {
+            return type;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof CacheInvalidationKey)) {
+                return false;
+            }
+
+            final CacheInvalidationKey that = (CacheInvalidationKey) o;
+            if (tenantRecordId != null ? !tenantRecordId.equals(that.tenantRecordId) : that.tenantRecordId != null) {
+                return false;
+            }
+            if (type != that.type) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = tenantRecordId != null ? tenantRecordId.hashCode() : 0;
+            result = 31 * result + (type != null ? type.hashCode() : 0);
+            return result;
+        }
+    }
+}
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantBroadcastDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantBroadcastDao.java
new file mode 100644
index 0000000..0ac2964
--- /dev/null
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantBroadcastDao.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.dao;
+
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.tenant.api.TenantApiException;
+import org.killbill.billing.util.cache.CacheControllerDispatcher;
+import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.billing.util.entity.Entity;
+import org.killbill.billing.util.entity.dao.EntityDaoBase;
+import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
+import org.killbill.clock.Clock;
+import org.skife.jdbi.v2.IDBI;
+
+
+public class DefaultTenantBroadcastDao extends EntityDaoBase<TenantBroadcastModelDao, Entity, TenantApiException> implements TenantBroadcastDao {
+
+    @Inject
+    public DefaultTenantBroadcastDao(final IDBI dbi, final Clock clock, final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
+        super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao), TenantBroadcastSqlDao.class);
+    }
+
+    @Override
+    protected TenantApiException generateAlreadyExistsException(final TenantBroadcastModelDao entity, final InternalCallContext context) {
+        // STEPH generateAlreadyExistsException
+        return null;
+    }
+
+    @Override
+    public List<TenantBroadcastModelDao> getLatestEntriesFrom(final Long recordId) {
+        throw new IllegalStateException("Not implemented by DefaultTenantBroadcastDao");
+    }
+
+    @Override
+    public TenantBroadcastModelDao getLatestEntry() {
+        throw new IllegalStateException("Not implemented by DefaultTenantBroadcastDao");
+    }
+}
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantDao.java
index 85350ef..8ad4dfe 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantDao.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/DefaultTenantDao.java
@@ -26,6 +26,8 @@ import org.apache.shiro.crypto.RandomNumberGenerator;
 import org.apache.shiro.crypto.SecureRandomNumberGenerator;
 import org.apache.shiro.crypto.hash.SimpleHash;
 import org.apache.shiro.util.ByteSource;
+import org.killbill.billing.entity.EntityPersistenceException;
+import org.killbill.billing.tenant.api.TenantKV.TenantKey;
 import org.skife.jdbi.v2.IDBI;
 
 import org.killbill.billing.ErrorCode;
@@ -131,11 +133,13 @@ public class DefaultTenantDao extends EntityDaoBase<TenantModelDao, Tenant, Tena
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 final TenantKVModelDao tenantKVModelDao = new TenantKVModelDao(UUID.randomUUID(), context.getCreatedDate(), context.getUpdatedDate(), key, value);
                 entitySqlDaoWrapperFactory.become(TenantKVSqlDao.class).create(tenantKVModelDao, context);
+                broadcastConfigurationChangeFromTransaction(entitySqlDaoWrapperFactory, key, context);
                 return null;
             }
         });
     }
 
+
     @Override
     public void deleteTenantKey(final String key, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -151,4 +155,13 @@ public class DefaultTenantDao extends EntityDaoBase<TenantModelDao, Tenant, Tena
             }
         });
     }
+
+    private void broadcastConfigurationChangeFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory,
+                                                             final String key, final InternalCallContext context) throws EntityPersistenceException {
+        if (key.equals(TenantKey.CATALOG.toString()) ||
+            key.equals(TenantKey.OVERDUE_CONFIG.toString())) {
+            final TenantBroadcastModelDao broadcast = new TenantBroadcastModelDao(key);
+            entitySqlDaoWrapperFactory.become(TenantBroadcastSqlDao.class).create(broadcast, context);
+        }
+    }
 }
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/NoCachingTenantBroadcastDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/NoCachingTenantBroadcastDao.java
new file mode 100644
index 0000000..ed1c6dc
--- /dev/null
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/NoCachingTenantBroadcastDao.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.dao;
+
+import java.util.List;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.tenant.api.TenantApiException;
+import org.killbill.billing.util.entity.Entity;
+import org.killbill.billing.util.entity.Pagination;
+import org.killbill.billing.util.entity.dao.EntityDaoBase;
+import org.killbill.billing.util.entity.dao.EntitySqlDao;
+import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
+import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
+import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import org.killbill.clock.Clock;
+import org.skife.jdbi.v2.IDBI;
+
+public class NoCachingTenantBroadcastDao extends EntityDaoBase<TenantBroadcastModelDao, Entity, TenantApiException> implements TenantBroadcastDao {
+
+    @Inject
+    public NoCachingTenantBroadcastDao(final IDBI dbi, final Clock clock) {
+        super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, null, null), TenantBroadcastSqlDao.class);
+    }
+
+    @Override
+    public void create(final TenantBroadcastModelDao entity, final InternalCallContext context) throws TenantApiException {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    protected TenantApiException generateAlreadyExistsException(final TenantBroadcastModelDao entity, final InternalCallContext context) {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    public Long getRecordId(final UUID id, final InternalTenantContext context) {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    public TenantBroadcastModelDao getByRecordId(final Long recordId, final InternalTenantContext context) {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    public TenantBroadcastModelDao getById(final UUID id, final InternalTenantContext context) {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    public Pagination<TenantBroadcastModelDao> getAll(final InternalTenantContext context) {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    public Pagination<TenantBroadcastModelDao> get(final Long offset, final Long limit, final InternalTenantContext context) {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    public Long getCount(final InternalTenantContext context) {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    public void test(final InternalTenantContext context) {
+        throw new IllegalStateException("Not implemented by NoCachingTenantBroadcastDao");
+    }
+
+    @Override
+    public List<TenantBroadcastModelDao> getLatestEntriesFrom(final Long recordId) {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<TenantBroadcastModelDao>>() {
+            @Override
+            public List<TenantBroadcastModelDao> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                return entitySqlDaoWrapperFactory.become(TenantBroadcastSqlDao.class).getLatestEntries(recordId);
+            }
+        });
+    }
+
+    @Override
+    public TenantBroadcastModelDao getLatestEntry() {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<TenantBroadcastModelDao>() {
+            @Override
+            public TenantBroadcastModelDao inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                return entitySqlDaoWrapperFactory.become(TenantBroadcastSqlDao.class).getLatestEntry();
+            }
+        });
+    }
+}
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastDao.java
new file mode 100644
index 0000000..7ce67ad
--- /dev/null
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastDao.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.dao;
+
+import java.util.List;
+
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.tenant.api.TenantApiException;
+import org.killbill.billing.util.entity.Entity;
+import org.killbill.billing.util.entity.dao.EntityDao;
+
+public interface TenantBroadcastDao extends EntityDao<TenantBroadcastModelDao, Entity, TenantApiException> {
+
+    public List<TenantBroadcastModelDao> getLatestEntriesFrom(final Long recordId);
+
+    public TenantBroadcastModelDao getLatestEntry();
+}
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastModelDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastModelDao.java
new file mode 100644
index 0000000..55ad9f9
--- /dev/null
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastModelDao.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.dao;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.util.dao.TableName;
+import org.killbill.billing.util.entity.Entity;
+import org.killbill.billing.util.entity.dao.EntityModelDao;
+import org.killbill.billing.util.entity.dao.EntityModelDaoBase;
+
+public class TenantBroadcastModelDao extends EntityModelDaoBase implements EntityModelDao<Entity> {
+
+    private String type;
+
+    public TenantBroadcastModelDao() { /* For the DAO mapper */ }
+
+    public TenantBroadcastModelDao(final String type) {
+        this(UUID.randomUUID(), null, null, type);
+    }
+
+    public TenantBroadcastModelDao(final UUID id, final DateTime createdDate, final DateTime updatedDate,
+                                   final String type) {
+        super(id, createdDate, updatedDate);
+        this.type = type;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(final String type) {
+        this.type = type;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof TenantBroadcastModelDao)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        final TenantBroadcastModelDao that = (TenantBroadcastModelDao) o;
+
+        if (type != null ? !type.equals(that.type) : that.type != null) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (type != null ? type.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public TableName getTableName() {
+        return TableName.TENANT_BROADCASTS;
+    }
+
+    @Override
+    public TableName getHistoryTableName() {
+        return null;
+    }
+}
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.java b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.java
new file mode 100644
index 0000000..1925151
--- /dev/null
+++ b/tenant/src/main/java/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.dao;
+
+import java.util.List;
+
+import org.killbill.billing.util.entity.Entity;
+import org.killbill.billing.util.entity.dao.EntitySqlDao;
+import org.killbill.billing.util.entity.dao.EntitySqlDaoStringTemplate;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+
+@EntitySqlDaoStringTemplate
+public interface TenantBroadcastSqlDao extends EntitySqlDao<TenantBroadcastModelDao, Entity> {
+
+    @SqlQuery
+    public List<TenantBroadcastModelDao> getLatestEntries(@Bind("recordId") final Long recordId);
+
+    @SqlQuery
+    public TenantBroadcastModelDao getLatestEntry();
+
+}
diff --git a/tenant/src/main/java/org/killbill/billing/tenant/glue/DefaultTenantModule.java b/tenant/src/main/java/org/killbill/billing/tenant/glue/DefaultTenantModule.java
index 4df178b..479cf18 100644
--- a/tenant/src/main/java/org/killbill/billing/tenant/glue/DefaultTenantModule.java
+++ b/tenant/src/main/java/org/killbill/billing/tenant/glue/DefaultTenantModule.java
@@ -18,35 +18,50 @@
 
 package org.killbill.billing.tenant.glue;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.killbill.billing.glue.TenantModule;
 import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.killbill.billing.tenant.api.DefaultTenantInternalApi;
 import org.killbill.billing.tenant.api.DefaultTenantService;
+import org.killbill.billing.tenant.api.TenantCacheInvalidation;
 import org.killbill.billing.tenant.api.TenantInternalApi;
 import org.killbill.billing.tenant.api.TenantService;
 import org.killbill.billing.tenant.api.TenantUserApi;
 import org.killbill.billing.tenant.api.user.DefaultTenantUserApi;
+import org.killbill.billing.tenant.dao.DefaultTenantBroadcastDao;
 import org.killbill.billing.tenant.dao.DefaultTenantDao;
+import org.killbill.billing.tenant.dao.NoCachingTenantBroadcastDao;
 import org.killbill.billing.tenant.dao.NoCachingTenantDao;
+import org.killbill.billing.tenant.dao.TenantBroadcastDao;
 import org.killbill.billing.tenant.dao.TenantDao;
+import org.killbill.billing.util.config.TenantConfig;
 import org.killbill.billing.util.glue.KillBillModule;
+import org.skife.config.ConfigurationObjectFactory;
 
 import com.google.inject.name.Names;
 
-public class DefaultTenantModule  extends KillBillModule implements TenantModule {
+public class DefaultTenantModule extends KillBillModule implements TenantModule {
 
     public static final String NO_CACHING_TENANT = "NoCachingTenant";
 
+    public static final String TENANT_EXECUTOR_NAMED = "TenantExecutor";
+
     public DefaultTenantModule(final KillbillConfigSource configSource) {
         super(configSource);
     }
 
     private void installConfig() {
+        final ConfigurationObjectFactory factory = new ConfigurationObjectFactory(skifeConfigSource);
+        final TenantConfig tenantConfig = factory.build(TenantConfig.class);
+        bind(TenantConfig.class).toInstance(tenantConfig);
     }
 
     public void installTenantDao() {
         bind(TenantDao.class).to(DefaultTenantDao.class).asEagerSingleton();
         bind(TenantDao.class).annotatedWith(Names.named(NO_CACHING_TENANT)).to(NoCachingTenantDao.class).asEagerSingleton();
+        bind(TenantBroadcastDao.class).to(DefaultTenantBroadcastDao.class).asEagerSingleton();
+        bind(TenantBroadcastDao.class).annotatedWith(Names.named(NO_CACHING_TENANT)).to(NoCachingTenantBroadcastDao.class).asEagerSingleton();
     }
 
     public void installTenantUserApi() {
@@ -58,11 +73,22 @@ public class DefaultTenantModule  extends KillBillModule implements TenantModule
         bind(TenantService.class).to(DefaultTenantService.class).asEagerSingleton();
     }
 
+    public void installTenantCacheInvalidation() {
+        bind(TenantCacheInvalidation.class).asEagerSingleton();
+    }
+
+    protected void installExecutor() {
+        final ScheduledExecutorService tenantExecutor = org.killbill.commons.concurrent.Executors.newSingleThreadScheduledExecutor("TenantExecutor");
+        bind(ScheduledExecutorService.class).annotatedWith(Names.named(TENANT_EXECUTOR_NAMED)).toInstance(tenantExecutor);
+    }
+
     @Override
     protected void configure() {
         installConfig();
         installTenantDao();
         installTenantService();
         installTenantUserApi();
+        installTenantCacheInvalidation();
+        installExecutor();
     }
 }
diff --git a/tenant/src/main/resources/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.sql.stg b/tenant/src/main/resources/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.sql.stg
new file mode 100644
index 0000000..60def50
--- /dev/null
+++ b/tenant/src/main/resources/org/killbill/billing/tenant/dao/TenantBroadcastSqlDao.sql.stg
@@ -0,0 +1,42 @@
+group TenantBroadcastSqlDao: EntitySqlDao;
+
+tableName() ::= "tenant_broadcasts"
+
+tableFields(prefix) ::= <<
+  <prefix>type
+, <prefix>created_date
+, <prefix>created_by
+, <prefix>updated_date
+, <prefix>updated_by
+>>
+
+tableValues() ::= <<
+ :type
+, :createdDate
+, :createdBy
+, :updatedDate
+, :updatedBy
+>>
+
+/* No account_record_id field */
+accountRecordIdFieldWithComma(prefix) ::= ""
+accountRecordIdValueWithComma(prefix) ::= ""
+
+
+getLatestEntries() ::= <<
+select
+  <allTableFields("t.")>
+from <tableName()> t
+where record_id > :recordId
+<defaultOrderBy("t.")>
+;
+>>
+
+
+getLatestEntry() ::= <<
+select
+  <allTableFields("t.")>
+from <tableName()> t
+where record_id = (select max(record_id) from <tableName()>)
+;
+>>
diff --git a/tenant/src/main/resources/org/killbill/billing/tenant/ddl.sql b/tenant/src/main/resources/org/killbill/billing/tenant/ddl.sql
index a95ec39..6214c10 100644
--- a/tenant/src/main/resources/org/killbill/billing/tenant/ddl.sql
+++ b/tenant/src/main/resources/org/killbill/billing/tenant/ddl.sql
@@ -22,7 +22,7 @@ DROP TABLE IF EXISTS tenant_kvs;
 CREATE TABLE tenant_kvs (
    record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
    id char(36) NOT NULL,
-   tenant_record_id int(11) unsigned default null,
+   tenant_record_id int(11) unsigned NOT NULL,
    tenant_key varchar(256) NOT NULL,
    tenant_value mediumtext NOT NULL,
    is_active bool DEFAULT 1,
@@ -33,3 +33,18 @@ CREATE TABLE tenant_kvs (
    PRIMARY KEY(record_id)
 ) /*! CHARACTER SET utf8 COLLATE utf8_bin */;
 CREATE INDEX tenant_kvs_key ON tenant_kvs(tenant_key);
+
+
+DROP TABLE IF EXISTS tenant_broadcasts;
+CREATE TABLE tenant_broadcasts (
+   record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
+   id char(36) NOT NULL,
+   tenant_record_id int(11) unsigned NOT NULL,
+   type varchar(64) NOT NULL,
+   created_date datetime NOT NULL,
+   created_by varchar(50) NOT NULL,
+   updated_date datetime DEFAULT NULL,
+   updated_by varchar(50) DEFAULT NULL,
+   PRIMARY KEY(record_id)
+) /*! CHARACTER SET utf8 COLLATE utf8_bin */;
+CREATE INDEX tenant_broadcasts_key ON tenant_broadcasts(tenant_record_id);
\ No newline at end of file
diff --git a/tenant/src/test/java/org/killbill/billing/tenant/dao/TestNoCachingTenantBroadcastDao.java b/tenant/src/test/java/org/killbill/billing/tenant/dao/TestNoCachingTenantBroadcastDao.java
new file mode 100644
index 0000000..abab8da
--- /dev/null
+++ b/tenant/src/test/java/org/killbill/billing/tenant/dao/TestNoCachingTenantBroadcastDao.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.tenant.dao;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.tenant.TenantTestSuiteWithEmbeddedDb;
+import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.UserType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestNoCachingTenantBroadcastDao extends TenantTestSuiteWithEmbeddedDb {
+
+    @Test(groups = "slow")
+    public void testBasic() throws Exception {
+        final TenantBroadcastModelDao model = new TenantBroadcastModelDao("foo");
+
+        final InternalCallContext context79 = createContext(79L);
+        tenantBroadcastDao.create(model, context79);
+
+        final TenantBroadcastModelDao result1 = tenantBroadcastDao.getById(model.getId(), context79);
+        Assert.assertEquals(result1.getTenantRecordId(), new Long(79L));
+        Assert.assertEquals(result1.getType(), "foo");
+
+        final TenantBroadcastModelDao resultNull = tenantBroadcastDao.getById(model.getId(), internalCallContext);
+        Assert.assertNull(resultNull);
+
+        final TenantBroadcastModelDao result2 = noCachingTenantBroadcastDao.getLatestEntry();
+        Assert.assertEquals(result2.getTenantRecordId(), new Long(79L));
+        Assert.assertEquals(result2.getType(), "foo");
+    }
+
+    @Test(groups = "slow")
+    public void testLatestEntries() throws Exception {
+
+        final InternalCallContext context79 = createContext(81L);
+        TenantBroadcastModelDao latestInsert = null;
+        for (int i = 0; i < 100; i++) {
+            final TenantBroadcastModelDao model = new TenantBroadcastModelDao("foo-" + i);
+            tenantBroadcastDao.create(model, context79);
+            latestInsert = model;
+        }
+        final TenantBroadcastModelDao latestInsertRefreshed = tenantBroadcastDao.getById(latestInsert.getId(), context79);
+        final TenantBroadcastModelDao lastEntry = noCachingTenantBroadcastDao.getLatestEntry();
+
+        Assert.assertEquals(lastEntry.getRecordId(), latestInsertRefreshed.getRecordId());
+
+        final int expectedEntries = 25;
+        final Long fromRecordId = lastEntry.getRecordId() - expectedEntries;
+        final List<TenantBroadcastModelDao> result = noCachingTenantBroadcastDao.getLatestEntriesFrom(fromRecordId);
+        Assert.assertEquals(result.size(), expectedEntries);
+
+        long i = 0;
+        for (TenantBroadcastModelDao cur : result) {
+            Assert.assertEquals(cur.getRecordId().longValue(), (fromRecordId + i++ + 1L));
+        }
+
+    }
+
+    private InternalCallContext createContext(final Long tenantRecordId) {
+        return new InternalCallContext(tenantRecordId, 0L, UUID.randomUUID(),
+                                       UUID.randomUUID().toString(), CallOrigin.TEST,
+                                       UserType.TEST, "Testing TestNoCachingTenantBroadcastDao", "This is a test for TestNoCachingTenantBroadcastDao",
+                                       clock.getUTCNow(), clock.getUTCNow());
+
+    }
+}
diff --git a/tenant/src/test/java/org/killbill/billing/tenant/TenantTestSuiteWithEmbeddedDb.java b/tenant/src/test/java/org/killbill/billing/tenant/TenantTestSuiteWithEmbeddedDb.java
index 25ded5e..2a9429d 100644
--- a/tenant/src/test/java/org/killbill/billing/tenant/TenantTestSuiteWithEmbeddedDb.java
+++ b/tenant/src/test/java/org/killbill/billing/tenant/TenantTestSuiteWithEmbeddedDb.java
@@ -16,6 +16,11 @@
 
 package org.killbill.billing.tenant;
 
+import javax.inject.Named;
+
+import org.killbill.billing.tenant.dao.NoCachingTenantBroadcastDao;
+import org.killbill.billing.tenant.dao.TenantBroadcastDao;
+import org.killbill.billing.tenant.glue.DefaultTenantModule;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -33,6 +38,13 @@ public class TenantTestSuiteWithEmbeddedDb extends GuicyKillbillTestSuiteWithEmb
     @Inject
     protected DefaultTenantDao tenantDao;
 
+    @Named(DefaultTenantModule.NO_CACHING_TENANT)
+    @Inject
+    protected TenantBroadcastDao noCachingTenantBroadcastDao;
+
+    @Inject
+    protected TenantBroadcastDao tenantBroadcastDao;
+
     @BeforeClass(groups = "slow")
     protected void beforeClass() throws Exception {
         final Injector injector = Guice.createInjector(new TestTenantModuleWithEmbeddedDB(configSource));
diff --git a/util/src/main/java/org/killbill/billing/util/cache/TenantCatalogCacheLoader.java b/util/src/main/java/org/killbill/billing/util/cache/TenantCatalogCacheLoader.java
index 63017dc..cd82a27 100644
--- a/util/src/main/java/org/killbill/billing/util/cache/TenantCatalogCacheLoader.java
+++ b/util/src/main/java/org/killbill/billing/util/cache/TenantCatalogCacheLoader.java
@@ -25,6 +25,7 @@ import javax.inject.Singleton;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.CatalogApiException;
 import org.killbill.billing.tenant.api.TenantInternalApi;
+import org.killbill.billing.tenant.api.TenantInternalApi.CacheInvalidationCallback;
 import org.killbill.billing.util.cache.Cachable.CacheType;
 
 @Singleton
@@ -62,8 +63,7 @@ public class TenantCatalogCacheLoader extends BaseCacheLoader {
         }
 
         final LoaderCallback callback = (LoaderCallback) cacheLoaderArgument.getArgs()[0];
-
-        final List<String> catalogXMLs = tenantApi.getTenantCatalogs(internalTenantContext);
+        final List<String> catalogXMLs = tenantApi.getTenantCatalogs(internalTenantContext, (CacheInvalidationCallback) callback);
         if (catalogXMLs.isEmpty()) {
             return null;
         }
@@ -75,7 +75,7 @@ public class TenantCatalogCacheLoader extends BaseCacheLoader {
         }
     }
 
-    public interface LoaderCallback {
+    public interface LoaderCallback extends CacheInvalidationCallback {
         public Object loadCatalog(final List<String> catalogXMLs) throws CatalogApiException;
     }
 }
diff --git a/util/src/main/java/org/killbill/billing/util/cache/TenantOverdueConfigCacheLoader.java b/util/src/main/java/org/killbill/billing/util/cache/TenantOverdueConfigCacheLoader.java
index 3665bc0..edcf7a4 100644
--- a/util/src/main/java/org/killbill/billing/util/cache/TenantOverdueConfigCacheLoader.java
+++ b/util/src/main/java/org/killbill/billing/util/cache/TenantOverdueConfigCacheLoader.java
@@ -23,6 +23,7 @@ import javax.inject.Singleton;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.overdue.api.OverdueApiException;
 import org.killbill.billing.tenant.api.TenantInternalApi;
+import org.killbill.billing.tenant.api.TenantInternalApi.CacheInvalidationCallback;
 import org.killbill.billing.util.cache.Cachable.CacheType;
 
 @Singleton
@@ -60,8 +61,7 @@ public class TenantOverdueConfigCacheLoader extends BaseCacheLoader {
         }
 
         final LoaderCallback callback = (LoaderCallback) cacheLoaderArgument.getArgs()[0];
-
-        final String overdueXML = tenantApi.getTenantOverdueConfig(internalTenantContext);
+        final String overdueXML = tenantApi.getTenantOverdueConfig(internalTenantContext, (CacheInvalidationCallback) callback);
         if (overdueXML == null) {
             return null;
         }
@@ -73,8 +73,7 @@ public class TenantOverdueConfigCacheLoader extends BaseCacheLoader {
         }
     }
 
-    public interface LoaderCallback {
-
+    public interface LoaderCallback extends CacheInvalidationCallback {
         public Object loadCatalog(final String overdueXML) throws OverdueApiException;
     }
 }
diff --git a/util/src/main/java/org/killbill/billing/util/config/TenantConfig.java b/util/src/main/java/org/killbill/billing/util/config/TenantConfig.java
new file mode 100644
index 0000000..48374c0
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/config/TenantConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.config;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+import org.skife.config.Description;
+import org.skife.config.TimeSpan;
+
+public interface TenantConfig extends KillbillConfig {
+
+    @Config("org.killbill.tenant.broadcast.rate")
+    @Default("5s")
+    @Description("Rate at which janitor tasks are scheduled")
+    public TimeSpan getTenantBroadcastServiceRunningRate();
+
+}
diff --git a/util/src/main/java/org/killbill/billing/util/dao/TableName.java b/util/src/main/java/org/killbill/billing/util/dao/TableName.java
index a7edf11..44b28d2 100644
--- a/util/src/main/java/org/killbill/billing/util/dao/TableName.java
+++ b/util/src/main/java/org/killbill/billing/util/dao/TableName.java
@@ -52,6 +52,7 @@ public enum TableName {
     TAG_HISTORY("tag_history"),
     TENANT("tenants", ObjectType.TENANT),
     TENANT_KVS("tenant_kvs", ObjectType.TENANT_KVS),
+    TENANT_BROADCASTS("tenant_broadcasts"),
     TAG("tags", ObjectType.TAG, TAG_HISTORY),
     ROLLED_UP_USAGE("rolled_up_usage");