Details
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
index 127eedb..decc852 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/EntitlementUserApi.java
@@ -41,6 +41,7 @@ import com.ning.billing.entitlement.events.phase.IPhaseEvent;
import com.ning.billing.entitlement.events.phase.PhaseEvent;
import com.ning.billing.entitlement.events.user.ApiEventCreate;
import com.ning.billing.entitlement.exceptions.EntitlementError;
+import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.IClock;
public class EntitlementUserApi implements IEntitlementUserApi {
@@ -98,8 +99,8 @@ public class EntitlementUserApi implements IEntitlementUserApi {
BillingPeriod term, String priceList, DateTime requestedDate) throws EntitlementUserApiException {
String realPriceList = (priceList == null) ? IPriceListSet.DEFAULT_PRICELIST_NAME : priceList;
-
DateTime now = clock.getUTCNow();
+ requestedDate = (requestedDate != null) ? Clock.truncateMs(requestedDate) : null;
if (requestedDate != null && requestedDate.isAfter(now)) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE, requestedDate.toString());
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
index 05f6e9d..c266f61 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
@@ -54,6 +54,7 @@ import com.ning.billing.entitlement.events.user.ApiEventUncancel;
import com.ning.billing.entitlement.events.user.IApiEvent;
import com.ning.billing.entitlement.exceptions.EntitlementError;
import com.ning.billing.entitlement.glue.InjectorMagic;
+import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.IClock;
public class Subscription extends PrivateFields implements ISubscription {
@@ -218,6 +219,7 @@ public class Subscription extends PrivateFields implements ISubscription {
}
DateTime now = clock.getUTCNow();
+ requestedDate = (requestedDate != null) ? Clock.truncateMs(requestedDate) : null;
if (requestedDate != null && requestedDate.isAfter(now)) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE, requestedDate.toString());
}
@@ -260,6 +262,7 @@ public class Subscription extends PrivateFields implements ISubscription {
public void changePlan(String productName, BillingPeriod term,
String priceList, DateTime requestedDate) throws EntitlementUserApiException {
+ requestedDate = (requestedDate != null) ? Clock.truncateMs(requestedDate) : null;
String currentPriceList = getCurrentPriceList();
SubscriptionState currentState = getState();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
index a22e5b3..acaa7ad 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
@@ -78,7 +78,7 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
@Override
- public void startNotifications(IEventListener listener) {
+ public void startNotifications(final IEventListener listener) {
this.listener = listener;
this.isProcessingEvents = true;
@@ -87,6 +87,7 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
if (config.isEventProcessingOff()) {
log.warn("KILLBILL ENTITLEMENT EVENT PROCESSING IS OFF !!!");
+ listener.completedNotificationStart();
return;
}
final ApiEventProcessorBase apiEventProcessor = this;
@@ -95,6 +96,7 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
if (executor != null) {
log.warn("There is already an executor thread running, return");
+ listener.completedNotificationStart();
return;
}
@@ -122,6 +124,10 @@ public abstract class ApiEventProcessorBase implements IApiEventProcessor {
log.info(String.format("ApiEventProcessor thread %s [%d] started", API_EVENT_THREAD_NAME,
Thread.currentThread().getId()));
+
+ // Thread is now started, notify the listener
+ listener.completedNotificationStart();
+
try {
while (true) {
synchronized (apiEventProcessor) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 622fa12..2587baa 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -38,8 +38,7 @@ import com.ning.billing.entitlement.events.IEvent;
import com.ning.billing.entitlement.events.IEvent.EventType;
import com.ning.billing.entitlement.events.phase.IPhaseEvent;
import com.ning.billing.entitlement.events.phase.PhaseEvent;
-import com.ning.billing.entitlement.events.user.IApiEvent;
-import com.ning.billing.lifecycle.IService;
+import com.ning.billing.entitlement.exceptions.EntitlementError;
import com.ning.billing.lifecycle.LyfecycleHandlerType;
import com.ning.billing.lifecycle.LyfecycleHandlerType.LyfecycleLevel;
import com.ning.billing.util.clock.IClock;
@@ -50,6 +49,10 @@ public class Engine implements IEventListener, IEntitlementService {
private static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
+ private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
+ private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
+ private final long NANO_TO_MS = (1000 * 1000);
+
private final static Logger log = LoggerFactory.getLogger(Engine.class);
private final IClock clock;
@@ -61,6 +64,8 @@ public class Engine implements IEventListener, IEntitlementService {
private final IEntitlementTestApi testApi;
private final IEventBus eventBus;
+ private boolean startedNotificationThread;
+
@Inject
public Engine(IClock clock, IEntitlementDao dao, IApiEventProcessor apiEventProcessor,
IPlanAligner planAligner, IEntitlementConfig config, EntitlementUserApi userApi,
@@ -74,6 +79,8 @@ public class Engine implements IEventListener, IEntitlementService {
this.testApi = testApi;
this.billingApi = billingApi;
this.eventBus = eventBus;
+
+ this.startedNotificationThread = false;
}
@Override
@@ -89,11 +96,13 @@ public class Engine implements IEventListener, IEntitlementService {
@LyfecycleHandlerType(LyfecycleLevel.START_SERVICE)
public void start() {
apiEventProcessor.startNotifications(this);
+ waitForNotificationStartCompletion();
}
@LyfecycleHandlerType(LyfecycleLevel.STOP_SERVICE)
public void stop() {
apiEventProcessor.stopNotifications();
+ startedNotificationThread = false;
}
@Override
@@ -127,7 +136,42 @@ public class Engine implements IEventListener, IEntitlementService {
} catch (EventBusException e) {
log.warn("Failed to post entitlement event " + event, e);
}
+ }
+ //
+ // We want to ensure the notification thread is indeed started when we return from start()
+ //
+ @Override
+ public void completedNotificationStart() {
+ synchronized (this) {
+ startedNotificationThread = true;
+ this.notifyAll();
+ }
+ }
+
+ private void waitForNotificationStartCompletion() {
+
+ long ini = System.nanoTime();
+ synchronized(this) {
+ do {
+ if (startedNotificationThread) {
+ break;
+ }
+ try {
+ this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
+ } catch (InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ throw new EntitlementError(e);
+ }
+ } while (!startedNotificationThread &&
+ (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
+
+ if (!startedNotificationThread) {
+ log.error("Could not start notification thread in {} msec !!!", MAX_NOTIFICATION_THREAD_WAIT_MS);
+ throw new EntitlementError("Failed to start service!!");
+ }
+ log.info("Notification thread has been started in {} ms", (System.nanoTime() - ini) / NANO_TO_MS);
+ }
}
private void insertNextPhaseEvent(Subscription subscription) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IEventListener.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IEventListener.java
index f973279..651f8a9 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IEventListener.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/IEventListener.java
@@ -21,5 +21,8 @@ import com.ning.billing.entitlement.events.IEvent;
public interface IEventListener {
+
public void processEventReady(IEvent event);
+
+ public void completedNotificationStart();
}
diff --git a/util/src/main/java/com/ning/billing/util/clock/Clock.java b/util/src/main/java/com/ning/billing/util/clock/Clock.java
index 43819ba..a60b393 100644
--- a/util/src/main/java/com/ning/billing/util/clock/Clock.java
+++ b/util/src/main/java/com/ning/billing/util/clock/Clock.java
@@ -29,7 +29,7 @@ public class Clock implements IClock {
@Override
public DateTime getNow(DateTimeZone tz) {
DateTime result = new DateTime(tz);
- return result.minus(result.getMillisOfSecond());
+ return truncateMs(result);
}
@Override
@@ -38,6 +38,10 @@ public class Clock implements IClock {
}
+ public static DateTime truncateMs(DateTime input) {
+ return input.minus(input.getMillisOfSecond());
+ }
+
public static DateTime addDuration(DateTime input, List<IDuration> durations) {
DateTime result = input;