killbill-aplcache

Details

diff --git a/api/src/main/java/org/killbill/billing/events/BroadcastInternalEvent.java b/api/src/main/java/org/killbill/billing/events/BroadcastInternalEvent.java
new file mode 100644
index 0000000..4fdb90e
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/events/BroadcastInternalEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.events;
+
+public interface BroadcastInternalEvent extends BusInternalEvent {
+    public String getServiceName();
+    public String getType();
+    public String getJsonEvent();
+}
diff --git a/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java b/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
index b756b7a..7802660 100644
--- a/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
@@ -25,6 +25,7 @@ public interface BusInternalEvent extends BusEvent {
         ACCOUNT_CHANGE,
         ACCOUNT_CREATE,
         BLOCKING_STATE,
+        BROADCAST_SERVICE,
         BUNDLE_REPAIR,
         CONTROL_TAGDEFINITION_CREATION,
         CONTROL_TAGDEFINITION_DELETION,
diff --git a/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
index 33d9c68..16b5b66 100644
--- a/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
@@ -31,6 +31,7 @@ import org.killbill.billing.entitlement.api.BlockingStateType;
 import org.killbill.billing.events.AccountChangeInternalEvent;
 import org.killbill.billing.events.AccountCreationInternalEvent;
 import org.killbill.billing.events.BlockingTransitionInternalEvent;
+import org.killbill.billing.events.BroadcastInternalEvent;
 import org.killbill.billing.events.BusInternalEvent;
 import org.killbill.billing.events.BusInternalEvent.BusInternalEventType;
 import org.killbill.billing.events.ControlTagCreationInternalEvent;
@@ -51,6 +52,7 @@ import org.killbill.billing.events.TenantConfigDeletionInternalEvent;
 import org.killbill.billing.events.UserTagCreationInternalEvent;
 import org.killbill.billing.events.UserTagDeletionInternalEvent;
 import org.killbill.billing.lifecycle.glue.BusModule;
+import org.killbill.billing.notification.plugin.api.BroadcastMetadata;
 import org.killbill.billing.notification.plugin.api.ExtBusEventType;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 import org.killbill.billing.util.callcontext.CallOrigin;
@@ -63,6 +65,7 @@ import org.killbill.bus.api.PersistentBus.EventBusException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.joda.JodaModule;
@@ -99,10 +102,12 @@ public class BeatrixListener {
             }
         } catch (final EventBusException e) {
             log.warn("Failed to dispatch external bus events", e);
+        } catch (JsonProcessingException e) {
+            log.warn("Failed to dispatch external bus events", e);
         }
     }
 
-    private BusEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) {
+    private BusEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) throws JsonProcessingException {
         ObjectType objectType = null;
         UUID objectId = null;
         ExtBusEventType eventBusType = null;
@@ -278,6 +283,15 @@ public class BeatrixListener {
                 metaData = realTenantConfigEventDel.getKey();
                 break;
 
+            case BROADCAST_SERVICE:
+                final BroadcastInternalEvent realBroadcastEvent = (BroadcastInternalEvent) event;
+                objectType = ObjectType.SERVICE_BROADCAST;
+                objectId = null;
+                eventBusType = ExtBusEventType.BROADCAST_SERVICE;
+                final BroadcastMetadata metaDataObj = new BroadcastMetadata(realBroadcastEvent.getServiceName(), realBroadcastEvent.getType(), realBroadcastEvent.getJsonEvent());
+                metaData = objectMapper.writeValueAsString(metaDataObj);
+                break;
+
             default:
         }
 
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java b/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
index ac5be0f..8238496 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
@@ -60,4 +60,16 @@ public class TestEventJson extends BeatrixTestSuite {
         Assert.assertTrue(obj.getObjectId().equals(UUID.fromString("273ff2ed-5442-4d10-971f-3cc2414fe33b")));
         Assert.assertNull(obj.getMetaData());
     }
+
+
+    @Test(groups = "fast")
+    public void testBusExternalEventWithJsonMetadata() throws Exception {
+        final String eventWithJsonMetadata = "{\"objectId\":null,\"objectType\":\"SERVICE_BROADCAST\",\"eventType\":\"BROADCAST_SERVICE\",\"accountId\":null,\"tenantId\":null,\"metaData\":\"{\\\"service\\\":\\\"nodes-service\\\",\\\"commandType\\\":\\\"START_PLUGIN\\\",\\\"eventJson\\\":\\\"{\\\\\\\"pluginName\\\\\\\":\\\\\\\"pluginName\\\\\\\",\\\\\\\"pluginVersion\\\\\\\":\\\\\\\"4.5.6\\\\\\\",\\\\\\\"properties\\\\\\\":[{\\\\\\\"key\\\\\\\":\\\\\\\"key\\\\\\\",\\\\\\\"value\\\\\\\":\\\\\\\"value\\\\\\\"}]}\\\"}\"}";
+        final Class<?> claz = Class.forName(DefaultBusExternalEvent.class.getName());
+        final ExtBusEvent obj = (ExtBusEvent) mapper.readValue(eventWithJsonMetadata, claz);
+        Assert.assertTrue(obj.getObjectType().equals(ObjectType.SERVICE_BROADCAST));
+        Assert.assertTrue(obj.getEventType().equals(ExtBusEventType.BROADCAST_SERVICE));
+        Assert.assertNotNull(obj.getMetaData());
+        Assert.assertTrue(obj.getMetaData().equals("{\"service\":\"nodes-service\",\"commandType\":\"START_PLUGIN\",\"eventJson\":\"{\\\"pluginName\\\":\\\"pluginName\\\",\\\"pluginVersion\\\":\\\"4.5.6\\\",\\\"properties\\\":[{\\\"key\\\":\\\"key\\\",\\\"value\\\":\\\"value\\\"}]}\"}"));
+    }
 }
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java
index 1142ba6..db9efa1 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java
@@ -46,6 +46,7 @@ import org.killbill.billing.util.config.PaymentConfig;
 import org.killbill.billing.util.email.EmailModule;
 import org.killbill.billing.util.email.templates.TemplateModule;
 import org.killbill.billing.util.glue.AuditModule;
+import org.killbill.billing.util.glue.BroadcastModule;
 import org.killbill.billing.util.glue.CacheModule;
 import org.killbill.billing.util.glue.CallContextModule;
 import org.killbill.billing.util.glue.CustomFieldModule;
@@ -53,6 +54,7 @@ import org.killbill.billing.util.glue.ExportModule;
 import org.killbill.billing.util.glue.GlobalLockerModule;
 import org.killbill.billing.util.glue.KillBillModule;
 import org.killbill.billing.util.glue.KillBillShiroModule;
+import org.killbill.billing.util.glue.NodesModule;
 import org.killbill.billing.util.glue.NonEntityDaoModule;
 import org.killbill.billing.util.glue.RecordIdModule;
 import org.killbill.billing.util.glue.SecurityModule;
@@ -97,6 +99,8 @@ public class BeatrixIntegrationModule extends KillBillModule {
         install(new RecordIdModule(configSource));
         install(new UsageModule(configSource));
         install(new SecurityModule(configSource));
+        install(new NodesModule(configSource));
+        install(new BroadcastModule(configSource));
         install(new KillBillShiroModuleOnlyIniRealm(configSource));
         install(new BeatrixModule(configSource));
 
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
index 3a17e84..a52037e 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
@@ -99,6 +99,7 @@ import org.killbill.billing.util.api.TagApiException;
 import org.killbill.billing.util.api.TagDefinitionApiException;
 import org.killbill.billing.util.api.TagUserApi;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
+import org.killbill.billing.util.nodes.KillbillNodesApi;
 import org.killbill.billing.util.tag.ControlTagType;
 import org.killbill.billing.util.tag.Tag;
 import org.killbill.bus.api.PersistentBus;
@@ -254,6 +255,9 @@ public class TestIntegrationBase extends BeatrixTestSuiteWithEmbeddedDB {
     @Inject
     protected TenantUserApi tenantUserApi;
 
+    @Inject
+    protected KillbillNodesApi nodesApi;
+
     protected void assertListenerStatus() {
         busHandler.assertListenerStatus();
     }
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 6bde96a..ad64469 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -33,6 +33,7 @@ import org.killbill.billing.catalog.api.ProductCategory;
 import org.killbill.billing.entitlement.api.DefaultEntitlement;
 import org.killbill.billing.notification.plugin.api.ExtBusEvent;
 import org.killbill.billing.overdue.api.OverdueConfig;
+import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.killbill.billing.tenant.api.DefaultTenant;
 import org.killbill.billing.tenant.api.Tenant;
 import org.killbill.billing.tenant.api.TenantData;
@@ -40,9 +41,16 @@ import org.killbill.billing.tenant.api.TenantKV.TenantKey;
 import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.nodes.NodeCommand;
+import org.killbill.billing.util.nodes.NodeCommandMetadata;
+import org.killbill.billing.util.nodes.NodeCommandProperty;
+import org.killbill.billing.util.nodes.PluginNodeCommandMetadata;
+import org.killbill.billing.util.nodes.SystemNodeCommandType;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.Subscribe;
 
 import static com.jayway.awaitility.Awaitility.await;
@@ -55,6 +63,15 @@ public class TestPublicBus extends TestIntegrationBase {
 
     private AtomicInteger externalBusCount;
 
+    @Override
+    protected KillbillConfigSource getConfigSource() {
+        ImmutableMap additionalProperties = new ImmutableMap.Builder()
+                .put("org.killbill.billing.util.broadcast.rate", "500ms")
+                .build();
+        return getConfigSource("/beatrix.properties", additionalProperties);
+    }
+
+
     public class PublicListener {
 
         @Subscribe
@@ -81,8 +98,6 @@ public class TestPublicBus extends TestIntegrationBase {
         } catch (final Exception ignored) {
         }
 
-        super.beforeMethod();
-
         log.debug("RESET TEST FRAMEWORK");
 
         overdueConfigCache.loadDefaultOverdueConfig((OverdueConfig) null);
@@ -161,4 +176,38 @@ public class TestPublicBus extends TestIntegrationBase {
         });
     }
 
+
+    @Test(groups = "{slow}")
+    public void testBroadcastEvent() throws Exception {
+
+
+        final NodeCommand nodeCommand = new NodeCommand() {
+            @Override
+            public boolean isSystemCommandType() {
+                return true;
+            }
+            @Override
+            public String getNodeCommandType() {
+                return SystemNodeCommandType.START_PLUGIN.name();
+            }
+            @Override
+            public NodeCommandMetadata getNodeCommandMetadata() {
+                return new PluginNodeCommandMetadata("pluginName", "4.5.6", ImmutableList.of(new NodeCommandProperty("key", "value")));
+            }
+        };
+
+        // Verify the internal bus first
+        busHandler.pushExpectedEvent(NextEvent.BROADCAST_SERVICE);
+        nodesApi.triggerNodeCommand(nodeCommand);
+        assertListenerStatus();
+
+        // Verify the public bus
+        await().atMost(10, SECONDS).until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                return externalBusCount.get() == 1;
+            }
+        });
+    }
+
 }
diff --git a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastInternalEvent.java b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastInternalEvent.java
new file mode 100644
index 0000000..c8afe52
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastInternalEvent.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.broadcast;
+
+import org.killbill.billing.events.BroadcastInternalEvent;
+import org.killbill.billing.events.BusEventBase;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DefaultBroadcastInternalEvent extends BusEventBase implements BroadcastInternalEvent {
+
+    private String serviceName;
+    private String type;
+    private String jsonEvent;
+
+    public DefaultBroadcastInternalEvent() {
+        super(null, 0L, null);
+    }
+
+    @JsonCreator
+    public DefaultBroadcastInternalEvent(@JsonProperty("serviceName") final String serviceName,
+                                         @JsonProperty("type") final String type,
+                                         @JsonProperty("jsonEvent") final String jsonEvent) {
+        super(null, 0L, null);
+        this.serviceName = serviceName;
+        this.type = type;
+        this.jsonEvent = jsonEvent;
+    }
+
+    @Override
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public String getJsonEvent() {
+        return jsonEvent;
+    }
+
+    @JsonIgnore
+    @Override
+    public BusInternalEventType getBusEventType() {
+        return BusInternalEventType.BROADCAST_SERVICE;
+    }
+}
diff --git a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastService.java b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastService.java
index 3609e6a..7ee1a99 100644
--- a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastService.java
+++ b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastService.java
@@ -17,23 +17,137 @@
 
 package org.killbill.billing.util.broadcast;
 
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.killbill.billing.events.BroadcastInternalEvent;
 import org.killbill.billing.platform.api.LifecycleHandlerType;
 import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
+import org.killbill.billing.util.broadcast.dao.BroadcastDao;
+import org.killbill.billing.util.broadcast.dao.BroadcastModelDao;
+import org.killbill.billing.util.config.BroadcastConfig;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.killbill.commons.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
 
 public class DefaultBroadcastService implements BroadcastService {
 
+    private final static int TERMINATION_TIMEOUT_SEC = 5;
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcastService.class);
+
     public static final String BROADCAST_SERVICE_NAME = "broadcast-service";
 
+    private final BroadcastConfig broadcastConfig;
+    private final BroadcastDao broadcastDao;
+    private final PersistentBus eventBus;
+
+    private AtomicLong latestRecordIdProcessed;
+    private ScheduledExecutorService broadcastExecutor;
+    private volatile boolean isStopped;
+
+    @Inject
+    public DefaultBroadcastService(final BroadcastDao broadcastDao, final BroadcastConfig broadcastConfig, final PersistentBus eventBus) {
+        this.broadcastDao = broadcastDao;
+        this.broadcastConfig = broadcastConfig;
+        this.eventBus = eventBus;
+        this.isStopped = false;
+    }
+
     @Override
     public String getName() {
         return BROADCAST_SERVICE_NAME;
     }
 
+    @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() {
+        final BroadcastModelDao entry = broadcastDao.getLatestEntry();
+        this.latestRecordIdProcessed = entry != null ? new AtomicLong(entry.getRecordId()) : new AtomicLong(0L);
+        this.broadcastExecutor = Executors.newSingleThreadScheduledExecutor("BroadcastExecutor");
+        this.isStopped = false;
+    }
+
     @LifecycleHandlerType(LifecycleHandlerType.LifecycleLevel.START_SERVICE)
     public void start() {
+        final TimeUnit pendingRateUnit = broadcastConfig.getBroadcastServiceRunningRate().getUnit();
+        final long pendingPeriod = broadcastConfig.getBroadcastServiceRunningRate().getPeriod();
+        broadcastExecutor.scheduleAtFixedRate(new BroadcastServiceRunnable(this, broadcastDao, eventBus), pendingPeriod, pendingPeriod, pendingRateUnit);
+
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() {
+        if (isStopped) {
+            logger.warn("BroadcastExecutor is already in a stopped state");
+            return;
+        }
+        try {
+            broadcastExecutor.shutdown();
+            boolean success = broadcastExecutor.awaitTermination(TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS);
+            if (!success) {
+                logger.warn("BroadcastExecutor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("BroadcastExecutor stop sequence got interrupted");
+        } finally {
+            isStopped = true;
+        }
+    }
+
+    public boolean isStopped() {
+        return isStopped;
+    }
+
+    public AtomicLong getLatestRecordIdProcessed() {
+        return latestRecordIdProcessed;
+    }
+
+    public void setLatestRecordIdProcessed(final Long latestRecordIdProcessed) {
+        this.latestRecordIdProcessed.set(latestRecordIdProcessed);
+    }
+
+    private static class BroadcastServiceRunnable implements Runnable {
+
+        private final DefaultBroadcastService parent;
+        private final BroadcastDao broadcastDao;
+        private final PersistentBus eventBus;
+
+        public BroadcastServiceRunnable(final DefaultBroadcastService defaultBroadcastService, final BroadcastDao broadcastDao, final PersistentBus eventBus) {
+            this.parent = defaultBroadcastService;
+            this.broadcastDao = broadcastDao;
+            this.eventBus = eventBus;
+        }
+
+        @Override
+        public void run() {
+            if (parent.isStopped) {
+                return;
+            }
+
+            final List<BroadcastModelDao> entries = broadcastDao.getLatestEntriesFrom(parent.getLatestRecordIdProcessed().get());
+            for (BroadcastModelDao cur : entries) {
+                if (parent.isStopped()) {
+                    return;
+                }
+
+                try {
+                    final BroadcastInternalEvent event = new DefaultBroadcastInternalEvent(cur.getServiceName(), cur.getType(), cur.getEvent());
+                    eventBus.post(event);
+                } catch (final EventBusException e) {
+                    logger.error("Failed to send event BroadcastInternalEvent: ", e);
+                } finally {
+                    parent.setLatestRecordIdProcessed(cur.getRecordId());
+                }
+            }
+        }
     }
 }
diff --git a/util/src/main/java/org/killbill/billing/util/config/BroadcastConfig.java b/util/src/main/java/org/killbill/billing/util/config/BroadcastConfig.java
new file mode 100644
index 0000000..8125941
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/config/BroadcastConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.config;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+import org.skife.config.Description;
+import org.skife.config.TimeSpan;
+
+public interface BroadcastConfig extends KillbillConfig {
+
+    @Config("org.killbill.billing.util.broadcast.rate")
+    @Default("5s")
+    @Description("Rate at which broadcast service task is scheduled")
+    public TimeSpan getBroadcastServiceRunningRate();
+
+}
diff --git a/util/src/main/java/org/killbill/billing/util/glue/BroadcastModule.java b/util/src/main/java/org/killbill/billing/util/glue/BroadcastModule.java
index d409051..72fb5be 100644
--- a/util/src/main/java/org/killbill/billing/util/glue/BroadcastModule.java
+++ b/util/src/main/java/org/killbill/billing/util/glue/BroadcastModule.java
@@ -19,9 +19,13 @@ package org.killbill.billing.util.glue;
 
 import org.killbill.billing.broadcast.BroadcastApi;
 import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.billing.util.broadcast.BroadcastService;
 import org.killbill.billing.util.broadcast.DefaultBroadcastApi;
+import org.killbill.billing.util.broadcast.DefaultBroadcastService;
 import org.killbill.billing.util.broadcast.dao.BroadcastDao;
 import org.killbill.billing.util.broadcast.dao.DefaultBroadcastDao;
+import org.killbill.billing.util.config.BroadcastConfig;
+import org.skife.config.ConfigurationObjectFactory;
 
 public class BroadcastModule extends KillBillModule {
 
@@ -34,12 +38,16 @@ public class BroadcastModule extends KillBillModule {
     }
 
     protected void installUserApi() {
+
+        bind(BroadcastService.class).to(DefaultBroadcastService.class).asEagerSingleton();
         bind(BroadcastApi.class).to(DefaultBroadcastApi.class).asEagerSingleton();
     }
 
-
     @Override
     protected void configure() {
+        final ConfigurationObjectFactory factory = new ConfigurationObjectFactory(skifeConfigSource);
+        final BroadcastConfig broadcastConfig = factory.build(BroadcastConfig.class);
+        bind(BroadcastConfig.class).toInstance(broadcastConfig);
         installDaos();
         installUserApi();
     }
diff --git a/util/src/test/java/org/killbill/billing/api/TestApiListener.java b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
index 0f92505..a5815c4 100644
--- a/util/src/test/java/org/killbill/billing/api/TestApiListener.java
+++ b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
+import org.killbill.billing.events.BroadcastInternalEvent;
 import org.killbill.billing.events.InvoiceNotificationInternalEvent;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -92,6 +93,7 @@ public class TestApiListener {
     public enum NextEvent {
         MIGRATE_ENTITLEMENT,
         MIGRATE_BILLING,
+        BROADCAST_SERVICE,
         CREATE,
         TRANSFER,
         RE_CREATE,
@@ -114,6 +116,15 @@ public class TestApiListener {
         CUSTOM_FIELD,
     }
 
+
+    @Subscribe
+    public void handleBroadcastEvents(final BroadcastInternalEvent event) {
+        log.info(String.format("Got BroadcastInternalEvent event %s", event.toString()));
+        assertEqualsNicely(NextEvent.BROADCAST_SERVICE);
+        notifyIfStackEmpty();
+    }
+
+
     @Subscribe
     public void handleRepairSubscriptionEvents(final RepairSubscriptionInternalEvent event) {
         log.info(String.format("Got RepairSubscriptionEvent event %s", event.toString()));
diff --git a/util/src/test/java/org/killbill/billing/util/broadcast/TestBroadcastService.java b/util/src/test/java/org/killbill/billing/util/broadcast/TestBroadcastService.java
new file mode 100644
index 0000000..a52cd80
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/broadcast/TestBroadcastService.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.broadcast;
+
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.billing.util.UtilTestSuiteWithEmbeddedDB;
+import org.killbill.billing.util.broadcast.dao.BroadcastModelDao;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+
+public class TestBroadcastService extends UtilTestSuiteWithEmbeddedDB {
+
+    @Inject
+    private BroadcastService broadcastService;
+
+    @Override
+    protected KillbillConfigSource getConfigSource() {
+            return getConfigSource(null,
+                               ImmutableMap.<String, String>of("org.killbill.billing.util.broadcast.rate", "500ms"));
+    }
+
+    @BeforeMethod(groups = "slow")
+    public void beforeMethod() throws Exception {
+        super.beforeMethod();
+        ((DefaultBroadcastService) broadcastService).initialize();
+        ((DefaultBroadcastService) broadcastService).start();
+    }
+
+    @AfterMethod(groups = "slow")
+    public void afterMethod() throws Exception {
+        ((DefaultBroadcastService) broadcastService).stop();
+        super.afterMethod();
+    }
+
+    @Test(groups = "slow")
+    public void testBasic() {
+        final String eventJson = "\"{\"pluginName\":\"foo\",\"pluginVersion\":\"1.2.3\",\"properties\":[{\"key\":\"something\",\"value\":\"nothing\"}]}\"";
+
+        eventsListener.pushExpectedEvent(NextEvent.BROADCAST_SERVICE);
+        broadcastDao.create(new BroadcastModelDao("svc", "type", eventJson, clock.getUTCNow(), "tester"));
+        assertListenerStatus();
+    }
+
+}
diff --git a/util/src/test/java/org/killbill/billing/util/broadcast/TestDefaultBroadcastInternalEvent.java b/util/src/test/java/org/killbill/billing/util/broadcast/TestDefaultBroadcastInternalEvent.java
new file mode 100644
index 0000000..28c8a5b
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/broadcast/TestDefaultBroadcastInternalEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.broadcast;
+
+import org.killbill.billing.events.BroadcastInternalEvent;
+import org.killbill.billing.util.UtilTestSuiteNoDB;
+import org.killbill.billing.util.jackson.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+
+public class TestDefaultBroadcastInternalEvent extends UtilTestSuiteNoDB {
+
+
+    @Test(groups = "fast")
+    public void testBasic() throws Exception {
+        final ObjectMapper objectMapper = new ObjectMapper();
+        objectMapper.registerModule(new JodaModule());
+        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+        final String eventJson = "\"{\"pluginName\":\"foo\",\"pluginVersion\":\"1.2.3\",\"properties\":[{\"key\":\"something\",\"value\":\"nothing\"}]}\"";
+
+        final BroadcastInternalEvent broadcastEvent = new DefaultBroadcastInternalEvent("service", "PLUGIN_INSTALL", eventJson);
+
+        final String broadcastEventStr = objectMapper.writeValueAsString(broadcastEvent);
+
+        final BroadcastInternalEvent res = objectMapper.readValue(broadcastEventStr, DefaultBroadcastInternalEvent.class);
+
+        Assert.assertEquals(res.getServiceName(), "service");
+        Assert.assertEquals(res.getType(), "PLUGIN_INSTALL");
+        Assert.assertEquals(res.getJsonEvent(), eventJson);
+    }
+}