Details
diff --git a/api/src/main/java/org/killbill/billing/broadcast/BroadcastApi.java b/api/src/main/java/org/killbill/billing/broadcast/BroadcastApi.java
index 6fcab7b..e8aec63 100644
--- a/api/src/main/java/org/killbill/billing/broadcast/BroadcastApi.java
+++ b/api/src/main/java/org/killbill/billing/broadcast/BroadcastApi.java
@@ -20,5 +20,5 @@ package org.killbill.billing.broadcast;
import org.joda.time.DateTime;
public interface BroadcastApi {
- public void broadcast(String serviceName, String type, String event, DateTime createdDate, String createdBy);
+ public void broadcast(String serviceName, String type, String event, DateTime createdDate, String createdBy, boolean localNodeOnly);
}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 6443bda..ba6bad6 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -197,7 +197,7 @@ public class TestPublicBus extends TestIntegrationBase {
// Verify the internal bus first
busHandler.pushExpectedEvent(NextEvent.BROADCAST_SERVICE);
- nodesApi.triggerNodeCommand(nodeCommand);
+ nodesApi.triggerNodeCommand(nodeCommand, false);
assertListenerStatus();
// Verify the public bus
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithFakeKPMPlugin.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithFakeKPMPlugin.java
index a822b7d..4d9c7d8 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithFakeKPMPlugin.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithFakeKPMPlugin.java
@@ -348,7 +348,7 @@ public class TestWithFakeKPMPlugin extends TestIntegrationBase {
}
};
busHandler.pushExpectedEvent(NextEvent.BROADCAST_SERVICE);
- nodesApi.triggerNodeCommand(nodeCommand);
+ nodesApi.triggerNodeCommand(nodeCommand, false);
assertListenerStatus();
// Exit condition is based on the new config being updated on disk
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxrsResource.java
index 85234aa..2fdd31e 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxrsResource.java
@@ -61,6 +61,9 @@ public interface JaxrsResource {
/*
* Query parameters
*/
+
+
+ public static final String QUERY_LOCAL_NODE_ONLY = "localNodeOnly";
public static final String QUERY_EXTERNAL_KEY = "externalKey";
public static final String QUERY_API_KEY = "apiKey";
public static final String QUERY_REQUESTED_DT = "requestedDate";
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/NodesInfoResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/NodesInfoResource.java
index b6fa653..4701262 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/NodesInfoResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/NodesInfoResource.java
@@ -24,21 +24,20 @@ import java.util.Set;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
-import org.killbill.billing.account.api.Account;
import org.killbill.billing.account.api.AccountApiException;
-import org.killbill.billing.account.api.AccountData;
import org.killbill.billing.account.api.AccountUserApi;
import org.killbill.billing.entitlement.api.SubscriptionApiException;
-import org.killbill.billing.jaxrs.json.AccountJson;
import org.killbill.billing.jaxrs.json.NodeCommandJson;
import org.killbill.billing.jaxrs.json.NodeInfoJson;
import org.killbill.billing.jaxrs.json.PluginInfoJson;
@@ -136,8 +135,6 @@ public class NodesInfoResource extends JaxRsResourceBase {
return Response.status(Status.OK).entity(nodeInfosJson).build();
}
-
-
@TimedResource
@POST
@Consumes(APPLICATION_JSON)
@@ -145,11 +142,12 @@ public class NodesInfoResource extends JaxRsResourceBase {
@ApiOperation(value = "Trigger a node command")
@ApiResponses(value = {@ApiResponse(code = 400, message = "Invalid node command supplied")})
public Response triggerNodeCommand(final NodeCommandJson json,
- @HeaderParam(HDR_CREATED_BY) final String createdBy,
- @HeaderParam(HDR_REASON) final String reason,
- @HeaderParam(HDR_COMMENT) final String comment,
- @javax.ws.rs.core.Context final HttpServletRequest request,
- @javax.ws.rs.core.Context final UriInfo uriInfo) throws AccountApiException {
+ @QueryParam(QUERY_LOCAL_NODE_ONLY) @DefaultValue("false") final Boolean localNodeOnly,
+ @HeaderParam(HDR_CREATED_BY) final String createdBy,
+ @HeaderParam(HDR_REASON) final String reason,
+ @HeaderParam(HDR_COMMENT) final String comment,
+ @javax.ws.rs.core.Context final HttpServletRequest request,
+ @javax.ws.rs.core.Context final UriInfo uriInfo) throws AccountApiException {
final NodeCommandMetadata metadata = toNodeCommandMetadata(json);
@@ -168,7 +166,7 @@ public class NodesInfoResource extends JaxRsResourceBase {
}
};
- killbillInfoApi.triggerNodeCommand(nodeCommand);
+ killbillInfoApi.triggerNodeCommand(nodeCommand, localNodeOnly);
return Response.status(Status.CREATED).build();
}
diff --git a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastApi.java b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastApi.java
index 33524f1..7c24739 100644
--- a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastApi.java
+++ b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastApi.java
@@ -21,21 +21,41 @@ import javax.inject.Inject;
import org.joda.time.DateTime;
import org.killbill.billing.broadcast.BroadcastApi;
+import org.killbill.billing.events.BroadcastInternalEvent;
import org.killbill.billing.util.broadcast.dao.BroadcastDao;
import org.killbill.billing.util.broadcast.dao.BroadcastModelDao;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DefaultBroadcastApi implements BroadcastApi {
+ private final Logger logger = LoggerFactory.getLogger(DefaultBroadcastApi.class);
+
private final BroadcastDao dao;
+ private final PersistentBus eventBus;
@Inject
- public DefaultBroadcastApi(final BroadcastDao dao) {
+ public DefaultBroadcastApi(final BroadcastDao dao, final PersistentBus eventBus) {
this.dao = dao;
+ this.eventBus = eventBus;
}
@Override
- public void broadcast(final String serviceName, final String type, final String event, final DateTime createdDate, final String createdBy) {
- final BroadcastModelDao modelDao = new BroadcastModelDao(serviceName, type, event, createdDate, createdBy);
- dao.create(modelDao);
+ public void broadcast(final String serviceName, final String type, final String event, final DateTime createdDate, final String createdBy, final boolean localNodeOnly) {
+
+ // If localNodeOnly, this is not really a broadcast api anymore, but we still rely on that broadcast bus event to notify the local node
+ if (localNodeOnly) {
+ final BroadcastInternalEvent busEvent = new DefaultBroadcastInternalEvent(serviceName, type, event);
+ try {
+ eventBus.post(busEvent);
+ } catch (final EventBusException e) {
+ logger.warn("Failed to deliver bus event ", e);
+ }
+ } else {
+ final BroadcastModelDao modelDao = new BroadcastModelDao(serviceName, type, event, createdDate, createdBy);
+ dao.create(modelDao);
+ }
}
}
diff --git a/util/src/main/java/org/killbill/billing/util/nodes/DefaultKillbillNodesApi.java b/util/src/main/java/org/killbill/billing/util/nodes/DefaultKillbillNodesApi.java
index 8e14ee2..ddb6a38 100644
--- a/util/src/main/java/org/killbill/billing/util/nodes/DefaultKillbillNodesApi.java
+++ b/util/src/main/java/org/killbill/billing/util/nodes/DefaultKillbillNodesApi.java
@@ -81,12 +81,12 @@ public class DefaultKillbillNodesApi implements KillbillNodesApi {
}
@Override
- public void triggerNodeCommand(final NodeCommand nodeCommand) {
+ public void triggerNodeCommand(final NodeCommand nodeCommand, final boolean localNodeOnly) {
final String event;
try {
event = mapper.serializeNodeCommand(nodeCommand.getNodeCommandMetadata());
- broadcastApi.broadcast(DefaultKillbillNodesService.NODES_SERVICE_NAME, nodeCommand.getNodeCommandType(), event, clock.getUTCNow(), "unset");
+ broadcastApi.broadcast(DefaultKillbillNodesService.NODES_SERVICE_NAME, nodeCommand.getNodeCommandType(), event, clock.getUTCNow(), "unset", localNodeOnly);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);