killbill-memoizeit

util: add notifications for tag changes This commit deprecates

6/13/2012 1:46:19 AM

Details

diff --git a/util/src/main/java/com/ning/billing/util/tag/api/DefaultTagUserApi.java b/util/src/main/java/com/ning/billing/util/tag/api/DefaultTagUserApi.java
index 1c812b0..075e10e 100644
--- a/util/src/main/java/com/ning/billing/util/tag/api/DefaultTagUserApi.java
+++ b/util/src/main/java/com/ning/billing/util/tag/api/DefaultTagUserApi.java
@@ -64,7 +64,10 @@ public class DefaultTagUserApi implements TagUserApi {
 
     @Override
     public void addTags(final UUID objectId, final ObjectType objectType, final List<TagDefinition> tagDefinitions, final CallContext context) throws TagApiException {
-        tagDao.insertTags(objectId, objectType, tagDefinitions, context);
+        // TODO: consider making this batch
+        for (final TagDefinition tagDefinition : tagDefinitions) {
+            tagDao.insertTag(objectId, objectType, tagDefinition, context);
+        }
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
index 6df6ca2..67410ee 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
@@ -16,7 +16,6 @@
 
 package com.ning.billing.util.tag.dao;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +26,8 @@ import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.exceptions.TransactionFailedException;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.ning.billing.ErrorCode;
@@ -42,14 +43,14 @@ import com.ning.billing.util.dao.Mapper;
 import com.ning.billing.util.dao.ObjectType;
 import com.ning.billing.util.dao.TableName;
 import com.ning.billing.util.entity.collection.dao.UpdatableEntityCollectionSqlDao;
-import com.ning.billing.util.tag.ControlTagType;
-import com.ning.billing.util.tag.DefaultControlTag;
-import com.ning.billing.util.tag.DescriptiveTag;
 import com.ning.billing.util.tag.Tag;
 import com.ning.billing.util.tag.TagDefinition;
+import com.ning.billing.util.tag.api.TagEvent;
 import com.ning.billing.util.tag.api.user.TagEventBuilder;
 
 public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements TagDao {
+    private static final Logger log = LoggerFactory.getLogger(AuditedTagDao.class);
+
     private final TagSqlDao tagSqlDao;
     private final TagEventBuilder tagEventBuilder;
     private final Bus bus;
@@ -67,55 +68,53 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
     }
 
     @Override
-    public void insertTag(final UUID objectId, final ObjectType objectType,
-                          final TagDefinition tagDefinition, final CallContext context) {
+    public void insertTag(final UUID objectId, final ObjectType objectType, final TagDefinition tagDefinition, final CallContext context) {
         tagSqlDao.inTransaction(new Transaction<Void, TagSqlDao>() {
             @Override
             public Void inTransaction(final TagSqlDao tagSqlDao, final TransactionStatus status) throws Exception {
                 final String tagId = UUID.randomUUID().toString();
                 final String tagName = tagDefinition.getName();
+
+                // Create the tag
                 tagSqlDao.addTagFromTransaction(tagId, tagName, objectId.toString(), objectType, context);
 
                 final Tag tag = tagSqlDao.findTag(tagName, objectId.toString(), objectType);
-                final List<Tag> tagList = new ArrayList<Tag>();
-                tagList.add(tag);
+                final List<Tag> tagList = Arrays.asList(tag);
 
+                // Gather the tag ids for this object id
                 final List<Mapper<UUID, Long>> recordIds = tagSqlDao.getRecordIds(objectId.toString(), objectType);
                 final Map<UUID, Long> recordIdMap = convertToHistoryMap(recordIds);
 
-                final List<EntityHistory<Tag>> entityHistories = new ArrayList<EntityHistory<Tag>>();
-                entityHistories.addAll(convertToHistory(tagList, recordIdMap, ChangeType.INSERT));
-
+                // Update the history table
+                final List<EntityHistory<Tag>> entityHistories = convertToHistory(tagList, recordIdMap, ChangeType.INSERT);
                 final Long maxHistoryRecordId = tagSqlDao.getMaxHistoryRecordId();
                 tagSqlDao.addHistoryFromTransaction(objectId.toString(), objectType, entityHistories, context);
 
-                // have to fetch history record ids to update audit log
+                // Have to fetch the history record ids to update the audit log
                 final List<Mapper<Long, Long>> historyRecordIds = tagSqlDao.getHistoryRecordIds(maxHistoryRecordId);
                 final Map<Long, Long> historyRecordIdMap = convertToAuditMap(historyRecordIds);
                 final List<EntityAudit> entityAudits = convertToAudits(entityHistories, historyRecordIdMap);
                 tagSqlDao.insertAuditFromTransaction(entityAudits, context);
 
+                // Post an event to the Bus
+                final TagEvent tagEvent;
+                if (tagDefinition.isControlTag()) {
+                    tagEvent = tagEventBuilder.newControlTagCreationEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
+                } else {
+                    tagEvent = tagEventBuilder.newUserTagCreationEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
+                }
+                try {
+                    bus.postFromTransaction(tagEvent, AuditedTagDao.this.tagSqlDao);
+                } catch (Bus.EventBusException e) {
+                    log.warn("Failed to post tag creation event for tag " + tag.getId().toString(), e);
+                }
+
                 return null;
             }
         });
     }
 
     @Override
-    public void insertTags(final UUID objectId, final ObjectType objectType, final List<TagDefinition> tagDefinitions, final CallContext context) {
-        final List<Tag> tags = new ArrayList<Tag>();
-        for (final TagDefinition tagDefinition : tagDefinitions) {
-            if (tagDefinition.isControlTag()) {
-                final ControlTagType controlTagType = ControlTagType.valueOf(tagDefinition.getName());
-                tags.add(new DefaultControlTag(controlTagType));
-            } else {
-                tags.add(new DescriptiveTag(tagDefinition));
-            }
-        }
-
-        saveEntities(objectId, objectType, tags, context);
-    }
-
-    @Override
     public void deleteTag(final UUID objectId, final ObjectType objectType, final TagDefinition tagDefinition, final CallContext context) throws TagApiException {
         try {
             tagSqlDao.inTransaction(new Transaction<Void, TagSqlDao>() {
@@ -130,23 +129,36 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
 
                     final List<Tag> tagList = Arrays.asList(tag);
 
+                    // Before the deletion, gather the tag ids for this object id
                     final List<Mapper<UUID, Long>> recordIds = tagSqlDao.getRecordIds(objectId.toString(), objectType);
                     final Map<UUID, Long> recordIdMap = convertToHistoryMap(recordIds);
 
+                    // Delete the tag
                     tagSqlDao.deleteFromTransaction(objectId.toString(), objectType, tagList, context);
 
-                    final List<EntityHistory<Tag>> entityHistories = new ArrayList<EntityHistory<Tag>>();
-                    entityHistories.addAll(convertToHistory(tagList, recordIdMap, ChangeType.DELETE));
-
+                    // Update the history table
+                    final List<EntityHistory<Tag>> entityHistories = convertToHistory(tagList, recordIdMap, ChangeType.DELETE);
                     final Long maxHistoryRecordId = tagSqlDao.getMaxHistoryRecordId();
                     tagSqlDao.addHistoryFromTransaction(objectId.toString(), objectType, entityHistories, context);
 
-                    // have to fetch history record ids to update audit log
+                    // Have to fetch the history record ids to update the audit log
                     final List<Mapper<Long, Long>> historyRecordIds = tagSqlDao.getHistoryRecordIds(maxHistoryRecordId);
                     final Map<Long, Long> historyRecordIdMap = convertToAuditMap(historyRecordIds);
                     final List<EntityAudit> entityAudits = convertToAudits(entityHistories, historyRecordIdMap);
                     tagSqlDao.insertAuditFromTransaction(entityAudits, context);
 
+                    // Post an event to the Bus
+                    final TagEvent tagEvent;
+                    if (tagDefinition.isControlTag()) {
+                        tagEvent = tagEventBuilder.newControlTagDeletionEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
+                    } else {
+                        tagEvent = tagEventBuilder.newUserTagDeletionEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
+                    }
+                    try {
+                        bus.postFromTransaction(tagEvent, tagSqlDao);
+                    } catch (Bus.EventBusException e) {
+                        log.warn("Failed to post tag deletion event for tag " + tag.getId().toString(), e);
+                    }
                     return null;
                 }
             });
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/TagDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/TagDao.java
index 7d0105c..3ffccf2 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/TagDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/TagDao.java
@@ -29,7 +29,5 @@ import com.ning.billing.util.tag.TagDefinition;
 public interface TagDao extends AuditedCollectionDao<Tag> {
     void insertTag(UUID objectId, ObjectType objectType, TagDefinition tagDefinition, CallContext context) throws TagApiException;
 
-    void insertTags(UUID objectId, ObjectType objectType, List<TagDefinition> tagDefinitions, CallContext context) throws TagApiException;
-
     void deleteTag(UUID objectId, ObjectType objectType, TagDefinition tagDefinition, CallContext context) throws TagApiException;
 }
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDao.java
index 09e757d..a09a952 100644
--- a/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDao.java
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDao.java
@@ -90,14 +90,6 @@ public class MockTagDao implements TagDao {
     }
 
     @Override
-    public void insertTags(final UUID objectId, final ObjectType objectType,
-                           final List<TagDefinition> tagDefinitions, final CallContext context) {
-        for (final TagDefinition tagDefinition : tagDefinitions) {
-            insertTag(objectId, objectType, tagDefinition, context);
-        }
-    }
-
-    @Override
     public void deleteTag(final UUID objectId, final ObjectType objectType,
                           final TagDefinition tagDefinition, final CallContext context) {
         final List<Tag> tags = tagStore.get(objectId);
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java
new file mode 100644
index 0000000..e986950
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.tag.dao;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.bus.BusEvent;
+import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.callcontext.DefaultCallContextFactory;
+import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.dao.ObjectType;
+import com.ning.billing.util.tag.MockTagStoreModuleSql;
+import com.ning.billing.util.tag.Tag;
+import com.ning.billing.util.tag.TagDefinition;
+import com.ning.billing.util.tag.TestTagStore;
+import com.ning.billing.util.tag.api.TagEvent;
+
+@Guice(modules = MockTagStoreModuleSql.class)
+public class TestAuditedTagDao {
+    @Inject
+    private MysqlTestingHelper helper;
+
+    @Inject
+    private TagDefinitionDao tagDefinitionDao;
+
+    @Inject
+    private AuditedTagDao tagDao;
+
+    @Inject
+    private Clock clock;
+
+    @Inject
+    private Bus bus;
+
+    private CallContext context;
+    private EventsListener eventsListener;
+
+    @BeforeClass(groups = "slow")
+    public void setup() throws IOException {
+        final String utilDdl = IOUtils.toString(TestTagStore.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+
+        helper.startMysql();
+        helper.initDb(utilDdl);
+
+        context = new DefaultCallContextFactory(clock).createCallContext("Tag DAO test", CallOrigin.TEST, UserType.TEST, UUID.randomUUID());
+        bus.start();
+    }
+
+    @BeforeMethod(groups = "slow")
+    public void cleanup() throws Bus.EventBusException {
+        eventsListener = new EventsListener();
+        bus.register(eventsListener);
+    }
+
+    @AfterClass(groups = "slow")
+    public void stopMysql() {
+        bus.stop();
+        helper.stopMysql();
+    }
+
+    @Test(groups = "slow")
+    public void testCatchEventsOnCreateAndDelete() throws Exception {
+        final String definitionName = UUID.randomUUID().toString().substring(0, 5);
+        final String description = UUID.randomUUID().toString().substring(0, 5);
+        final UUID objectId = UUID.randomUUID();
+        final ObjectType objectType = ObjectType.RECURRING_INVOICE_ITEM;
+
+        // Verify the initial state
+        Assert.assertEquals(eventsListener.getEvents().size(), 0);
+        Assert.assertEquals(eventsListener.getTagEvents().size(), 0);
+
+        // Create a tag definition
+        final TagDefinition createdTagDefinition = tagDefinitionDao.create(definitionName, description, context);
+        Assert.assertEquals(createdTagDefinition.getName(), definitionName);
+        Assert.assertEquals(createdTagDefinition.getDescription(), description);
+
+        // Make sure we can create a tag
+        tagDao.insertTag(objectId, objectType, createdTagDefinition, context);
+
+        // Make sure we can retrieve it via the DAO
+        final Map<String, Tag> foundTags = tagDao.loadEntities(objectId, objectType);
+        Assert.assertEquals(foundTags.keySet().size(), 1);
+        Assert.assertEquals(foundTags.get(definitionName).getTagDefinitionName(), definitionName);
+
+        // Verify we caught an event on the bus -  we got 2 total (one for the tag definition, one for the tag)
+        Assert.assertEquals(eventsListener.getEvents().size(), 2);
+        Assert.assertEquals(eventsListener.getTagEvents().size(), 1);
+        final TagEvent tagFirstEventReceived = eventsListener.getTagEvents().get(0);
+        Assert.assertEquals(eventsListener.getEvents().get(1), tagFirstEventReceived);
+        Assert.assertEquals(tagFirstEventReceived.getObjectId(), objectId);
+        Assert.assertEquals(tagFirstEventReceived.getObjectType(), objectType);
+        Assert.assertEquals(tagFirstEventReceived.getTagDefinition().getName(), createdTagDefinition.getName());
+        Assert.assertEquals(tagFirstEventReceived.getTagDefinition().getDescription(), createdTagDefinition.getDescription());
+        Assert.assertEquals(tagFirstEventReceived.getBusEventType(), BusEvent.BusEventType.USER_TAG_CREATION);
+        Assert.assertEquals(tagFirstEventReceived.getUserToken(), context.getUserToken());
+
+        // Delete the tag
+        tagDao.deleteTag(objectId, objectType, createdTagDefinition, context);
+
+        // Make sure the tag is deleted
+        Assert.assertEquals(tagDao.loadEntities(objectId, objectType).keySet().size(), 0);
+
+        // Verify we caught an event on the bus
+        Assert.assertEquals(eventsListener.getEvents().size(), 3);
+        Assert.assertEquals(eventsListener.getTagEvents().size(), 2);
+        final TagEvent tagSecondEventReceived = eventsListener.getTagEvents().get(1);
+        Assert.assertEquals(eventsListener.getEvents().get(2), tagSecondEventReceived);
+        Assert.assertEquals(tagSecondEventReceived.getObjectId(), objectId);
+        Assert.assertEquals(tagSecondEventReceived.getObjectType(), objectType);
+        Assert.assertEquals(tagSecondEventReceived.getTagDefinition().getName(), createdTagDefinition.getName());
+        Assert.assertEquals(tagSecondEventReceived.getTagDefinition().getDescription(), createdTagDefinition.getDescription());
+        Assert.assertEquals(tagSecondEventReceived.getBusEventType(), BusEvent.BusEventType.USER_TAG_DELETION);
+        Assert.assertEquals(tagSecondEventReceived.getUserToken(), context.getUserToken());
+    }
+
+    private static final class EventsListener {
+        private final List<BusEvent> events = new ArrayList<BusEvent>();
+        private final List<TagEvent> tagEvents = new ArrayList<TagEvent>();
+
+        @Subscribe
+        public synchronized void processEvent(final BusEvent event) {
+            events.add(event);
+        }
+
+        @Subscribe
+        public synchronized void processTagDefinitionEvent(final TagEvent tagEvent) {
+            tagEvents.add(tagEvent);
+        }
+
+        public List<BusEvent> getEvents() {
+            return events;
+        }
+
+        public List<TagEvent> getTagEvents() {
+            return tagEvents;
+        }
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
index 3a44325..cc74c55 100644
--- a/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
@@ -112,10 +112,10 @@ public class TestDefaultTagDefinitionDao {
         Assert.assertEquals(tagDefinitionFirstEventReceived.getBusEventType(), BusEvent.BusEventType.USER_TAGDEFINITION_CREATION);
         Assert.assertEquals(tagDefinitionFirstEventReceived.getUserToken(), context.getUserToken());
 
-        // Delete the tag
+        // Delete the tag definition
         tagDefinitionDao.deleteTagDefinition(definitionName, context);
 
-        // Make sure the tag is deleted
+        // Make sure the tag definition is deleted
         Assert.assertNull(tagDefinitionDao.getByName(definitionName));
 
         // Verify we caught an event on the bus