killbill-memoizeit

Details

diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
index 127eedb..decc852 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
@@ -41,6 +41,7 @@ import com.ning.billing.entitlement.events.phase.IPhaseEvent;
 import com.ning.billing.entitlement.events.phase.PhaseEvent;
 import com.ning.billing.entitlement.events.user.ApiEventCreate;
 import com.ning.billing.entitlement.exceptions.EntitlementError;
+import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.IClock;
 
 public class EntitlementUserApi implements IEntitlementUserApi {
@@ -98,8 +99,8 @@ public class EntitlementUserApi implements IEntitlementUserApi {
             BillingPeriod term, String priceList, DateTime requestedDate) throws EntitlementUserApiException {
 
         String realPriceList = (priceList == null) ? IPriceListSet.DEFAULT_PRICELIST_NAME : priceList;
-
         DateTime now = clock.getUTCNow();
+        requestedDate = (requestedDate != null) ? Clock.truncateMs(requestedDate) : null;
         if (requestedDate != null && requestedDate.isAfter(now)) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE, requestedDate.toString());
         }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
index 05f6e9d..c266f61 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
@@ -54,6 +54,7 @@ import com.ning.billing.entitlement.events.user.ApiEventUncancel;
 import com.ning.billing.entitlement.events.user.IApiEvent;
 import com.ning.billing.entitlement.exceptions.EntitlementError;
 import com.ning.billing.entitlement.glue.InjectorMagic;
+import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.IClock;
 
 public class Subscription extends PrivateFields  implements ISubscription {
@@ -218,6 +219,7 @@ public class Subscription extends PrivateFields  implements ISubscription {
         }
 
         DateTime now = clock.getUTCNow();
+        requestedDate = (requestedDate != null) ? Clock.truncateMs(requestedDate) : null;
         if (requestedDate != null && requestedDate.isAfter(now)) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE, requestedDate.toString());
         }
@@ -260,6 +262,7 @@ public class Subscription extends PrivateFields  implements ISubscription {
     public void changePlan(String productName, BillingPeriod term,
             String priceList, DateTime requestedDate) throws EntitlementUserApiException {
 
+        requestedDate = (requestedDate != null) ? Clock.truncateMs(requestedDate) : null;
         String currentPriceList = getCurrentPriceList();
 
         SubscriptionState currentState = getState();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
index a22e5b3..acaa7ad 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
@@ -78,7 +78,7 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
 
 
     @Override
-    public void startNotifications(IEventListener listener) {
+    public void startNotifications(final IEventListener listener) {
 
         this.listener = listener;
         this.isProcessingEvents = true;
@@ -87,6 +87,7 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
 
         if (config.isEventProcessingOff()) {
             log.warn("KILLBILL ENTITLEMENT EVENT PROCESSING IS OFF !!!");
+            listener.completedNotificationStart();
             return;
         }
         final ApiEventProcessorBase apiEventProcessor = this;
@@ -95,6 +96,7 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
 
             if (executor != null) {
                 log.warn("There is already an executor thread running, return");
+                listener.completedNotificationStart();
                 return;
             }
 
@@ -122,6 +124,10 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
 
                 log.info(String.format("ApiEventProcessor thread %s  [%d] started", API_EVENT_THREAD_NAME,
                         Thread.currentThread().getId()));
+
+                // Thread is now started, notify the listener
+                listener.completedNotificationStart();
+
                 try {
                     while (true) {
                         synchronized (apiEventProcessor) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 622fa12..2587baa 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -38,8 +38,7 @@ import com.ning.billing.entitlement.events.IEvent;
 import com.ning.billing.entitlement.events.IEvent.EventType;
 import com.ning.billing.entitlement.events.phase.IPhaseEvent;
 import com.ning.billing.entitlement.events.phase.PhaseEvent;
-import com.ning.billing.entitlement.events.user.IApiEvent;
-import com.ning.billing.lifecycle.IService;
+import com.ning.billing.entitlement.exceptions.EntitlementError;
 import com.ning.billing.lifecycle.LyfecycleHandlerType;
 import com.ning.billing.lifecycle.LyfecycleHandlerType.LyfecycleLevel;
 import com.ning.billing.util.clock.IClock;
@@ -50,6 +49,10 @@ public class Engine implements IEventListener, IEntitlementService {
 
     private static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
 
+    private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
+    private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
+    private final long NANO_TO_MS = (1000 * 1000);
+
     private final static Logger log = LoggerFactory.getLogger(Engine.class);
 
     private final IClock clock;
@@ -61,6 +64,8 @@ public class Engine implements IEventListener, IEntitlementService {
     private final IEntitlementTestApi testApi;
     private final IEventBus eventBus;
 
+    private boolean startedNotificationThread;
+
     @Inject
     public Engine(IClock clock, IEntitlementDao dao, IApiEventProcessor apiEventProcessor,
             IPlanAligner planAligner, IEntitlementConfig config, EntitlementUserApi userApi,
@@ -74,6 +79,8 @@ public class Engine implements IEventListener, IEntitlementService {
         this.testApi = testApi;
         this.billingApi = billingApi;
         this.eventBus = eventBus;
+
+        this.startedNotificationThread = false;
     }
 
     @Override
@@ -89,11 +96,13 @@ public class Engine implements IEventListener, IEntitlementService {
     @LyfecycleHandlerType(LyfecycleLevel.START_SERVICE)
     public void start() {
         apiEventProcessor.startNotifications(this);
+        waitForNotificationStartCompletion();
     }
 
     @LyfecycleHandlerType(LyfecycleLevel.STOP_SERVICE)
     public void stop() {
         apiEventProcessor.stopNotifications();
+        startedNotificationThread = false;
     }
 
     @Override
@@ -127,7 +136,42 @@ public class Engine implements IEventListener, IEntitlementService {
         } catch (EventBusException e) {
             log.warn("Failed to post entitlement event " + event, e);
         }
+    }
 
+    //
+    // We want to ensure the notification thread is indeed started when we return from start()
+    //
+    @Override
+    public void completedNotificationStart() {
+        synchronized (this) {
+            startedNotificationThread = true;
+            this.notifyAll();
+        }
+    }
+
+    private void waitForNotificationStartCompletion() {
+
+        long ini = System.nanoTime();
+        synchronized(this) {
+            do {
+                if (startedNotificationThread) {
+                    break;
+                }
+                try {
+                    this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
+                } catch (InterruptedException e ) {
+                    Thread.currentThread().interrupt();
+                    throw new EntitlementError(e);
+                }
+            } while (!startedNotificationThread &&
+                    (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
+
+            if (!startedNotificationThread) {
+                log.error("Could not start notification thread in {} msec !!!", MAX_NOTIFICATION_THREAD_WAIT_MS);
+                throw new EntitlementError("Failed to start service!!");
+            }
+            log.info("Notification thread has been started in {} ms", (System.nanoTime() - ini) / NANO_TO_MS);
+        }
     }
 
     private void insertNextPhaseEvent(Subscription subscription) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IEventListener.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IEventListener.java
index f973279..651f8a9 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IEventListener.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IEventListener.java
@@ -21,5 +21,8 @@ import com.ning.billing.entitlement.events.IEvent;
 
 
 public interface IEventListener {
+
     public void processEventReady(IEvent event);
+
+    public void completedNotificationStart();
 }
diff --git a/util/src/main/java/com/ning/billing/util/clock/Clock.java b/util/src/main/java/com/ning/billing/util/clock/Clock.java
index 43819ba..a60b393 100644
--- a/util/src/main/java/com/ning/billing/util/clock/Clock.java
+++ b/util/src/main/java/com/ning/billing/util/clock/Clock.java
@@ -29,7 +29,7 @@ public class Clock implements IClock {
     @Override
     public DateTime getNow(DateTimeZone tz) {
        DateTime result = new DateTime(tz);
-       return result.minus(result.getMillisOfSecond());
+       return truncateMs(result);
     }
 
     @Override
@@ -38,6 +38,10 @@ public class Clock implements IClock {
     }
 
 
+    public static DateTime truncateMs(DateTime input) {
+        return input.minus(input.getMillisOfSecond());
+    }
+
     public static DateTime addDuration(DateTime input, List<IDuration> durations) {
 
         DateTime result = input;