killbill-aplcache
Changes
beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java 4(+4 -0)
Details
diff --git a/api/src/main/java/org/killbill/billing/events/BroadcastInternalEvent.java b/api/src/main/java/org/killbill/billing/events/BroadcastInternalEvent.java
new file mode 100644
index 0000000..4fdb90e
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/events/BroadcastInternalEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.events;
+
+public interface BroadcastInternalEvent extends BusInternalEvent {
+ public String getServiceName();
+ public String getType();
+ public String getJsonEvent();
+}
diff --git a/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java b/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
index b756b7a..7802660 100644
--- a/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/BusInternalEvent.java
@@ -25,6 +25,7 @@ public interface BusInternalEvent extends BusEvent {
ACCOUNT_CHANGE,
ACCOUNT_CREATE,
BLOCKING_STATE,
+ BROADCAST_SERVICE,
BUNDLE_REPAIR,
CONTROL_TAGDEFINITION_CREATION,
CONTROL_TAGDEFINITION_DELETION,
diff --git a/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
index 33d9c68..16b5b66 100644
--- a/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/org/killbill/billing/beatrix/extbus/BeatrixListener.java
@@ -31,6 +31,7 @@ import org.killbill.billing.entitlement.api.BlockingStateType;
import org.killbill.billing.events.AccountChangeInternalEvent;
import org.killbill.billing.events.AccountCreationInternalEvent;
import org.killbill.billing.events.BlockingTransitionInternalEvent;
+import org.killbill.billing.events.BroadcastInternalEvent;
import org.killbill.billing.events.BusInternalEvent;
import org.killbill.billing.events.BusInternalEvent.BusInternalEventType;
import org.killbill.billing.events.ControlTagCreationInternalEvent;
@@ -51,6 +52,7 @@ import org.killbill.billing.events.TenantConfigDeletionInternalEvent;
import org.killbill.billing.events.UserTagCreationInternalEvent;
import org.killbill.billing.events.UserTagDeletionInternalEvent;
import org.killbill.billing.lifecycle.glue.BusModule;
+import org.killbill.billing.notification.plugin.api.BroadcastMetadata;
import org.killbill.billing.notification.plugin.api.ExtBusEventType;
import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
import org.killbill.billing.util.callcontext.CallOrigin;
@@ -63,6 +65,7 @@ import org.killbill.bus.api.PersistentBus.EventBusException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.joda.JodaModule;
@@ -99,10 +102,12 @@ public class BeatrixListener {
}
} catch (final EventBusException e) {
log.warn("Failed to dispatch external bus events", e);
+ } catch (JsonProcessingException e) {
+ log.warn("Failed to dispatch external bus events", e);
}
}
- private BusEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) {
+ private BusEvent computeExtBusEventEntryFromBusInternalEvent(final BusInternalEvent event, final InternalCallContext context) throws JsonProcessingException {
ObjectType objectType = null;
UUID objectId = null;
ExtBusEventType eventBusType = null;
@@ -278,6 +283,15 @@ public class BeatrixListener {
metaData = realTenantConfigEventDel.getKey();
break;
+ case BROADCAST_SERVICE:
+ final BroadcastInternalEvent realBroadcastEvent = (BroadcastInternalEvent) event;
+ objectType = ObjectType.SERVICE_BROADCAST;
+ objectId = null;
+ eventBusType = ExtBusEventType.BROADCAST_SERVICE;
+ final BroadcastMetadata metaDataObj = new BroadcastMetadata(realBroadcastEvent.getServiceName(), realBroadcastEvent.getType(), realBroadcastEvent.getJsonEvent());
+ metaData = objectMapper.writeValueAsString(metaDataObj);
+ break;
+
default:
}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java b/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
index ac5be0f..8238496 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/extbus/TestEventJson.java
@@ -60,4 +60,16 @@ public class TestEventJson extends BeatrixTestSuite {
Assert.assertTrue(obj.getObjectId().equals(UUID.fromString("273ff2ed-5442-4d10-971f-3cc2414fe33b")));
Assert.assertNull(obj.getMetaData());
}
+
+
+ @Test(groups = "fast")
+ public void testBusExternalEventWithJsonMetadata() throws Exception {
+ final String eventWithJsonMetadata = "{\"objectId\":null,\"objectType\":\"SERVICE_BROADCAST\",\"eventType\":\"BROADCAST_SERVICE\",\"accountId\":null,\"tenantId\":null,\"metaData\":\"{\\\"service\\\":\\\"nodes-service\\\",\\\"commandType\\\":\\\"START_PLUGIN\\\",\\\"eventJson\\\":\\\"{\\\\\\\"pluginName\\\\\\\":\\\\\\\"pluginName\\\\\\\",\\\\\\\"pluginVersion\\\\\\\":\\\\\\\"4.5.6\\\\\\\",\\\\\\\"properties\\\\\\\":[{\\\\\\\"key\\\\\\\":\\\\\\\"key\\\\\\\",\\\\\\\"value\\\\\\\":\\\\\\\"value\\\\\\\"}]}\\\"}\"}";
+ final Class<?> claz = Class.forName(DefaultBusExternalEvent.class.getName());
+ final ExtBusEvent obj = (ExtBusEvent) mapper.readValue(eventWithJsonMetadata, claz);
+ Assert.assertTrue(obj.getObjectType().equals(ObjectType.SERVICE_BROADCAST));
+ Assert.assertTrue(obj.getEventType().equals(ExtBusEventType.BROADCAST_SERVICE));
+ Assert.assertNotNull(obj.getMetaData());
+ Assert.assertTrue(obj.getMetaData().equals("{\"service\":\"nodes-service\",\"commandType\":\"START_PLUGIN\",\"eventJson\":\"{\\\"pluginName\\\":\\\"pluginName\\\",\\\"pluginVersion\\\":\\\"4.5.6\\\",\\\"properties\\\":[{\\\"key\\\":\\\"key\\\",\\\"value\\\":\\\"value\\\"}]}\"}"));
+ }
}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java
index 1142ba6..db9efa1 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/BeatrixIntegrationModule.java
@@ -46,6 +46,7 @@ import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.billing.util.email.EmailModule;
import org.killbill.billing.util.email.templates.TemplateModule;
import org.killbill.billing.util.glue.AuditModule;
+import org.killbill.billing.util.glue.BroadcastModule;
import org.killbill.billing.util.glue.CacheModule;
import org.killbill.billing.util.glue.CallContextModule;
import org.killbill.billing.util.glue.CustomFieldModule;
@@ -53,6 +54,7 @@ import org.killbill.billing.util.glue.ExportModule;
import org.killbill.billing.util.glue.GlobalLockerModule;
import org.killbill.billing.util.glue.KillBillModule;
import org.killbill.billing.util.glue.KillBillShiroModule;
+import org.killbill.billing.util.glue.NodesModule;
import org.killbill.billing.util.glue.NonEntityDaoModule;
import org.killbill.billing.util.glue.RecordIdModule;
import org.killbill.billing.util.glue.SecurityModule;
@@ -97,6 +99,8 @@ public class BeatrixIntegrationModule extends KillBillModule {
install(new RecordIdModule(configSource));
install(new UsageModule(configSource));
install(new SecurityModule(configSource));
+ install(new NodesModule(configSource));
+ install(new BroadcastModule(configSource));
install(new KillBillShiroModuleOnlyIniRealm(configSource));
install(new BeatrixModule(configSource));
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
index 3a17e84..a52037e 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
@@ -99,6 +99,7 @@ import org.killbill.billing.util.api.TagApiException;
import org.killbill.billing.util.api.TagDefinitionApiException;
import org.killbill.billing.util.api.TagUserApi;
import org.killbill.billing.util.cache.CacheControllerDispatcher;
+import org.killbill.billing.util.nodes.KillbillNodesApi;
import org.killbill.billing.util.tag.ControlTagType;
import org.killbill.billing.util.tag.Tag;
import org.killbill.bus.api.PersistentBus;
@@ -254,6 +255,9 @@ public class TestIntegrationBase extends BeatrixTestSuiteWithEmbeddedDB {
@Inject
protected TenantUserApi tenantUserApi;
+ @Inject
+ protected KillbillNodesApi nodesApi;
+
protected void assertListenerStatus() {
busHandler.assertListenerStatus();
}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 6bde96a..ad64469 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -33,6 +33,7 @@ import org.killbill.billing.catalog.api.ProductCategory;
import org.killbill.billing.entitlement.api.DefaultEntitlement;
import org.killbill.billing.notification.plugin.api.ExtBusEvent;
import org.killbill.billing.overdue.api.OverdueConfig;
+import org.killbill.billing.platform.api.KillbillConfigSource;
import org.killbill.billing.tenant.api.DefaultTenant;
import org.killbill.billing.tenant.api.Tenant;
import org.killbill.billing.tenant.api.TenantData;
@@ -40,9 +41,16 @@ import org.killbill.billing.tenant.api.TenantKV.TenantKey;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.nodes.NodeCommand;
+import org.killbill.billing.util.nodes.NodeCommandMetadata;
+import org.killbill.billing.util.nodes.NodeCommandProperty;
+import org.killbill.billing.util.nodes.PluginNodeCommandMetadata;
+import org.killbill.billing.util.nodes.SystemNodeCommandType;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.Subscribe;
import static com.jayway.awaitility.Awaitility.await;
@@ -55,6 +63,15 @@ public class TestPublicBus extends TestIntegrationBase {
private AtomicInteger externalBusCount;
+ @Override
+ protected KillbillConfigSource getConfigSource() {
+ ImmutableMap additionalProperties = new ImmutableMap.Builder()
+ .put("org.killbill.billing.util.broadcast.rate", "500ms")
+ .build();
+ return getConfigSource("/beatrix.properties", additionalProperties);
+ }
+
+
public class PublicListener {
@Subscribe
@@ -81,8 +98,6 @@ public class TestPublicBus extends TestIntegrationBase {
} catch (final Exception ignored) {
}
- super.beforeMethod();
-
log.debug("RESET TEST FRAMEWORK");
overdueConfigCache.loadDefaultOverdueConfig((OverdueConfig) null);
@@ -161,4 +176,38 @@ public class TestPublicBus extends TestIntegrationBase {
});
}
+
+ @Test(groups = "{slow}")
+ public void testBroadcastEvent() throws Exception {
+
+
+ final NodeCommand nodeCommand = new NodeCommand() {
+ @Override
+ public boolean isSystemCommandType() {
+ return true;
+ }
+ @Override
+ public String getNodeCommandType() {
+ return SystemNodeCommandType.START_PLUGIN.name();
+ }
+ @Override
+ public NodeCommandMetadata getNodeCommandMetadata() {
+ return new PluginNodeCommandMetadata("pluginName", "4.5.6", ImmutableList.of(new NodeCommandProperty("key", "value")));
+ }
+ };
+
+ // Verify the internal bus first
+ busHandler.pushExpectedEvent(NextEvent.BROADCAST_SERVICE);
+ nodesApi.triggerNodeCommand(nodeCommand);
+ assertListenerStatus();
+
+ // Verify the public bus
+ await().atMost(10, SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return externalBusCount.get() == 1;
+ }
+ });
+ }
+
}
diff --git a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastInternalEvent.java b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastInternalEvent.java
new file mode 100644
index 0000000..c8afe52
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastInternalEvent.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.broadcast;
+
+import org.killbill.billing.events.BroadcastInternalEvent;
+import org.killbill.billing.events.BusEventBase;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DefaultBroadcastInternalEvent extends BusEventBase implements BroadcastInternalEvent {
+
+ private String serviceName;
+ private String type;
+ private String jsonEvent;
+
+ public DefaultBroadcastInternalEvent() {
+ super(null, 0L, null);
+ }
+
+ @JsonCreator
+ public DefaultBroadcastInternalEvent(@JsonProperty("serviceName") final String serviceName,
+ @JsonProperty("type") final String type,
+ @JsonProperty("jsonEvent") final String jsonEvent) {
+ super(null, 0L, null);
+ this.serviceName = serviceName;
+ this.type = type;
+ this.jsonEvent = jsonEvent;
+ }
+
+ @Override
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public String getJsonEvent() {
+ return jsonEvent;
+ }
+
+ @JsonIgnore
+ @Override
+ public BusInternalEventType getBusEventType() {
+ return BusInternalEventType.BROADCAST_SERVICE;
+ }
+}
diff --git a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastService.java b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastService.java
index 3609e6a..7ee1a99 100644
--- a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastService.java
+++ b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastService.java
@@ -17,23 +17,137 @@
package org.killbill.billing.util.broadcast;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.killbill.billing.events.BroadcastInternalEvent;
import org.killbill.billing.platform.api.LifecycleHandlerType;
import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
+import org.killbill.billing.util.broadcast.dao.BroadcastDao;
+import org.killbill.billing.util.broadcast.dao.BroadcastModelDao;
+import org.killbill.billing.util.config.BroadcastConfig;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.killbill.commons.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
public class DefaultBroadcastService implements BroadcastService {
+ private final static int TERMINATION_TIMEOUT_SEC = 5;
+
+ private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcastService.class);
+
public static final String BROADCAST_SERVICE_NAME = "broadcast-service";
+ private final BroadcastConfig broadcastConfig;
+ private final BroadcastDao broadcastDao;
+ private final PersistentBus eventBus;
+
+ private AtomicLong latestRecordIdProcessed;
+ private ScheduledExecutorService broadcastExecutor;
+ private volatile boolean isStopped;
+
+ @Inject
+ public DefaultBroadcastService(final BroadcastDao broadcastDao, final BroadcastConfig broadcastConfig, final PersistentBus eventBus) {
+ this.broadcastDao = broadcastDao;
+ this.broadcastConfig = broadcastConfig;
+ this.eventBus = eventBus;
+ this.isStopped = false;
+ }
+
@Override
public String getName() {
return BROADCAST_SERVICE_NAME;
}
+ @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+ public void initialize() {
+ final BroadcastModelDao entry = broadcastDao.getLatestEntry();
+ this.latestRecordIdProcessed = entry != null ? new AtomicLong(entry.getRecordId()) : new AtomicLong(0L);
+ this.broadcastExecutor = Executors.newSingleThreadScheduledExecutor("BroadcastExecutor");
+ this.isStopped = false;
+ }
+
@LifecycleHandlerType(LifecycleHandlerType.LifecycleLevel.START_SERVICE)
public void start() {
+ final TimeUnit pendingRateUnit = broadcastConfig.getBroadcastServiceRunningRate().getUnit();
+ final long pendingPeriod = broadcastConfig.getBroadcastServiceRunningRate().getPeriod();
+ broadcastExecutor.scheduleAtFixedRate(new BroadcastServiceRunnable(this, broadcastDao, eventBus), pendingPeriod, pendingPeriod, pendingRateUnit);
+
}
@LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
public void stop() {
+ if (isStopped) {
+ logger.warn("BroadcastExecutor is already in a stopped state");
+ return;
+ }
+ try {
+ broadcastExecutor.shutdown();
+ boolean success = broadcastExecutor.awaitTermination(TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS);
+ if (!success) {
+ logger.warn("BroadcastExecutor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("BroadcastExecutor stop sequence got interrupted");
+ } finally {
+ isStopped = true;
+ }
+ }
+
+ public boolean isStopped() {
+ return isStopped;
+ }
+
+ public AtomicLong getLatestRecordIdProcessed() {
+ return latestRecordIdProcessed;
+ }
+
+ public void setLatestRecordIdProcessed(final Long latestRecordIdProcessed) {
+ this.latestRecordIdProcessed.set(latestRecordIdProcessed);
+ }
+
+ private static class BroadcastServiceRunnable implements Runnable {
+
+ private final DefaultBroadcastService parent;
+ private final BroadcastDao broadcastDao;
+ private final PersistentBus eventBus;
+
+ public BroadcastServiceRunnable(final DefaultBroadcastService defaultBroadcastService, final BroadcastDao broadcastDao, final PersistentBus eventBus) {
+ this.parent = defaultBroadcastService;
+ this.broadcastDao = broadcastDao;
+ this.eventBus = eventBus;
+ }
+
+ @Override
+ public void run() {
+ if (parent.isStopped) {
+ return;
+ }
+
+ final List<BroadcastModelDao> entries = broadcastDao.getLatestEntriesFrom(parent.getLatestRecordIdProcessed().get());
+ for (BroadcastModelDao cur : entries) {
+ if (parent.isStopped()) {
+ return;
+ }
+
+ try {
+ final BroadcastInternalEvent event = new DefaultBroadcastInternalEvent(cur.getServiceName(), cur.getType(), cur.getEvent());
+ eventBus.post(event);
+ } catch (final EventBusException e) {
+ logger.error("Failed to send event BroadcastInternalEvent: ", e);
+ } finally {
+ parent.setLatestRecordIdProcessed(cur.getRecordId());
+ }
+ }
+ }
}
}
diff --git a/util/src/main/java/org/killbill/billing/util/config/BroadcastConfig.java b/util/src/main/java/org/killbill/billing/util/config/BroadcastConfig.java
new file mode 100644
index 0000000..8125941
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/config/BroadcastConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.config;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+import org.skife.config.Description;
+import org.skife.config.TimeSpan;
+
+public interface BroadcastConfig extends KillbillConfig {
+
+ @Config("org.killbill.billing.util.broadcast.rate")
+ @Default("5s")
+ @Description("Rate at which broadcast service task is scheduled")
+ public TimeSpan getBroadcastServiceRunningRate();
+
+}
diff --git a/util/src/main/java/org/killbill/billing/util/glue/BroadcastModule.java b/util/src/main/java/org/killbill/billing/util/glue/BroadcastModule.java
index d409051..72fb5be 100644
--- a/util/src/main/java/org/killbill/billing/util/glue/BroadcastModule.java
+++ b/util/src/main/java/org/killbill/billing/util/glue/BroadcastModule.java
@@ -19,9 +19,13 @@ package org.killbill.billing.util.glue;
import org.killbill.billing.broadcast.BroadcastApi;
import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.billing.util.broadcast.BroadcastService;
import org.killbill.billing.util.broadcast.DefaultBroadcastApi;
+import org.killbill.billing.util.broadcast.DefaultBroadcastService;
import org.killbill.billing.util.broadcast.dao.BroadcastDao;
import org.killbill.billing.util.broadcast.dao.DefaultBroadcastDao;
+import org.killbill.billing.util.config.BroadcastConfig;
+import org.skife.config.ConfigurationObjectFactory;
public class BroadcastModule extends KillBillModule {
@@ -34,12 +38,16 @@ public class BroadcastModule extends KillBillModule {
}
protected void installUserApi() {
+
+ bind(BroadcastService.class).to(DefaultBroadcastService.class).asEagerSingleton();
bind(BroadcastApi.class).to(DefaultBroadcastApi.class).asEagerSingleton();
}
-
@Override
protected void configure() {
+ final ConfigurationObjectFactory factory = new ConfigurationObjectFactory(skifeConfigSource);
+ final BroadcastConfig broadcastConfig = factory.build(BroadcastConfig.class);
+ bind(BroadcastConfig.class).toInstance(broadcastConfig);
installDaos();
installUserApi();
}
diff --git a/util/src/test/java/org/killbill/billing/api/TestApiListener.java b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
index 0f92505..a5815c4 100644
--- a/util/src/test/java/org/killbill/billing/api/TestApiListener.java
+++ b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.joda.time.DateTime;
+import org.killbill.billing.events.BroadcastInternalEvent;
import org.killbill.billing.events.InvoiceNotificationInternalEvent;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
@@ -92,6 +93,7 @@ public class TestApiListener {
public enum NextEvent {
MIGRATE_ENTITLEMENT,
MIGRATE_BILLING,
+ BROADCAST_SERVICE,
CREATE,
TRANSFER,
RE_CREATE,
@@ -114,6 +116,15 @@ public class TestApiListener {
CUSTOM_FIELD,
}
+
+ @Subscribe
+ public void handleBroadcastEvents(final BroadcastInternalEvent event) {
+ log.info(String.format("Got BroadcastInternalEvent event %s", event.toString()));
+ assertEqualsNicely(NextEvent.BROADCAST_SERVICE);
+ notifyIfStackEmpty();
+ }
+
+
@Subscribe
public void handleRepairSubscriptionEvents(final RepairSubscriptionInternalEvent event) {
log.info(String.format("Got RepairSubscriptionEvent event %s", event.toString()));
diff --git a/util/src/test/java/org/killbill/billing/util/broadcast/TestBroadcastService.java b/util/src/test/java/org/killbill/billing/util/broadcast/TestBroadcastService.java
new file mode 100644
index 0000000..a52cd80
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/broadcast/TestBroadcastService.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.broadcast;
+
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.billing.util.UtilTestSuiteWithEmbeddedDB;
+import org.killbill.billing.util.broadcast.dao.BroadcastModelDao;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+
+public class TestBroadcastService extends UtilTestSuiteWithEmbeddedDB {
+
+ @Inject
+ private BroadcastService broadcastService;
+
+ @Override
+ protected KillbillConfigSource getConfigSource() {
+ return getConfigSource(null,
+ ImmutableMap.<String, String>of("org.killbill.billing.util.broadcast.rate", "500ms"));
+ }
+
+ @BeforeMethod(groups = "slow")
+ public void beforeMethod() throws Exception {
+ super.beforeMethod();
+ ((DefaultBroadcastService) broadcastService).initialize();
+ ((DefaultBroadcastService) broadcastService).start();
+ }
+
+ @AfterMethod(groups = "slow")
+ public void afterMethod() throws Exception {
+ ((DefaultBroadcastService) broadcastService).stop();
+ super.afterMethod();
+ }
+
+ @Test(groups = "slow")
+ public void testBasic() {
+ final String eventJson = "\"{\"pluginName\":\"foo\",\"pluginVersion\":\"1.2.3\",\"properties\":[{\"key\":\"something\",\"value\":\"nothing\"}]}\"";
+
+ eventsListener.pushExpectedEvent(NextEvent.BROADCAST_SERVICE);
+ broadcastDao.create(new BroadcastModelDao("svc", "type", eventJson, clock.getUTCNow(), "tester"));
+ assertListenerStatus();
+ }
+
+}
diff --git a/util/src/test/java/org/killbill/billing/util/broadcast/TestDefaultBroadcastInternalEvent.java b/util/src/test/java/org/killbill/billing/util/broadcast/TestDefaultBroadcastInternalEvent.java
new file mode 100644
index 0000000..28c8a5b
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/broadcast/TestDefaultBroadcastInternalEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.broadcast;
+
+import org.killbill.billing.events.BroadcastInternalEvent;
+import org.killbill.billing.util.UtilTestSuiteNoDB;
+import org.killbill.billing.util.jackson.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+
+public class TestDefaultBroadcastInternalEvent extends UtilTestSuiteNoDB {
+
+
+ @Test(groups = "fast")
+ public void testBasic() throws Exception {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new JodaModule());
+ objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+ final String eventJson = "\"{\"pluginName\":\"foo\",\"pluginVersion\":\"1.2.3\",\"properties\":[{\"key\":\"something\",\"value\":\"nothing\"}]}\"";
+
+ final BroadcastInternalEvent broadcastEvent = new DefaultBroadcastInternalEvent("service", "PLUGIN_INSTALL", eventJson);
+
+ final String broadcastEventStr = objectMapper.writeValueAsString(broadcastEvent);
+
+ final BroadcastInternalEvent res = objectMapper.readValue(broadcastEventStr, DefaultBroadcastInternalEvent.class);
+
+ Assert.assertEquals(res.getServiceName(), "service");
+ Assert.assertEquals(res.getType(), "PLUGIN_INSTALL");
+ Assert.assertEquals(res.getJsonEvent(), eventJson);
+ }
+}