Details
diff --git a/util/src/main/java/com/ning/billing/util/tag/api/DefaultTagUserApi.java b/util/src/main/java/com/ning/billing/util/tag/api/DefaultTagUserApi.java
index 1c812b0..075e10e 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/DefaultTagUserApi.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/DefaultTagUserApi.java
@@ -64,7 +64,10 @@ public class DefaultTagUserApi implements TagUserApi {
@Override
public void addTags(final UUID objectId, final ObjectType objectType, final List<TagDefinition> tagDefinitions, final CallContext context) throws TagApiException {
- tagDao.insertTags(objectId, objectType, tagDefinitions, context);
+ // TODO: consider making this batch
+ for (final TagDefinition tagDefinition : tagDefinitions) {
+ tagDao.insertTag(objectId, objectType, tagDefinition, context);
+ }
}
@Override
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
index 6df6ca2..67410ee 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
@@ -16,7 +16,6 @@
package com.ning.billing.util.tag.dao;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -27,6 +26,8 @@ import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.TransactionFailedException;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.ning.billing.ErrorCode;
@@ -42,14 +43,14 @@ import com.ning.billing.util.dao.Mapper;
import com.ning.billing.util.dao.ObjectType;
import com.ning.billing.util.dao.TableName;
import com.ning.billing.util.entity.collection.dao.UpdatableEntityCollectionSqlDao;
-import com.ning.billing.util.tag.ControlTagType;
-import com.ning.billing.util.tag.DefaultControlTag;
-import com.ning.billing.util.tag.DescriptiveTag;
import com.ning.billing.util.tag.Tag;
import com.ning.billing.util.tag.TagDefinition;
+import com.ning.billing.util.tag.api.TagEvent;
import com.ning.billing.util.tag.api.user.TagEventBuilder;
public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements TagDao {
+ private static final Logger log = LoggerFactory.getLogger(AuditedTagDao.class);
+
private final TagSqlDao tagSqlDao;
private final TagEventBuilder tagEventBuilder;
private final Bus bus;
@@ -67,55 +68,53 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
}
@Override
- public void insertTag(final UUID objectId, final ObjectType objectType,
- final TagDefinition tagDefinition, final CallContext context) {
+ public void insertTag(final UUID objectId, final ObjectType objectType, final TagDefinition tagDefinition, final CallContext context) {
tagSqlDao.inTransaction(new Transaction<Void, TagSqlDao>() {
@Override
public Void inTransaction(final TagSqlDao tagSqlDao, final TransactionStatus status) throws Exception {
final String tagId = UUID.randomUUID().toString();
final String tagName = tagDefinition.getName();
+
+ // Create the tag
tagSqlDao.addTagFromTransaction(tagId, tagName, objectId.toString(), objectType, context);
final Tag tag = tagSqlDao.findTag(tagName, objectId.toString(), objectType);
- final List<Tag> tagList = new ArrayList<Tag>();
- tagList.add(tag);
+ final List<Tag> tagList = Arrays.asList(tag);
+ // Gather the tag ids for this object id
final List<Mapper<UUID, Long>> recordIds = tagSqlDao.getRecordIds(objectId.toString(), objectType);
final Map<UUID, Long> recordIdMap = convertToHistoryMap(recordIds);
- final List<EntityHistory<Tag>> entityHistories = new ArrayList<EntityHistory<Tag>>();
- entityHistories.addAll(convertToHistory(tagList, recordIdMap, ChangeType.INSERT));
-
+ // Update the history table
+ final List<EntityHistory<Tag>> entityHistories = convertToHistory(tagList, recordIdMap, ChangeType.INSERT);
final Long maxHistoryRecordId = tagSqlDao.getMaxHistoryRecordId();
tagSqlDao.addHistoryFromTransaction(objectId.toString(), objectType, entityHistories, context);
- // have to fetch history record ids to update audit log
+ // Have to fetch the history record ids to update the audit log
final List<Mapper<Long, Long>> historyRecordIds = tagSqlDao.getHistoryRecordIds(maxHistoryRecordId);
final Map<Long, Long> historyRecordIdMap = convertToAuditMap(historyRecordIds);
final List<EntityAudit> entityAudits = convertToAudits(entityHistories, historyRecordIdMap);
tagSqlDao.insertAuditFromTransaction(entityAudits, context);
+ // Post an event to the Bus
+ final TagEvent tagEvent;
+ if (tagDefinition.isControlTag()) {
+ tagEvent = tagEventBuilder.newControlTagCreationEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
+ } else {
+ tagEvent = tagEventBuilder.newUserTagCreationEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
+ }
+ try {
+ bus.postFromTransaction(tagEvent, AuditedTagDao.this.tagSqlDao);
+ } catch (Bus.EventBusException e) {
+ log.warn("Failed to post tag creation event for tag " + tag.getId().toString(), e);
+ }
+
return null;
}
});
}
@Override
- public void insertTags(final UUID objectId, final ObjectType objectType, final List<TagDefinition> tagDefinitions, final CallContext context) {
- final List<Tag> tags = new ArrayList<Tag>();
- for (final TagDefinition tagDefinition : tagDefinitions) {
- if (tagDefinition.isControlTag()) {
- final ControlTagType controlTagType = ControlTagType.valueOf(tagDefinition.getName());
- tags.add(new DefaultControlTag(controlTagType));
- } else {
- tags.add(new DescriptiveTag(tagDefinition));
- }
- }
-
- saveEntities(objectId, objectType, tags, context);
- }
-
- @Override
public void deleteTag(final UUID objectId, final ObjectType objectType, final TagDefinition tagDefinition, final CallContext context) throws TagApiException {
try {
tagSqlDao.inTransaction(new Transaction<Void, TagSqlDao>() {
@@ -130,23 +129,36 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
final List<Tag> tagList = Arrays.asList(tag);
+ // Before the deletion, gather the tag ids for this object id
final List<Mapper<UUID, Long>> recordIds = tagSqlDao.getRecordIds(objectId.toString(), objectType);
final Map<UUID, Long> recordIdMap = convertToHistoryMap(recordIds);
+ // Delete the tag
tagSqlDao.deleteFromTransaction(objectId.toString(), objectType, tagList, context);
- final List<EntityHistory<Tag>> entityHistories = new ArrayList<EntityHistory<Tag>>();
- entityHistories.addAll(convertToHistory(tagList, recordIdMap, ChangeType.DELETE));
-
+ // Update the history table
+ final List<EntityHistory<Tag>> entityHistories = convertToHistory(tagList, recordIdMap, ChangeType.DELETE);
final Long maxHistoryRecordId = tagSqlDao.getMaxHistoryRecordId();
tagSqlDao.addHistoryFromTransaction(objectId.toString(), objectType, entityHistories, context);
- // have to fetch history record ids to update audit log
+ // Have to fetch the history record ids to update the audit log
final List<Mapper<Long, Long>> historyRecordIds = tagSqlDao.getHistoryRecordIds(maxHistoryRecordId);
final Map<Long, Long> historyRecordIdMap = convertToAuditMap(historyRecordIds);
final List<EntityAudit> entityAudits = convertToAudits(entityHistories, historyRecordIdMap);
tagSqlDao.insertAuditFromTransaction(entityAudits, context);
+ // Post an event to the Bus
+ final TagEvent tagEvent;
+ if (tagDefinition.isControlTag()) {
+ tagEvent = tagEventBuilder.newControlTagDeletionEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
+ } else {
+ tagEvent = tagEventBuilder.newUserTagDeletionEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
+ }
+ try {
+ bus.postFromTransaction(tagEvent, tagSqlDao);
+ } catch (Bus.EventBusException e) {
+ log.warn("Failed to post tag deletion event for tag " + tag.getId().toString(), e);
+ }
return null;
}
});
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/TagDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/TagDao.java
index 7d0105c..3ffccf2 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/TagDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/TagDao.java
@@ -29,7 +29,5 @@ import com.ning.billing.util.tag.TagDefinition;
public interface TagDao extends AuditedCollectionDao<Tag> {
void insertTag(UUID objectId, ObjectType objectType, TagDefinition tagDefinition, CallContext context) throws TagApiException;
- void insertTags(UUID objectId, ObjectType objectType, List<TagDefinition> tagDefinitions, CallContext context) throws TagApiException;
-
void deleteTag(UUID objectId, ObjectType objectType, TagDefinition tagDefinition, CallContext context) throws TagApiException;
}
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDao.java
index 09e757d..a09a952 100644
--- a/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDao.java
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDao.java
@@ -90,14 +90,6 @@ public class MockTagDao implements TagDao {
}
@Override
- public void insertTags(final UUID objectId, final ObjectType objectType,
- final List<TagDefinition> tagDefinitions, final CallContext context) {
- for (final TagDefinition tagDefinition : tagDefinitions) {
- insertTag(objectId, objectType, tagDefinition, context);
- }
- }
-
- @Override
public void deleteTag(final UUID objectId, final ObjectType objectType,
final TagDefinition tagDefinition, final CallContext context) {
final List<Tag> tags = tagStore.get(objectId);
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java
new file mode 100644
index 0000000..e986950
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.tag.dao;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.bus.BusEvent;
+import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.callcontext.DefaultCallContextFactory;
+import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.dao.ObjectType;
+import com.ning.billing.util.tag.MockTagStoreModuleSql;
+import com.ning.billing.util.tag.Tag;
+import com.ning.billing.util.tag.TagDefinition;
+import com.ning.billing.util.tag.TestTagStore;
+import com.ning.billing.util.tag.api.TagEvent;
+
+@Guice(modules = MockTagStoreModuleSql.class)
+public class TestAuditedTagDao {
+ @Inject
+ private MysqlTestingHelper helper;
+
+ @Inject
+ private TagDefinitionDao tagDefinitionDao;
+
+ @Inject
+ private AuditedTagDao tagDao;
+
+ @Inject
+ private Clock clock;
+
+ @Inject
+ private Bus bus;
+
+ private CallContext context;
+ private EventsListener eventsListener;
+
+ @BeforeClass(groups = "slow")
+ public void setup() throws IOException {
+ final String utilDdl = IOUtils.toString(TestTagStore.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+
+ helper.startMysql();
+ helper.initDb(utilDdl);
+
+ context = new DefaultCallContextFactory(clock).createCallContext("Tag DAO test", CallOrigin.TEST, UserType.TEST, UUID.randomUUID());
+ bus.start();
+ }
+
+ @BeforeMethod(groups = "slow")
+ public void cleanup() throws Bus.EventBusException {
+ eventsListener = new EventsListener();
+ bus.register(eventsListener);
+ }
+
+ @AfterClass(groups = "slow")
+ public void stopMysql() {
+ bus.stop();
+ helper.stopMysql();
+ }
+
+ @Test(groups = "slow")
+ public void testCatchEventsOnCreateAndDelete() throws Exception {
+ final String definitionName = UUID.randomUUID().toString().substring(0, 5);
+ final String description = UUID.randomUUID().toString().substring(0, 5);
+ final UUID objectId = UUID.randomUUID();
+ final ObjectType objectType = ObjectType.RECURRING_INVOICE_ITEM;
+
+ // Verify the initial state
+ Assert.assertEquals(eventsListener.getEvents().size(), 0);
+ Assert.assertEquals(eventsListener.getTagEvents().size(), 0);
+
+ // Create a tag definition
+ final TagDefinition createdTagDefinition = tagDefinitionDao.create(definitionName, description, context);
+ Assert.assertEquals(createdTagDefinition.getName(), definitionName);
+ Assert.assertEquals(createdTagDefinition.getDescription(), description);
+
+ // Make sure we can create a tag
+ tagDao.insertTag(objectId, objectType, createdTagDefinition, context);
+
+ // Make sure we can retrieve it via the DAO
+ final Map<String, Tag> foundTags = tagDao.loadEntities(objectId, objectType);
+ Assert.assertEquals(foundTags.keySet().size(), 1);
+ Assert.assertEquals(foundTags.get(definitionName).getTagDefinitionName(), definitionName);
+
+ // Verify we caught an event on the bus - we got 2 total (one for the tag definition, one for the tag)
+ Assert.assertEquals(eventsListener.getEvents().size(), 2);
+ Assert.assertEquals(eventsListener.getTagEvents().size(), 1);
+ final TagEvent tagFirstEventReceived = eventsListener.getTagEvents().get(0);
+ Assert.assertEquals(eventsListener.getEvents().get(1), tagFirstEventReceived);
+ Assert.assertEquals(tagFirstEventReceived.getObjectId(), objectId);
+ Assert.assertEquals(tagFirstEventReceived.getObjectType(), objectType);
+ Assert.assertEquals(tagFirstEventReceived.getTagDefinition().getName(), createdTagDefinition.getName());
+ Assert.assertEquals(tagFirstEventReceived.getTagDefinition().getDescription(), createdTagDefinition.getDescription());
+ Assert.assertEquals(tagFirstEventReceived.getBusEventType(), BusEvent.BusEventType.USER_TAG_CREATION);
+ Assert.assertEquals(tagFirstEventReceived.getUserToken(), context.getUserToken());
+
+ // Delete the tag
+ tagDao.deleteTag(objectId, objectType, createdTagDefinition, context);
+
+ // Make sure the tag is deleted
+ Assert.assertEquals(tagDao.loadEntities(objectId, objectType).keySet().size(), 0);
+
+ // Verify we caught an event on the bus
+ Assert.assertEquals(eventsListener.getEvents().size(), 3);
+ Assert.assertEquals(eventsListener.getTagEvents().size(), 2);
+ final TagEvent tagSecondEventReceived = eventsListener.getTagEvents().get(1);
+ Assert.assertEquals(eventsListener.getEvents().get(2), tagSecondEventReceived);
+ Assert.assertEquals(tagSecondEventReceived.getObjectId(), objectId);
+ Assert.assertEquals(tagSecondEventReceived.getObjectType(), objectType);
+ Assert.assertEquals(tagSecondEventReceived.getTagDefinition().getName(), createdTagDefinition.getName());
+ Assert.assertEquals(tagSecondEventReceived.getTagDefinition().getDescription(), createdTagDefinition.getDescription());
+ Assert.assertEquals(tagSecondEventReceived.getBusEventType(), BusEvent.BusEventType.USER_TAG_DELETION);
+ Assert.assertEquals(tagSecondEventReceived.getUserToken(), context.getUserToken());
+ }
+
+ private static final class EventsListener {
+ private final List<BusEvent> events = new ArrayList<BusEvent>();
+ private final List<TagEvent> tagEvents = new ArrayList<TagEvent>();
+
+ @Subscribe
+ public synchronized void processEvent(final BusEvent event) {
+ events.add(event);
+ }
+
+ @Subscribe
+ public synchronized void processTagDefinitionEvent(final TagEvent tagEvent) {
+ tagEvents.add(tagEvent);
+ }
+
+ public List<BusEvent> getEvents() {
+ return events;
+ }
+
+ public List<TagEvent> getTagEvents() {
+ return tagEvents;
+ }
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
index 3a44325..cc74c55 100644
--- a/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
@@ -112,10 +112,10 @@ public class TestDefaultTagDefinitionDao {
Assert.assertEquals(tagDefinitionFirstEventReceived.getBusEventType(), BusEvent.BusEventType.USER_TAGDEFINITION_CREATION);
Assert.assertEquals(tagDefinitionFirstEventReceived.getUserToken(), context.getUserToken());
- // Delete the tag
+ // Delete the tag definition
tagDefinitionDao.deleteTagDefinition(definitionName, context);
- // Make sure the tag is deleted
+ // Make sure the tag definition is deleted
Assert.assertNull(tagDefinitionDao.getByName(definitionName));
// Verify we caught an event on the bus