Details
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 0ac07c0..ef82964 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -165,6 +165,7 @@ public abstract class TestApiBase {
final String entitlementDdl = IOUtils.toString(TestApiBase.class.getResourceAsStream("/com/ning/billing/entitlement/ddl.sql"));
final String utilDdl = IOUtils.toString(TestApiBase.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
helper.startMysql();
+ helper.cleanupAllTables();
helper.initDb(entitlementDdl);
helper.initDb(utilDdl);
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
index 94f5a8d..b9c6faa 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
@@ -23,13 +23,15 @@ public class BusEventEntry implements NotificationLifecycle {
private final long id;
private final String owner;
+ private final String createdOwner;
private final DateTime nextAvailable;
private final NotificationLifecycleState processingState;
private final String busEventClass;
private final String busEventJson;
- public BusEventEntry(final long id, final String owner, final DateTime nextAvailable, NotificationLifecycleState processingState, final String busEventClass, final String busEventJson) {
+ public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable, NotificationLifecycleState processingState, final String busEventClass, final String busEventJson) {
this.id = id;
+ this.createdOwner = createdOwner;
this.owner = owner;
this.nextAvailable = nextAvailable;
this.processingState = processingState;
@@ -37,8 +39,8 @@ public class BusEventEntry implements NotificationLifecycle {
this.busEventJson = busEventJson;
}
- public BusEventEntry(final String busEventClass, final String busEventJson) {
- this(0, null, null, null, busEventClass, busEventJson);
+ public BusEventEntry(final String createdOwner, final String busEventClass, final String busEventJson) {
+ this(0, createdOwner, null, null, null, busEventClass, busEventJson);
}
@@ -61,6 +63,11 @@ public class BusEventEntry implements NotificationLifecycle {
}
@Override
+ public String getCreatedOwner() {
+ return createdOwner;
+ }
+
+ @Override
public DateTime getNextAvailableDate() {
return nextAvailable;
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index 15ee76d..f8007de 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -42,7 +42,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
@SqlQuery
@Mapper(PersistentBusSqlMapper.class)
- public BusEventEntry getNextBusEventEntry(@Bind("max") int max, @Bind("now") Date now);
+ public BusEventEntry getNextBusEventEntry(@Bind("max") int max, @Bind("owner") String owner, @Bind("now") Date now);
@SqlUpdate
public int claimBusEvent(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("id") long id, @Bind("now") Date now);
@@ -67,6 +67,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
stmt.bind("class_name", evt.getBusEventClass());
stmt.bind("event_json", evt.getBusEventJson());
stmt.bind("created_dt", getDate(new DateTime()));
+ stmt.bind("creating_owner", evt.getCreatedOwner());
stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
stmt.bind("processing_owner", evt.getOwner());
stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
@@ -81,12 +82,13 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
final long id = r.getLong("id");
final String className = r.getString("class_name");
+ final String createdOwner = r.getString("creating_owner");
final String eventJson = r.getString("event_json");
final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
final String processingOwner = r.getString("processing_owner");
final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
- return new BusEventEntry(id, processingOwner, nextAvailableDate, processingState, className, eventJson);
+ return new BusEventEntry(id, createdOwner, processingOwner, nextAvailableDate, processingState, className, eventJson);
}
}
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index a556028..2b1b834 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -218,7 +218,7 @@ public class PersistentBus implements Bus {
final Date now = clock.getUTCNow().toDate();
final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
- BusEventEntry input = dao.getNextBusEventEntry(MAX_BUS_EVENTS, now);
+ BusEventEntry input = dao.getNextBusEventEntry(MAX_BUS_EVENTS, hostname, now);
if (input == null) {
return Collections.emptyList();
}
@@ -291,7 +291,7 @@ public class PersistentBus implements Bus {
private void postFromTransaction(BusEvent event, PersistentBusSqlDao transactional) {
try {
String json = objectMapper.writeValueAsString(event);
- BusEventEntry entry = new BusEventEntry(event.getClass().getName(), json);
+ BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json);
transactional.insertBusEvent(entry);
} catch (Exception e) {
log.error("Failed to post BusEvent " + event.toString(), e);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index bb97dd7..29cd69c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -49,7 +49,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
//
@SqlQuery
@Mapper(NotificationSqlMapper.class)
- public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max, @Bind("queue_name") String queueName);
+ public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("owner") String owner, @Bind("max") int max, @Bind("queue_name") String queueName);
@SqlUpdate
public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("id") long id, @Bind("now") Date now);
@@ -71,6 +71,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, Notification evt) {
stmt.bind("notification_id", evt.getUUID().toString());
stmt.bind("created_dt", getDate(new DateTime()));
+ stmt.bind("creating_owner", evt.getCreatedOwner());
stmt.bind("notification_key", evt.getNotificationKey());
stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
stmt.bind("queue_name", evt.getQueueName());
@@ -88,6 +89,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final long id = r.getLong("id");
final UUID uuid = UUID.fromString(r.getString("notification_id"));
+ final String createdOwner = r.getString("creating_owner");
final String notificationKey = r.getString("notification_key");
final String queueName = r.getString("queue_name");
final DateTime effectiveDate = getDate(r, "effective_dt");
@@ -95,7 +97,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final String processingOwner = r.getString("processing_owner");
final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
- return new DefaultNotification(id, uuid, processingOwner, queueName, nextAvailableDate,
+ return new DefaultNotification(id, uuid, createdOwner, processingOwner, queueName, nextAvailableDate,
processingState, notificationKey, effectiveDate);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 26e6c4e..9d2c601 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -25,6 +25,7 @@ public class DefaultNotification implements Notification {
private final long id;
private final UUID uuid;
private final String owner;
+ private final String createdOwner;
private final String queueName;
private final DateTime nextAvailableDate;
private final NotificationLifecycleState lifecycleState;
@@ -32,13 +33,14 @@ public class DefaultNotification implements Notification {
private final DateTime effectiveDate;
- public DefaultNotification(long id, UUID uuid, String owner, String queueName, DateTime nextAvailableDate,
+ public DefaultNotification(long id, UUID uuid, String createdOwner, String owner, String queueName, DateTime nextAvailableDate,
NotificationLifecycleState lifecycleState,
String notificationKey, DateTime effectiveDate) {
super();
this.id = id;
this.uuid = uuid;
this.owner = owner;
+ this.createdOwner = createdOwner;
this.queueName = queueName;
this.nextAvailableDate = nextAvailableDate;
this.lifecycleState = lifecycleState;
@@ -51,8 +53,8 @@ public class DefaultNotification implements Notification {
return id;
}
- public DefaultNotification(String queueName, String notificationKey, DateTime effectiveDate) {
- this(-1L, UUID.randomUUID(), null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+ public DefaultNotification(String queueName, String createdOwner, String notificationKey, DateTime effectiveDate) {
+ this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
}
@Override
public UUID getUUID() {
@@ -108,4 +110,8 @@ public class DefaultNotification implements Notification {
return queueName;
}
+ @Override
+ public String getCreatedOwner() {
+ return createdOwner;
+ }
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 392a218..c6958c2 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -68,7 +68,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
@Override
public void recordFutureNotification(DateTime futureNotificationTime, NotificationKey notificationKey) {
- Notification notification = new DefaultNotification(getFullQName(), notificationKey.toString(), futureNotificationTime);
+ Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.toString(), futureNotificationTime);
dao.insertNotification(notification);
}
@@ -76,7 +76,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
final DateTime futureNotificationTime, final NotificationKey notificationKey) {
NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
- Notification notification = new DefaultNotification(getFullQName(), notificationKey.toString(), futureNotificationTime);
+ Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.toString(), futureNotificationTime);
transactionalNotificationDao.insertNotification(notification);
}
@@ -90,7 +90,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
final Date now = clock.getUTCNow().toDate();
final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
- List<Notification> input = dao.getReadyNotifications(now, config.getDaoMaxReadyEvents(), getFullQName());
+ List<Notification> input = dao.getReadyNotifications(now, hostname, config.getDaoMaxReadyEvents(), getFullQName());
List<Notification> claimedNotifications = new ArrayList<Notification>();
for (Notification cur : input) {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index d59098b..0b60c2d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -26,7 +26,7 @@ public interface Notification extends NotificationLifecycle {
public long getId();
public UUID getUUID();
-
+
public String getNotificationKey();
public DateTime getEffectiveDate();
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
index 32cba9d..0110cf5 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
@@ -29,6 +29,8 @@ public interface NotificationLifecycle {
}
public String getOwner();
+
+ public String getCreatedOwner();
public DateTime getNextAvailableDate();
diff --git a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
index 93d0a9b..2ac1869 100644
--- a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
@@ -6,6 +6,7 @@ getNextBusEventEntry(max, now) ::= <<
, class_name
, event_json
, created_dt
+ , creating_owner
, processing_owner
, processing_available_dt
, processing_state
@@ -59,6 +60,7 @@ insertBusEvent() ::= <<
class_name
, event_json
, created_dt
+ , creating_owner
, processing_owner
, processing_available_dt
, processing_state
@@ -66,6 +68,7 @@ insertBusEvent() ::= <<
:class_name
, :event_json
, :created_dt
+ , :creating_owner
, :processing_owner
, :processing_available_dt
, :processing_state
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 9a241a0..cec35b1 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -85,6 +85,7 @@ CREATE TABLE notifications (
notification_id char(36) NOT NULL,
created_dt datetime NOT NULL,
notification_key varchar(256) NOT NULL,
+ creating_owner char(50) NOT NULL,
effective_dt datetime NOT NULL,
queue_name char(64) NOT NULL,
processing_owner char(50) DEFAULT NULL,
@@ -128,6 +129,7 @@ CREATE TABLE bus_events (
class_name varchar(128) NOT NULL,
event_json varchar(2048) NOT NULL,
created_dt datetime NOT NULL,
+ creating_owner char(50) NOT NULL,
processing_owner char(50) DEFAULT NULL,
processing_available_dt datetime DEFAULT NULL,
processing_state varchar(14) DEFAULT 'AVAILABLE',
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 899e828..6751f04 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -6,6 +6,7 @@ getReadyNotifications(now, max) ::= <<
, notification_id
, notification_key
, created_dt
+ , creating_owner
, effective_dt
, queue_name
, processing_owner
@@ -65,6 +66,7 @@ insertNotification() ::= <<
notification_id
, notification_key
, created_dt
+ , creating_owner
, effective_dt
, queue_name
, processing_owner
@@ -74,6 +76,7 @@ insertNotification() ::= <<
:notification_id
, :notification_key
, :created_dt
+ , :creating_owner
, :effective_dt
, :queue_name
, :processing_owner
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index d5de415..e52bf20 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -50,7 +50,8 @@ import static org.testng.Assert.assertNotNull;
public class TestNotificationSqlDao {
private static AtomicInteger sequenceId = new AtomicInteger();
-
+ private final static String hostname = "Yop";
+
@Inject
private IDBI dbi;
@@ -104,12 +105,12 @@ public class TestNotificationSqlDao {
String notificationKey = UUID.randomUUID().toString();
DateTime effDt = new DateTime();
- Notification notif = new DefaultNotification("testBasic",notificationKey, effDt);
+ Notification notif = new DefaultNotification("testBasic", hostname, notificationKey, effDt);
dao.insertNotification(notif);
Thread.sleep(1000);
DateTime now = new DateTime();
- List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3, "testBasic");
+ List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, "testBasic");
assertNotNull(notifications);
assertEquals(notifications.size(), 1);
@@ -153,6 +154,7 @@ public class TestNotificationSqlDao {
", notification_id" +
", notification_key" +
", created_dt" +
+ ", creating_owner" +
", effective_dt" +
", queue_name" +
", processing_owner" +
@@ -197,12 +199,6 @@ public class TestNotificationSqlDao {
bind(MysqlTestingHelper.class).toInstance(helper);
IDBI dbi = helper.getDBI();
bind(IDBI.class).toInstance(dbi);
-
- /*
- bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
- final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
- bind(DbiConfig.class).toInstance(config);
- */
}
}
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index a9cd1db..658752a 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -50,7 +50,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
@Override
public void recordFutureNotification(DateTime futureNotificationTime, NotificationKey notificationKey) {
- Notification notification = new DefaultNotification("MockQueue", notificationKey.toString(), futureNotificationTime);
+ Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.toString(), futureNotificationTime);
synchronized(notifications) {
notifications.add(notification);
}
@@ -96,7 +96,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
result = readyNotifications.size();
for (Notification cur : readyNotifications) {
handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
- DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+ DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
oldNotifications.add(cur);
processedNotifications.add(processedNotification);
}