killbill-aplcache

Details

diff --git a/api/src/main/java/org/killbill/billing/broadcast/BroadcastApi.java b/api/src/main/java/org/killbill/billing/broadcast/BroadcastApi.java
index 6fcab7b..e8aec63 100644
--- a/api/src/main/java/org/killbill/billing/broadcast/BroadcastApi.java
+++ b/api/src/main/java/org/killbill/billing/broadcast/BroadcastApi.java
@@ -20,5 +20,5 @@ package org.killbill.billing.broadcast;
 import org.joda.time.DateTime;
 
 public interface BroadcastApi {
-    public void broadcast(String serviceName, String type, String event, DateTime createdDate, String createdBy);
+    public void broadcast(String serviceName, String type, String event, DateTime createdDate, String createdBy, boolean localNodeOnly);
 }
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 6443bda..ba6bad6 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -197,7 +197,7 @@ public class TestPublicBus extends TestIntegrationBase {
 
         // Verify the internal bus first
         busHandler.pushExpectedEvent(NextEvent.BROADCAST_SERVICE);
-        nodesApi.triggerNodeCommand(nodeCommand);
+        nodesApi.triggerNodeCommand(nodeCommand, false);
         assertListenerStatus();
 
         // Verify the public bus
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithFakeKPMPlugin.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithFakeKPMPlugin.java
index a822b7d..4d9c7d8 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithFakeKPMPlugin.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithFakeKPMPlugin.java
@@ -348,7 +348,7 @@ public class TestWithFakeKPMPlugin extends TestIntegrationBase {
             }
         };
         busHandler.pushExpectedEvent(NextEvent.BROADCAST_SERVICE);
-        nodesApi.triggerNodeCommand(nodeCommand);
+        nodesApi.triggerNodeCommand(nodeCommand, false);
         assertListenerStatus();
 
         // Exit condition is based on the new config being updated on disk
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxrsResource.java
index 85234aa..2fdd31e 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxrsResource.java
@@ -61,6 +61,9 @@ public interface JaxrsResource {
     /*
      * Query parameters
      */
+
+
+    public static final String QUERY_LOCAL_NODE_ONLY = "localNodeOnly";
     public static final String QUERY_EXTERNAL_KEY = "externalKey";
     public static final String QUERY_API_KEY = "apiKey";
     public static final String QUERY_REQUESTED_DT = "requestedDate";
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/NodesInfoResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/NodesInfoResource.java
index b6fa653..4701262 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/NodesInfoResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/NodesInfoResource.java
@@ -24,21 +24,20 @@ import java.util.Set;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.HeaderParam;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriInfo;
 
-import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
-import org.killbill.billing.account.api.AccountData;
 import org.killbill.billing.account.api.AccountUserApi;
 import org.killbill.billing.entitlement.api.SubscriptionApiException;
-import org.killbill.billing.jaxrs.json.AccountJson;
 import org.killbill.billing.jaxrs.json.NodeCommandJson;
 import org.killbill.billing.jaxrs.json.NodeInfoJson;
 import org.killbill.billing.jaxrs.json.PluginInfoJson;
@@ -136,8 +135,6 @@ public class NodesInfoResource extends JaxRsResourceBase {
         return Response.status(Status.OK).entity(nodeInfosJson).build();
     }
 
-
-
     @TimedResource
     @POST
     @Consumes(APPLICATION_JSON)
@@ -145,11 +142,12 @@ public class NodesInfoResource extends JaxRsResourceBase {
     @ApiOperation(value = "Trigger a node command")
     @ApiResponses(value = {@ApiResponse(code = 400, message = "Invalid node command supplied")})
     public Response triggerNodeCommand(final NodeCommandJson json,
-                                  @HeaderParam(HDR_CREATED_BY) final String createdBy,
-                                  @HeaderParam(HDR_REASON) final String reason,
-                                  @HeaderParam(HDR_COMMENT) final String comment,
-                                  @javax.ws.rs.core.Context final HttpServletRequest request,
-                                  @javax.ws.rs.core.Context final UriInfo uriInfo) throws AccountApiException {
+                                       @QueryParam(QUERY_LOCAL_NODE_ONLY) @DefaultValue("false") final Boolean localNodeOnly,
+                                       @HeaderParam(HDR_CREATED_BY) final String createdBy,
+                                       @HeaderParam(HDR_REASON) final String reason,
+                                       @HeaderParam(HDR_COMMENT) final String comment,
+                                       @javax.ws.rs.core.Context final HttpServletRequest request,
+                                       @javax.ws.rs.core.Context final UriInfo uriInfo) throws AccountApiException {
 
         final NodeCommandMetadata metadata = toNodeCommandMetadata(json);
 
@@ -168,7 +166,7 @@ public class NodesInfoResource extends JaxRsResourceBase {
             }
         };
 
-        killbillInfoApi.triggerNodeCommand(nodeCommand);
+        killbillInfoApi.triggerNodeCommand(nodeCommand, localNodeOnly);
         return Response.status(Status.CREATED).build();
     }
 
diff --git a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastApi.java b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastApi.java
index 33524f1..7c24739 100644
--- a/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastApi.java
+++ b/util/src/main/java/org/killbill/billing/util/broadcast/DefaultBroadcastApi.java
@@ -21,21 +21,41 @@ import javax.inject.Inject;
 
 import org.joda.time.DateTime;
 import org.killbill.billing.broadcast.BroadcastApi;
+import org.killbill.billing.events.BroadcastInternalEvent;
 import org.killbill.billing.util.broadcast.dao.BroadcastDao;
 import org.killbill.billing.util.broadcast.dao.BroadcastModelDao;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DefaultBroadcastApi implements BroadcastApi {
 
+    private final Logger logger = LoggerFactory.getLogger(DefaultBroadcastApi.class);
+
     private final BroadcastDao dao;
+    private final PersistentBus eventBus;
 
     @Inject
-    public DefaultBroadcastApi(final BroadcastDao dao) {
+    public DefaultBroadcastApi(final BroadcastDao dao, final PersistentBus eventBus) {
         this.dao = dao;
+        this.eventBus = eventBus;
     }
 
     @Override
-    public void broadcast(final String serviceName, final String type, final String event, final DateTime createdDate, final String createdBy) {
-        final BroadcastModelDao modelDao = new BroadcastModelDao(serviceName, type, event, createdDate, createdBy);
-        dao.create(modelDao);
+    public void broadcast(final String serviceName, final String type, final String event, final DateTime createdDate, final String createdBy, final boolean localNodeOnly) {
+
+        // If localNodeOnly, this is not really a broadcast api anymore, but we still rely on that broadcast bus event to notify the local node
+        if (localNodeOnly) {
+            final BroadcastInternalEvent busEvent = new DefaultBroadcastInternalEvent(serviceName, type, event);
+            try {
+                eventBus.post(busEvent);
+            } catch (final EventBusException e) {
+                logger.warn("Failed to deliver bus event ", e);
+            }
+        } else {
+            final BroadcastModelDao modelDao = new BroadcastModelDao(serviceName, type, event, createdDate, createdBy);
+            dao.create(modelDao);
+        }
     }
 }
diff --git a/util/src/main/java/org/killbill/billing/util/nodes/DefaultKillbillNodesApi.java b/util/src/main/java/org/killbill/billing/util/nodes/DefaultKillbillNodesApi.java
index 8e14ee2..ddb6a38 100644
--- a/util/src/main/java/org/killbill/billing/util/nodes/DefaultKillbillNodesApi.java
+++ b/util/src/main/java/org/killbill/billing/util/nodes/DefaultKillbillNodesApi.java
@@ -81,12 +81,12 @@ public class DefaultKillbillNodesApi implements KillbillNodesApi {
     }
 
     @Override
-    public void triggerNodeCommand(final NodeCommand nodeCommand) {
+    public void triggerNodeCommand(final NodeCommand nodeCommand, final boolean localNodeOnly) {
 
         final String event;
         try {
             event = mapper.serializeNodeCommand(nodeCommand.getNodeCommandMetadata());
-            broadcastApi.broadcast(DefaultKillbillNodesService.NODES_SERVICE_NAME, nodeCommand.getNodeCommandType(), event, clock.getUTCNow(), "unset");
+            broadcastApi.broadcast(DefaultKillbillNodesService.NODES_SERVICE_NAME, nodeCommand.getNodeCommandType(), event, clock.getUTCNow(), "unset", localNodeOnly);
 
         } catch (JsonProcessingException e) {
             throw new RuntimeException(e);