EntitlementDaoMemoryMock.java

334 lines | 11.076 kB Blame History Raw Download
/*
 * Copyright 2010-2011 Ning, Inc.
 *
 * Ning licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.ning.billing.entitlement.engine.dao;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.catalog.api.TimeUnit;
import com.ning.billing.entitlement.api.user.ISubscription;
import com.ning.billing.entitlement.api.user.ISubscriptionBundle;
import com.ning.billing.entitlement.api.user.Subscription;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.events.IEvent;
import com.ning.billing.entitlement.events.IEvent.EventType;
import com.ning.billing.entitlement.events.IEventLyfecycle.IEventLyfecycleState;
import com.ning.billing.entitlement.events.phase.IPhaseEvent;
import com.ning.billing.entitlement.events.phase.PhaseEvent;
import com.ning.billing.entitlement.events.user.ApiEventType;
import com.ning.billing.entitlement.events.user.IUserEvent;
import com.ning.billing.entitlement.glue.IEngineConfig;
import com.ning.billing.util.clock.IClock;

public class EntitlementDaoMemoryMock implements IEntitlementDao, IEntitlementDaoMock {

    protected final static Logger log = LoggerFactory.getLogger(IEntitlementDao.class);

    private final List<ISubscriptionBundle> bundles;
    private final List<ISubscription> subscriptions;
    private final TreeSet<IEvent> events;
    private final IClock clock;
    private final IEngineConfig config;

    @Inject
    public EntitlementDaoMemoryMock(IClock clock, IEngineConfig config) {
        super();
        this.clock = clock;
        this.config = config;
        this.bundles = new ArrayList<ISubscriptionBundle>();
        this.subscriptions = new ArrayList<ISubscription>();
        this.events = new TreeSet<IEvent>();
    }

    @Override
    public void reset() {
        bundles.clear();
        subscriptions.clear();
        events.clear();
    }

    @Override
    public List<ISubscriptionBundle> getSubscriptionBundleForAccount(UUID accountId) {
        List<ISubscriptionBundle> results = new ArrayList<ISubscriptionBundle>();
        for (ISubscriptionBundle cur : bundles) {
            if (cur.getAccountId().equals(accountId)) {
                results.add(cur);
            }
        }
        return results;
    }

    @Override
    public ISubscriptionBundle getSubscriptionBundleFromId(UUID bundleId) {
        for (ISubscriptionBundle cur : bundles) {
            if (cur.getId().equals(bundleId)) {
                return cur;
            }
        }
        return null;
    }

    @Override
    public ISubscriptionBundle createSubscriptionBundle(SubscriptionBundle bundle) {
        bundles.add(bundle);
        return getSubscriptionBundleFromId(bundle.getId());
    }

    @Override
    public ISubscription getSubscriptionFromId(UUID subscriptionId) {
        for (ISubscription cur : subscriptions) {
            if (cur.getId().equals(subscriptionId)) {
                return buildSubscription((Subscription) cur);
            }
        }
        return null;
    }

    @Override
    public ISubscription createSubscription(Subscription subscription, List<IEvent> initalEvents) {

        synchronized(events) {
            events.addAll(initalEvents);
        }
        ISubscription updatedSubscription = buildSubscription(subscription);
        subscriptions.add(updatedSubscription);
        return updatedSubscription;
    }

    @Override
    public List<ISubscription> getSubscriptions(UUID bundleId) {

        List<ISubscription> results = new ArrayList<ISubscription>();
        for (ISubscription cur : subscriptions) {
            if (cur.getBundleId().equals(bundleId)) {
                results.add(buildSubscription((Subscription) cur));
            }
        }
        return results;
    }

    @Override
    public List<IEvent> getEventsForSubscription(UUID subscriptionId) {
        synchronized(events) {
            List<IEvent> results = new LinkedList<IEvent>();
            for (IEvent cur : events) {
                if (cur.getSubscriptionId().equals(subscriptionId)) {
                    results.add(cur);
                }
            }
            return results;
        }
    }

    @Override
    public List<IEvent> getPendingEventsForSubscription(UUID subscriptionId) {
        synchronized(events) {
            List<IEvent> results = new LinkedList<IEvent>();
            for (IEvent cur : events) {
                if (cur.isActive() &&
                        cur.getProcessingState() == IEventLyfecycleState.AVAILABLE &&
                            cur.getSubscriptionId().equals(subscriptionId)) {
                    results.add(cur);
                }
            }
            return results;
        }
    }


    @Override
    public ISubscription getBaseSubscription(UUID bundleId) {
        for (ISubscription cur : subscriptions) {
            if (cur.getBundleId().equals(bundleId) &&
                    cur.getCurrentPlan().getProduct().getCategory() == ProductCategory.BASE) {
                return buildSubscription((Subscription) cur);
            }
        }
        return null;
    }

    @Override
    public void createNextPhaseEvent(UUID subscriptionId, IEvent nextPhase) {
        cancelNextPhaseEvent(subscriptionId);
        insertEvent(nextPhase);
    }


    @Override
    public List<IEvent> getEventsReady(UUID ownerId, int sequenceId) {
        synchronized(events) {
            List<IEvent> readyList = new LinkedList<IEvent>();
            for (IEvent cur : events) {
                if (cur.isAvailableForProcessing(clock.getUTCNow())) {

                    if (cur.getOwner() != null) {
                        log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
                    }
                    cur.setOwner(ownerId);
                    cur.setNextAvailableDate(clock.getUTCNow().plus(config.getDaoClaimTimeMs()));
                    cur.setProcessingState(IEventLyfecycleState.IN_PROCESSING);
                    readyList.add(cur);
                }
            }
            Collections.sort(readyList);
            return readyList;
        }
    }

    @Override
    public void clearEventsReady(UUID ownerId, List<IEvent> cleared) {
        synchronized(events) {
            for (IEvent cur : cleared) {
                if (cur.getOwner().equals(ownerId)) {
                    cur.setProcessingState(IEventLyfecycleState.PROCESSED);
                } else {
                    log.warn(String.format("EventProcessor %s trying to clear event %s that it does not own", ownerId, cur));
                }
            }
        }
    }

    private ISubscription buildSubscription(Subscription in) {
        return new Subscription(in.getId(), in.getBundleId(), in.getCategory(), in.getBundleStartDate(),
                in.getStartDate(), in.getChargedThroughDate(), in.getPaidThroughDate(), in.getActiveVersion());
    }

    @Override
    public void updateSubscription(Subscription subscription) {

        boolean found = false;
        Iterator<ISubscription> it = subscriptions.iterator();
        while (it.hasNext()) {
            ISubscription cur = it.next();
            if (cur.getId().equals(subscription.getId())) {
                found = true;
                it.remove();
                break;
            }
        }
        if (found) {
            subscriptions.add(subscription);
        }
    }

    @Override
    public void cancelSubscription(UUID subscriptionId, IEvent cancelEvent) {
        synchronized (cancelEvent) {
            cancelNextPhaseEvent(subscriptionId);
            insertEvent(cancelEvent);
        }
    }

    @Override
    public void changePlan(UUID subscriptionId, List<IEvent> changeEvents) {
        synchronized(events) {
            cancelNextChangeEvent(subscriptionId);
            cancelNextPhaseEvent(subscriptionId);
            events.addAll(changeEvents);
        }
    }

    private void insertEvent(IEvent event) {
        synchronized(events) {
            events.add(event);
        }
    }

    private void cancelNextPhaseEvent(UUID subscriptionId) {

        ISubscription curSubscription = getSubscriptionFromId(subscriptionId);
        if (curSubscription.getCurrentPhase() == null ||
                curSubscription.getCurrentPhase().getDuration().getUnit() == TimeUnit.UNLIMITED) {
            return;
        }

        synchronized(events) {

            Iterator<IEvent> it = events.descendingIterator();
            while (it.hasNext()) {
                IEvent cur = it.next();
                if (cur.getSubscriptionId() != subscriptionId) {
                    continue;
                }
                if (cur.getType() == EventType.PHASE &&
                        cur.getProcessingState() == IEventLyfecycleState.AVAILABLE) {
                    cur.deactivate();
                    break;
                }
            }
        }
    }


    private void cancelNextChangeEvent(UUID subscriptionId) {

        synchronized(events) {

            Iterator<IEvent> it = events.descendingIterator();
            while (it.hasNext()) {
                IEvent cur = it.next();
                if (cur.getSubscriptionId() != subscriptionId) {
                    continue;
                }
                if (cur.getType() == EventType.API_USER &&
                        ApiEventType.CHANGE == ((IUserEvent) cur).getEventType() &&
                        cur.getProcessingState() == IEventLyfecycleState.AVAILABLE) {
                    cur.deactivate();
                    break;
                }
            }
        }
    }

    @Override
    public void uncancelSubscription(UUID subscriptionId, List<IEvent> uncancelEvents) {

        synchronized (events) {
            boolean foundCancel = false;
            Iterator<IEvent> it = events.descendingIterator();
            while (it.hasNext()) {
                IEvent cur = it.next();
                if (cur.getSubscriptionId() != subscriptionId) {
                    continue;
                }
                if (cur.getType() == EventType.API_USER &&
                        ((IUserEvent) cur).getEventType() == ApiEventType.CANCEL) {
                    cur.deactivate();
                    foundCancel = true;
                    break;
                }
            }
            if (foundCancel) {
                for (IEvent cur : uncancelEvents) {
                    insertEvent(cur);
                }
            }
        }
    }
}