killbill-aplcache

Merge remote-tracking branch 'origin/work-for-release-0.16.4'

4/14/2016 5:33:58 PM

Changes

account/pom.xml 2(+1 -1)

api/pom.xml 2(+1 -1)

beatrix/pom.xml 2(+1 -1)

bin/db-helper 28(+26 -2)

catalog/pom.xml 2(+1 -1)

currency/pom.xml 2(+1 -1)

invoice/pom.xml 2(+1 -1)

jaxrs/pom.xml 2(+1 -1)

junction/pom.xml 2(+1 -1)

NEWS 3(+3 -0)

overdue/pom.xml 2(+1 -1)

payment/pom.xml 2(+1 -1)

pom.xml 2(+1 -1)

profiles/pom.xml 2(+1 -1)

tenant/pom.xml 2(+1 -1)

usage/pom.xml 2(+1 -1)

util/pom.xml 32(+31 -1)

Details

account/pom.xml 2(+1 -1)

diff --git a/account/pom.xml b/account/pom.xml
index a90876a..8dde6bf 100644
--- a/account/pom.xml
+++ b/account/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-account</artifactId>

api/pom.xml 2(+1 -1)

diff --git a/api/pom.xml b/api/pom.xml
index 3dc5a1e..2c630a3 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-internal-api</artifactId>

beatrix/pom.xml 2(+1 -1)

diff --git a/beatrix/pom.xml b/beatrix/pom.xml
index 3b3064f..ccb6601 100644
--- a/beatrix/pom.xml
+++ b/beatrix/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-beatrix</artifactId>

bin/db-helper 28(+26 -2)

diff --git a/bin/db-helper b/bin/db-helper
index c642b1c..7b9c051 100755
--- a/bin/db-helper
+++ b/bin/db-helper
@@ -60,7 +60,7 @@ eval set -- "${ARGS}"
 
 function usage() {
     echo -n "./db_helper"
-    echo -n " -a|--action <create|clean|dump>"
+    echo -n " -a|--action <create|clean|dump|migrate|dryRunMigrate|repair|info>"
     echo -n " --driver <mysql|postgres> (default = mysql)"
     echo -n " -h|--host host (default = localhost)"
     echo -n " --port port"
@@ -159,6 +159,20 @@ function cleanup() {
     rm -f "/tmp/*.$$"
 }
 
+function flyway() {
+    flyway_bin=util/target/killbill-flyway.jar
+    if [ ! -f "$flyway_bin" ]; then
+        echo "File $flyway_bin does not exists - build util first"
+        usage
+    fi
+
+    locations=
+    for migration_dir in `find */src/main/resources -type d -name migration`; do
+        locations="${locations}filesystem:$migration_dir,"
+    done
+
+    java -jar $flyway_bin -locations=$locations -user=$USER -password=$PWD -url=$URL ${@}
+}
 
 while true; do
   case "$1" in
@@ -178,7 +192,7 @@ done
 
 
 if [ -z $ACTION ]; then
-    echo "Need to specify an action <CREATE|CLEAN|DUMP>"
+    echo "Need to specify an action"
     usage
 fi
 
@@ -195,6 +209,12 @@ fi
 if [ $DRIVER == "postgres" ] && [ -z $PORT ]; then
     PORT=$PORT_POSTGRES
 fi
+if [ $DRIVER == "mysql" ] && [ -z $URL ]; then
+    URL=jdbc:mysql://$HOST:$PORT/$DATABASE
+fi
+if [ $DRIVER == "postgres" ] && [ -z $URL ]; then
+    URL=jdbc:postgresql://$HOST:$PORT/$DATABASE
+fi
 
 
 if [ $ACTION == "dump" ]; then
@@ -233,4 +253,8 @@ if [ $ACTION == "clean" ]; then
     fi
 fi
 
+if [ $ACTION == "migrate" ] || [ $ACTION == "dryRunMigrate" ] || [ $ACTION == "repair" ] || [ $ACTION == "info" ]; then
+    flyway $ACTION
+fi
+
 cleanup

catalog/pom.xml 2(+1 -1)

diff --git a/catalog/pom.xml b/catalog/pom.xml
index 92e9692..0dde810 100644
--- a/catalog/pom.xml
+++ b/catalog/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-catalog</artifactId>

currency/pom.xml 2(+1 -1)

diff --git a/currency/pom.xml b/currency/pom.xml
index 748b65e..b06884f 100644
--- a/currency/pom.xml
+++ b/currency/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-currency</artifactId>
diff --git a/entitlement/pom.xml b/entitlement/pom.xml
index 0537b95..de60333 100644
--- a/entitlement/pom.xml
+++ b/entitlement/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-entitlement</artifactId>

invoice/pom.xml 2(+1 -1)

diff --git a/invoice/pom.xml b/invoice/pom.xml
index cd544ac..858c44b 100644
--- a/invoice/pom.xml
+++ b/invoice/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-invoice</artifactId>

jaxrs/pom.xml 2(+1 -1)

diff --git a/jaxrs/pom.xml b/jaxrs/pom.xml
index 1b7d890..512b5d2 100644
--- a/jaxrs/pom.xml
+++ b/jaxrs/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-jaxrs</artifactId>
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
index 9ccb3f7..a2f5910 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
@@ -96,6 +96,7 @@ import org.killbill.billing.payment.api.PaymentApi;
 import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.PaymentMethod;
 import org.killbill.billing.payment.api.PaymentOptions;
+import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.util.UUIDs;
@@ -119,6 +120,7 @@ import org.killbill.commons.metrics.MetricTag;
 import org.killbill.commons.metrics.TimedResource;
 
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
@@ -917,12 +919,28 @@ public class AccountResource extends JaxRsResourceBase {
                              json.getAmount(), "PaymentTransactionJson amount needs to be set");
 
         final Iterable<PluginProperty> pluginProperties = extractPluginProperties(pluginPropertiesString);
-        final UUID paymentMethodId = paymentMethodIdStr == null ? account.getPaymentMethodId() : UUID.fromString(paymentMethodIdStr);
         final Currency currency = json.getCurrency() == null ? account.getCurrency() : Currency.valueOf(json.getCurrency());
         final UUID paymentId = json.getPaymentId() == null ? null : UUID.fromString(json.getPaymentId());
 
+        //
+        // If paymentId was specified, it means we are attempting a payment completion. The preferred way is to use the PaymentResource
+        // (PUT /1.0/kb/payments/{paymentId}/completeTransaction), but for backward compatibility we still allow the call to proceed
+        // as long as the request/existing state is healthy (i.e there is a matching PENDING transaction)
+        //
+        final UUID paymentMethodId;
+        if (paymentId != null) {
+            final Payment initialPayment = paymentApi.getPayment(paymentId, false, pluginProperties, callContext);
+            final PaymentTransaction pendingTransaction = lookupPendingTransaction(initialPayment,
+                                                                                   json != null ? json.getTransactionId() : null,
+                                                                                   json != null ? json.getTransactionExternalKey() : null,
+                                                                                   json != null ? json.getTransactionType() : null);
+            paymentMethodId = initialPayment.getPaymentMethodId();
+        } else {
+            paymentMethodId = paymentMethodIdStr == null ? account.getPaymentMethodId() : UUID.fromString(paymentMethodIdStr);
+        }
         validatePaymentMethodForAccount(account.getId(), paymentMethodId, callContext);
 
+
         final TransactionType transactionType = TransactionType.valueOf(json.getTransactionType());
         final PaymentOptions paymentOptions = createControlPluginApiPaymentOptions(paymentControlPluginNames);
         final Payment result;
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxRsResourceBase.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxRsResourceBase.java
index 0a5c9d2..6ed0e1b 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxRsResourceBase.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxRsResourceBase.java
@@ -40,6 +40,7 @@ import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
 
+import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -71,6 +72,7 @@ import org.killbill.billing.payment.api.PaymentMethod;
 import org.killbill.billing.payment.api.PaymentOptions;
 import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.util.UUIDs;
 import org.killbill.billing.util.api.AuditUserApi;
@@ -335,9 +337,54 @@ public abstract class JaxRsResourceBase implements JaxrsResource {
         }
     }
 
-    protected LocalDate toLocalDateDefaultToday(final UUID accountId, @Nullable final String inputDate, final TenantContext context) throws AccountApiException {
-        final Account account = accountId != null ? accountUserApi.getAccountById(accountId, context) : null;
-        return toLocalDateDefaultToday(account, inputDate, context);
+    protected PaymentTransaction lookupPendingTransaction(final Payment initialPayment, @Nullable final String transactionId, @Nullable final String transactionExternalKey, @Nullable final String transactionType) throws PaymentApiException {
+        final Collection<PaymentTransaction> pendingTransaction  =  Collections2.filter(initialPayment.getTransactions(), new Predicate<PaymentTransaction>() {
+            @Override
+            public boolean apply(final PaymentTransaction input) {
+                if (input.getTransactionStatus() != TransactionStatus.PENDING) {
+                    return false;
+                }
+                if (transactionId != null && !transactionId.equals(input.getId().toString())) {
+                    return false;
+                }
+                if (transactionExternalKey != null && !transactionExternalKey.equals(input.getExternalKey())) {
+                    return false;
+                }
+                if (transactionType != null && !transactionType.equals(input.getTransactionType().name())) {
+                    return false;
+                }
+                //
+                // If we were given a transactionId or a transactionExternalKey or a transactionType we checked there was a match;
+                // In the worst case, if we were given nothing, we return the PENDING transaction for that payment
+                //
+                return true;
+            }
+        });
+        switch (pendingTransaction.size()) {
+            // Nothing: invalid input...
+            case 0:
+                final String parameterType;
+                final String parameterValue;
+                if (transactionId != null) {
+                    parameterType = "transactionId";
+                    parameterValue = transactionId;
+                } else if (transactionExternalKey != null) {
+                    parameterType = "transactionExternalKey";
+                    parameterValue = transactionExternalKey;
+                } else if (transactionType != null) {
+                    parameterType = "transactionType";
+                    parameterValue = transactionType;
+                } else {
+                    parameterType = "paymentId";
+                    parameterValue = initialPayment.getId().toString();
+                }
+                throw new PaymentApiException(ErrorCode.PAYMENT_INVALID_PARAMETER, parameterType, parameterValue);
+            case 1:
+                return pendingTransaction.iterator().next();
+            default:
+                throw new PaymentApiException(ErrorCode.PAYMENT_INTERNAL_ERROR, String.format("Illegal payment state: Found multiple PENDING payment transactions for paymentId='%s'", initialPayment.getId()));
+
+        }
     }
 
     protected LocalDate toLocalDateDefaultToday(final Account account, @Nullable final String inputDate, final TenantContext context) {
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/PaymentResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/PaymentResource.java
index 0525429..0978be1 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/PaymentResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/PaymentResource.java
@@ -19,7 +19,6 @@ package org.killbill.billing.jaxrs.resources;
 
 import java.math.BigDecimal;
 import java.net.URI;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -73,9 +72,7 @@ import org.killbill.commons.metrics.MetricTag;
 import org.killbill.commons.metrics.TimedResource;
 
 import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableMap;
 import com.wordnik.swagger.annotations.Api;
 import com.wordnik.swagger.annotations.ApiOperation;
@@ -266,6 +263,11 @@ public class PaymentResource extends ComboPaymentResource {
         return completeTransactionInternal(json, null, paymentControlPluginNames, pluginPropertiesString, createdBy, reason, comment, uriInfo, request);
     }
 
+
+
+
+
+
     private Response completeTransactionInternal(final PaymentTransactionJson json,
                                                  @Nullable final String paymentIdStr,
                                                  final List<String> paymentControlPluginNames,
@@ -275,6 +277,7 @@ public class PaymentResource extends ComboPaymentResource {
                                                  final String comment,
                                                  final UriInfo uriInfo,
                                                  final HttpServletRequest request) throws PaymentApiException, AccountApiException {
+
         final Iterable<PluginProperty> pluginProperties = extractPluginProperties(pluginPropertiesString);
         final CallContext callContext = context.createContext(createdBy, reason, comment, request);
         final Payment initialPayment = getPaymentByIdOrKey(paymentIdStr, json == null ? null : json.getPaymentExternalKey(), pluginProperties, callContext);
@@ -283,96 +286,38 @@ public class PaymentResource extends ComboPaymentResource {
         final BigDecimal amount = json == null ? null : json.getAmount();
         final Currency currency = json == null || json.getCurrency() == null ? null : Currency.valueOf(json.getCurrency());
 
-        final TransactionType transactionType;
-        final String transactionExternalKey;
-        if (json != null && json.getTransactionId() != null) {
-            final Collection<PaymentTransaction> paymentTransactionCandidates = Collections2.<PaymentTransaction>filter(initialPayment.getTransactions(),
-                                                                                                                        new Predicate<PaymentTransaction>() {
-                                                                                                                            @Override
-                                                                                                                            public boolean apply(final PaymentTransaction input) {
-                                                                                                                                return input.getId().toString().equals(json.getTransactionId());
-                                                                                                                            }
-                                                                                                                        });
-            if (paymentTransactionCandidates.size() == 1) {
-                final PaymentTransaction paymentTransaction = paymentTransactionCandidates.iterator().next();
-                transactionType = paymentTransaction.getTransactionType();
-                transactionExternalKey = paymentTransaction.getExternalKey();
-            } else {
-                return Response.status(Status.NOT_FOUND).build();
-            }
-        } else if (json != null && json.getTransactionExternalKey() != null && json.getTransactionType() != null) {
-            transactionType = TransactionType.valueOf(json.getTransactionType());
-            transactionExternalKey = json.getTransactionExternalKey();
-        } else if (json != null && json.getTransactionExternalKey() != null) {
-            final Collection<PaymentTransaction> paymentTransactionCandidates = Collections2.<PaymentTransaction>filter(initialPayment.getTransactions(),
-                                                                                                                        new Predicate<PaymentTransaction>() {
-                                                                                                                            @Override
-                                                                                                                            public boolean apply(final PaymentTransaction input) {
-                                                                                                                                return input.getExternalKey().equals(json.getTransactionExternalKey());
-                                                                                                                            }
-                                                                                                                        });
-            if (paymentTransactionCandidates.size() == 1) {
-                transactionType = paymentTransactionCandidates.iterator().next().getTransactionType();
-                transactionExternalKey = json.getTransactionExternalKey();
-            } else {
-                // Note: we could bit a bit smarter but keep the logic in the payment system
-                verifyNonNullOrEmpty(null, "PaymentTransactionJson transactionType needs to be set");
-                // Never reached
-                return Response.status(Status.PRECONDITION_FAILED).build();
-            }
-        } else if (json != null && json.getTransactionType() != null) {
-            final Collection<PaymentTransaction> paymentTransactionCandidates = Collections2.<PaymentTransaction>filter(initialPayment.getTransactions(),
-                                                                                                                        new Predicate<PaymentTransaction>() {
-                                                                                                                            @Override
-                                                                                                                            public boolean apply(final PaymentTransaction input) {
-                                                                                                                                return input.getTransactionType().toString().equals(json.getTransactionType());
-                                                                                                                            }
-                                                                                                                        });
-            if (paymentTransactionCandidates.size() == 1) {
-                transactionType = TransactionType.valueOf(json.getTransactionType());
-                transactionExternalKey = paymentTransactionCandidates.iterator().next().getExternalKey();
-            } else {
-                verifyNonNullOrEmpty(null, "PaymentTransactionJson externalKey needs to be set");
-                // Never reached
-                return Response.status(Status.PRECONDITION_FAILED).build();
-            }
-        } else if (initialPayment.getTransactions().size() == 1) {
-            final PaymentTransaction paymentTransaction = initialPayment.getTransactions().get(0);
-            transactionType = paymentTransaction.getTransactionType();
-            transactionExternalKey = paymentTransaction.getExternalKey();
-        } else {
-            verifyNonNullOrEmpty(null, "PaymentTransactionJson transactionType and externalKey need to be set");
-            // Never reached
-            return Response.status(Status.PRECONDITION_FAILED).build();
-        }
+            final PaymentTransaction pendingTransaction = lookupPendingTransaction(initialPayment,
+                                                                                   json != null ? json.getTransactionId() : null,
+                                                                                   json != null ? json.getTransactionExternalKey() : null,
+                                                                                   json != null ? json.getTransactionType() : null);
 
-        final PaymentOptions paymentOptions = createControlPluginApiPaymentOptions(paymentControlPluginNames);
-        switch (transactionType) {
+            final PaymentOptions paymentOptions = createControlPluginApiPaymentOptions(paymentControlPluginNames);
+            switch (pendingTransaction.getTransactionType()) {
             case AUTHORIZE:
                 paymentApi.createAuthorizationWithPaymentControl(account, initialPayment.getPaymentMethodId(), initialPayment.getId(), amount, currency,
-                                                                 initialPayment.getExternalKey(), transactionExternalKey,
+                                                                 initialPayment.getExternalKey(), pendingTransaction.getExternalKey(),
                                                                  pluginProperties, paymentOptions, callContext);
                 break;
             case CAPTURE:
-                paymentApi.createCaptureWithPaymentControl(account, initialPayment.getId(), amount, currency, transactionExternalKey,
+                paymentApi.createCaptureWithPaymentControl(account, initialPayment.getId(), amount, currency, pendingTransaction.getExternalKey(),
                                                            pluginProperties, paymentOptions, callContext);
                 break;
             case PURCHASE:
                 paymentApi.createPurchaseWithPaymentControl(account, initialPayment.getPaymentMethodId(), initialPayment.getId(), amount, currency,
-                                                            initialPayment.getExternalKey(), transactionExternalKey,
+                                                            initialPayment.getExternalKey(), pendingTransaction.getExternalKey(),
                                                             pluginProperties, paymentOptions, callContext);
                 break;
             case CREDIT:
                 paymentApi.createCreditWithPaymentControl(account, initialPayment.getPaymentMethodId(), initialPayment.getId(), amount, currency,
-                                                          initialPayment.getExternalKey(), transactionExternalKey,
+                                                          initialPayment.getExternalKey(), pendingTransaction.getExternalKey(),
                                                           pluginProperties, paymentOptions, callContext);
                 break;
             case REFUND:
                 paymentApi.createRefundWithPaymentControl(account, initialPayment.getId(), amount, currency,
-                                                          transactionExternalKey, pluginProperties, paymentOptions, callContext);
+                                                          pendingTransaction.getExternalKey(), pluginProperties, paymentOptions, callContext);
                 break;
             default:
-                return Response.status(Status.PRECONDITION_FAILED).entity("TransactionType " + transactionType + " cannot be completed").build();
+                return Response.status(Status.PRECONDITION_FAILED).entity("TransactionType " + pendingTransaction.getTransactionType() + " cannot be completed").build();
         }
         return uriBuilder.buildResponse(uriInfo, PaymentResource.class, "getPayment", initialPayment.getId());
     }
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TransactionResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TransactionResource.java
index 60566af..09ae361 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TransactionResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TransactionResource.java
@@ -58,7 +58,7 @@ import com.wordnik.swagger.annotations.ApiResponses;
 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
 
 @Path(JaxrsResource.PAYMENT_TRANSACTIONS_PATH)
-@Api(value = JaxrsResource.PAYMENT_TRANSACTIONS, description = "Operations on payment transactions")
+@Api(value = JaxrsResource.PAYMENT_TRANSACTIONS_PATH, description = "Operations on payment transactions")
 public class TransactionResource extends JaxRsResourceBase {
 
     @Inject
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/Context.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/Context.java
index b2a67c0..a30a9d3 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/Context.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/Context.java
@@ -16,6 +16,8 @@
 
 package org.killbill.billing.jaxrs.util;
 
+import java.util.UUID;
+
 import javax.servlet.ServletRequest;
 
 import org.killbill.billing.jaxrs.resources.JaxrsResource;
@@ -26,6 +28,7 @@ import org.killbill.billing.util.callcontext.CallContextFactory;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.commons.request.Request;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -48,8 +51,10 @@ public class Context {
         try {
             Preconditions.checkNotNull(createdBy, String.format("Header %s needs to be set", JaxrsResource.HDR_CREATED_BY));
             final Tenant tenant = getTenantFromRequest(request);
+
+
             return contextFactory.createCallContext(tenant == null ? null : tenant.getId(), createdBy, origin, userType, reason,
-                                                    comment, UUIDs.randomUUID());
+                                                    comment, getOrCreateUserToken());
         } catch (final NullPointerException e) {
             throw new IllegalArgumentException(e.getMessage());
         }
@@ -65,6 +70,21 @@ public class Context {
         }
     }
 
+    // Use REQUEST_ID_HEADER if this is provided and lloks like a UUID, if not allocate a random one.
+    private UUID getOrCreateUserToken() {
+        UUID userToken;
+        if (Request.getPerThreadRequestData().getRequestId() != null) {
+            try {
+                userToken = UUID.fromString(Request.getPerThreadRequestData().getRequestId());
+            } catch (final IllegalArgumentException ignored) {
+                userToken = UUIDs.randomUUID();
+            }
+        } else {
+            userToken = UUIDs.randomUUID();
+        }
+        return userToken;
+    }
+
     private Tenant getTenantFromRequest(final ServletRequest request) {
         // See org.killbill.billing.server.security.TenantFilter
         final Object tenantObject = request.getAttribute("killbill_tenant");

junction/pom.xml 2(+1 -1)

diff --git a/junction/pom.xml b/junction/pom.xml
index 16f2cd0..90c49e6 100644
--- a/junction/pom.xml
+++ b/junction/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-junction</artifactId>

NEWS 3(+3 -0)

diff --git a/NEWS b/NEWS
index 3201911..70563fb 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,6 @@
+0.16.3
+    See https://github.com/killbill/killbill/releases/tag/killbill-0.16.3
+
 0.16.2
     See https://github.com/killbill/killbill/releases/tag/killbill-0.16.2
 

overdue/pom.xml 2(+1 -1)

diff --git a/overdue/pom.xml b/overdue/pom.xml
index 47b77b1..0b1b54d 100644
--- a/overdue/pom.xml
+++ b/overdue/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-overdue</artifactId>

payment/pom.xml 2(+1 -1)

diff --git a/payment/pom.xml b/payment/pom.xml
index f33921d..7b7e187 100644
--- a/payment/pom.xml
+++ b/payment/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-payment</artifactId>
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
index 82fd053..7ec32d8 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
@@ -427,7 +427,7 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
     @Override
     public List<PaymentMethod> getAccountPaymentMethods(final UUID accountId, final boolean withPluginInfo, final Iterable<PluginProperty> properties, final TenantContext context)
             throws PaymentApiException {
-        return paymentMethodProcessor.getPaymentMethods(accountId, withPluginInfo, properties, context, internalCallContextFactory.createInternalTenantContext(accountId, context));
+        return paymentMethodProcessor.getPaymentMethods(withPluginInfo, properties, context, internalCallContextFactory.createInternalTenantContext(accountId, context));
     }
 
     @Override
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentMethodProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentMethodProcessor.java
index 11383e2..9c352c6 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentMethodProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentMethodProcessor.java
@@ -150,7 +150,7 @@ public class PaymentMethodProcessor extends ProcessorBase {
 
                                                                                                         private void validateUniqueExternalPaymentMethod(final UUID accountId, final String pluginName) throws PaymentApiException {
                                                                                                             if (ExternalPaymentProviderPlugin.PLUGIN_NAME.equals(pluginName)) {
-                                                                                                                final List<PaymentMethodModelDao> accountPaymentMethods = paymentDao.getPaymentMethods(accountId, context);
+                                                                                                                final List<PaymentMethodModelDao> accountPaymentMethods = paymentDao.getPaymentMethods(context);
                                                                                                                 if (Iterables.any(accountPaymentMethods, new Predicate<PaymentMethodModelDao>() {
                                                                                                                     @Override
                                                                                                                     public boolean apply(final PaymentMethodModelDao input) {
@@ -189,12 +189,12 @@ public class PaymentMethodProcessor extends ProcessorBase {
         }
     }
 
-    public List<PaymentMethod> getPaymentMethods(final UUID accountId, final boolean withPluginInfo, final Iterable<PluginProperty> properties, final InternalTenantContext context) throws PaymentApiException {
-        return getPaymentMethods(accountId, withPluginInfo, properties, buildTenantContext(context), context);
+    public List<PaymentMethod> getPaymentMethods(final boolean withPluginInfo, final Iterable<PluginProperty> properties, final InternalTenantContext context) throws PaymentApiException {
+        return getPaymentMethods(withPluginInfo, properties, buildTenantContext(context), context);
     }
 
-    public List<PaymentMethod> getPaymentMethods(final UUID accountId, final boolean withPluginInfo, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext context) throws PaymentApiException {
-        final List<PaymentMethodModelDao> paymentMethodModels = paymentDao.getPaymentMethods(accountId, context);
+    public List<PaymentMethod> getPaymentMethods(final boolean withPluginInfo, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext context) throws PaymentApiException {
+        final List<PaymentMethodModelDao> paymentMethodModels = paymentDao.getPaymentMethods(context);
         if (paymentMethodModels.isEmpty()) {
             return Collections.emptyList();
         }
@@ -353,8 +353,8 @@ public class PaymentMethodProcessor extends ProcessorBase {
                                   );
     }
 
-    public PaymentMethod getExternalPaymentMethod(final UUID accountId, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext context) throws PaymentApiException {
-        final List<PaymentMethod> paymentMethods = getPaymentMethods(accountId, false, properties, tenantContext, context);
+    public PaymentMethod getExternalPaymentMethod(final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext context) throws PaymentApiException {
+        final List<PaymentMethod> paymentMethods = getPaymentMethods(false, properties, tenantContext, context);
         for (final PaymentMethod paymentMethod : paymentMethods) {
             if (ExternalPaymentProviderPlugin.PLUGIN_NAME.equals(paymentMethod.getPluginName())) {
                 return paymentMethod;
@@ -366,7 +366,7 @@ public class PaymentMethodProcessor extends ProcessorBase {
     public UUID createOrGetExternalPaymentMethod(final String paymentMethodExternalKey, final Account account, final Iterable<PluginProperty> properties, final CallContext callContext, final InternalCallContext context) throws PaymentApiException {
         // Check if this account has already used the external payment plugin
         // If not, it's the first time - add a payment method for it
-        final PaymentMethod externalPaymentMethod = getExternalPaymentMethod(account.getId(), properties, callContext, context);
+        final PaymentMethod externalPaymentMethod = getExternalPaymentMethod(properties, callContext, context);
         if (externalPaymentMethod != null) {
             return externalPaymentMethod.getId();
         }
@@ -534,8 +534,7 @@ public class PaymentMethodProcessor extends ProcessorBase {
                         }
                     }
 
-                    final List<PaymentMethodModelDao> refreshedPaymentMethods = paymentDao.refreshPaymentMethods(account.getId(),
-                                                                                                                 pluginName,
+                    final List<PaymentMethodModelDao> refreshedPaymentMethods = paymentDao.refreshPaymentMethods(pluginName,
                                                                                                                  finalPaymentMethods,
                                                                                                                  context);
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentLeavingStateCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentLeavingStateCallback.java
index edc6f70..9fec37d 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentLeavingStateCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentLeavingStateCallback.java
@@ -72,7 +72,7 @@ public abstract class PaymentLeavingStateCallback implements LeavingStateCallbac
             }
 
             // Validate the payment transactions belong to the right payment
-            validatePaymentId(existingPaymentTransactions);
+            validatePaymentIdAndTransactionType(existingPaymentTransactions);
 
             // Validate some constraints on the unicity of that paymentTransactionExternalKey
             validateUniqueTransactionExternalKey(existingPaymentTransactions);
@@ -134,11 +134,15 @@ public abstract class PaymentLeavingStateCallback implements LeavingStateCallbac
     }
 
     // At this point, the payment id should have been populated for follow-up transactions (see PaymentAutomationRunner#run)
-    protected void validatePaymentId(final List<PaymentTransactionModelDao> existingPaymentTransactions) throws PaymentApiException {
+    protected void validatePaymentIdAndTransactionType(final List<PaymentTransactionModelDao> existingPaymentTransactions) throws PaymentApiException {
         for (final PaymentTransactionModelDao paymentTransactionModelDao : existingPaymentTransactions) {
             if (!paymentTransactionModelDao.getPaymentId().equals(paymentStateContext.getPaymentId())) {
                 throw new PaymentApiException(ErrorCode.PAYMENT_INVALID_PARAMETER, paymentTransactionModelDao.getId(), "does not belong to payment " + paymentStateContext.getPaymentId());
             }
+            if (paymentStateContext.getTransactionType() != null && paymentTransactionModelDao.getTransactionType() != paymentStateContext.getTransactionType()) {
+                throw new PaymentApiException(ErrorCode.PAYMENT_INVALID_PARAMETER, paymentTransactionModelDao.getId(), "has a transaction type of " + paymentTransactionModelDao.getTransactionType() +
+                                                                                                                       " instead of requested " + paymentStateContext.getTransactionType());
+            }
         }
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 2c849ca..2053ba1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -444,11 +444,11 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentMethodModelDao> getPaymentMethods(final UUID accountId, final InternalTenantContext context) {
+    public List<PaymentMethodModelDao> getPaymentMethods(final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentMethodModelDao>>() {
             @Override
             public List<PaymentMethodModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                return entitySqlDaoWrapperFactory.become(PaymentMethodSqlDao.class).getByAccountId(accountId.toString(), context);
+                return entitySqlDaoWrapperFactory.become(PaymentMethodSqlDao.class).getForAccount(context);
             }
         });
     }
@@ -513,8 +513,7 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentMethodModelDao> refreshPaymentMethods(final UUID accountId, final String pluginName,
-                                                             final List<PaymentMethodModelDao> newPaymentMethods, final InternalCallContext context) {
+    public List<PaymentMethodModelDao> refreshPaymentMethods(final String pluginName, final List<PaymentMethodModelDao> newPaymentMethods, final InternalCallContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentMethodModelDao>>() {
 
             @Override
@@ -523,7 +522,7 @@ public class DefaultPaymentDao implements PaymentDao {
                 // Look at all payment methods, including deleted ones. We assume that newPaymentMethods (payment methods returned by the plugin)
                 // is the full set of non-deleted payment methods in the plugin. If a payment method was marked as deleted on our side,
                 // but is still existing in the plugin, we will un-delete it.
-                final List<PaymentMethodModelDao> allPaymentMethodsForAccount = transactional.getByAccountIdIncludedDelete(accountId.toString(), context);
+                final List<PaymentMethodModelDao> allPaymentMethodsForAccount = transactional.getForAccountIncludedDelete(context);
 
                 // Consider only the payment methods for the plugin we are refreshing
                 final Collection<PaymentMethodModelDao> existingPaymentMethods = Collections2.filter(allPaymentMethodsForAccount,
@@ -567,7 +566,7 @@ public class DefaultPaymentDao implements PaymentDao {
                         deletedPaymentMethodInTransaction(entitySqlDaoWrapperFactory, existingPaymentMethod.getId(), context);
                     }
                 }
-                return transactional.getByAccountId(accountId.toString(), context);
+                return transactional.getForAccount(context);
             }
         });
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index 69b6941..4c3eeee 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -84,7 +84,7 @@ public interface PaymentDao {
 
     public PaymentMethodModelDao getPaymentMethodByExternalKeyIncludedDeleted(String paymentMethodExternalKey, InternalTenantContext context);
 
-    public List<PaymentMethodModelDao> getPaymentMethods(UUID accountId, InternalTenantContext context);
+    public List<PaymentMethodModelDao> getPaymentMethods(InternalTenantContext context);
 
     public Pagination<PaymentMethodModelDao> getPaymentMethods(String pluginName, Long offset, Long limit, InternalTenantContext context);
 
@@ -92,5 +92,5 @@ public interface PaymentDao {
 
     public void deletedPaymentMethod(UUID paymentMethodId, InternalCallContext context);
 
-    public List<PaymentMethodModelDao> refreshPaymentMethods(UUID accountId, String pluginName, List<PaymentMethodModelDao> paymentMethods, InternalCallContext context);
+    public List<PaymentMethodModelDao> refreshPaymentMethods(String pluginName, List<PaymentMethodModelDao> paymentMethods, InternalCallContext context);
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentMethodSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentMethodSqlDao.java
index 0db4703..ec7cc3d 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentMethodSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentMethodSqlDao.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -57,10 +59,10 @@ public interface PaymentMethodSqlDao extends EntitySqlDao<PaymentMethodModelDao,
                                                          @BindBean final InternalTenantContext context);
 
     @SqlQuery
-    List<PaymentMethodModelDao> getByAccountId(@Bind("accountId") final String accountId, @BindBean final InternalTenantContext context);
+    List<PaymentMethodModelDao> getForAccount(@BindBean final InternalTenantContext context);
 
     @SqlQuery
-    List<PaymentMethodModelDao> getByAccountIdIncludedDelete(@Bind("accountId") final String accountId, @BindBean final InternalTenantContext context);
+    List<PaymentMethodModelDao> getForAccountIncludedDelete(@BindBean final InternalTenantContext context);
 
     @SqlQuery
     @SmartFetchSize(shouldStream = true)
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentMethodSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentMethodSqlDao.sql.stg
index aeccb57..7d2d59c 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentMethodSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentMethodSqlDao.sql.stg
@@ -75,21 +75,21 @@ where id = :id
 ;
 >>
 
-getByAccountId(accountId) ::= <<
+getForAccount() ::= <<
 select
 <allTableFields()>
 from <tableName()>
-where account_id = :accountId
+where <accountRecordIdField()> = :accountRecordId
 <andCheckSoftDeletionWithComma()>
 <AND_CHECK_TENANT()>
 ;
 >>
 
-getByAccountIdIncludedDelete(accountId) ::= <<
+getForAccountIncludedDelete() ::= <<
 select
 <allTableFields()>
 from <tableName()>
-where account_id = :accountId
+where <accountRecordIdField()> = :accountRecordId
 <AND_CHECK_TENANT()>
 ;
 >>
diff --git a/payment/src/main/resources/org/killbill/billing/payment/ddl.sql b/payment/src/main/resources/org/killbill/billing/payment/ddl.sql
index 98b3c0a..fef6bd7 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/ddl.sql
+++ b/payment/src/main/resources/org/killbill/billing/payment/ddl.sql
@@ -77,7 +77,6 @@ CREATE TABLE payment_methods (
 CREATE UNIQUE INDEX payment_methods_id ON payment_methods(id);
 CREATE UNIQUE INDEX payment_methods_external_key ON payment_methods(external_key, tenant_record_id);
 CREATE INDEX payment_methods_plugin_name ON payment_methods(plugin_name);
-CREATE INDEX payment_methods_active_accnt ON payment_methods(is_active, account_id);
 CREATE INDEX payment_methods_tenant_account_record_id ON payment_methods(tenant_record_id, account_record_id);
 
 DROP TABLE IF EXISTS payment_method_history;
diff --git a/payment/src/main/resources/org/killbill/billing/payment/migration/V20160324060345__revisit_payment_methods_indexes_509.sql b/payment/src/main/resources/org/killbill/billing/payment/migration/V20160324060345__revisit_payment_methods_indexes_509.sql
new file mode 100644
index 0000000..3b74b7a
--- /dev/null
+++ b/payment/src/main/resources/org/killbill/billing/payment/migration/V20160324060345__revisit_payment_methods_indexes_509.sql
@@ -0,0 +1 @@
+drop index payment_methods_active_accnt on payment_methods;
diff --git a/payment/src/test/java/org/killbill/billing/payment/api/TestPaymentApi.java b/payment/src/test/java/org/killbill/billing/payment/api/TestPaymentApi.java
index a3cca2f..42eafdc 100644
--- a/payment/src/test/java/org/killbill/billing/payment/api/TestPaymentApi.java
+++ b/payment/src/test/java/org/killbill/billing/payment/api/TestPaymentApi.java
@@ -1286,6 +1286,34 @@ public class TestPaymentApi extends PaymentTestSuiteWithEmbeddedDB {
         assertTrue(spyLogger.contains("TimeoutException.*" + pluginName, Optional.of(SpyLogger.LOG_LEVEL_WARN)));
     }
 
+
+    @Test(groups = "slow")
+    public void testSanityAcrossTransactionTypes() throws PaymentApiException {
+
+        final BigDecimal requestedAmount = BigDecimal.TEN;
+        final String paymentExternalKey = "ahhhhhhhh";
+        final String transactionExternalKey = "okkkkkkk";
+
+        final Payment pendingPayment = createPayment(TransactionType.AUTHORIZE, null, paymentExternalKey, transactionExternalKey, requestedAmount, PaymentPluginStatus.PENDING);
+        assertNotNull(pendingPayment);
+        Assert.assertEquals(pendingPayment.getExternalKey(), paymentExternalKey);
+        Assert.assertEquals(pendingPayment.getTransactions().size(), 1);
+        Assert.assertEquals(pendingPayment.getTransactions().get(0).getAmount().compareTo(requestedAmount), 0);
+        Assert.assertEquals(pendingPayment.getTransactions().get(0).getProcessedAmount().compareTo(requestedAmount), 0);
+        Assert.assertEquals(pendingPayment.getTransactions().get(0).getCurrency(), account.getCurrency());
+        Assert.assertEquals(pendingPayment.getTransactions().get(0).getExternalKey(), transactionExternalKey);
+        Assert.assertEquals(pendingPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.PENDING);
+
+
+        try {
+            createPayment(TransactionType.PURCHASE, null, paymentExternalKey, transactionExternalKey, requestedAmount, PaymentPluginStatus.PENDING);
+            Assert.fail("PURCHASE transaction with same key should have failed");
+        } catch (final PaymentApiException expected) {
+            Assert.assertEquals(expected.getCode(), ErrorCode.PAYMENT_INVALID_PARAMETER.getCode());
+        }
+
+    }
+
     private void verifyRefund(final Payment refund, final String paymentExternalKey, final String paymentTransactionExternalKey, final String refundTransactionExternalKey, final BigDecimal requestedAmount, final BigDecimal refundAmount, final TransactionStatus transactionStatus) {
         Assert.assertEquals(refund.getExternalKey(), paymentExternalKey);
         Assert.assertEquals(refund.getTransactions().size(), 2);
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
index 5b74e38..5a50314 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -44,7 +44,6 @@ import org.killbill.billing.payment.core.PaymentProcessor;
 import org.killbill.billing.payment.core.PluginControlPaymentProcessor;
 import org.killbill.billing.payment.core.sm.control.ControlPluginRunner;
 import org.killbill.billing.payment.core.sm.control.PaymentStateControlContext;
-import org.killbill.billing.payment.dao.MockPaymentDao;
 import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
 import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.dao.PaymentModelDao;
@@ -157,7 +156,6 @@ public class TestRetryablePayment extends PaymentTestSuiteNoDB {
     @BeforeMethod(groups = "fast")
     public void beforeMethod() throws Exception {
         super.beforeMethod();
-        ((MockPaymentDao) paymentDao).reset();
         this.utcNow = clock.getUTCNow();
 
         runner = new MockRetryablePaymentAutomatonRunner(
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/TestPaymentMethodProcessorNoDB.java b/payment/src/test/java/org/killbill/billing/payment/core/TestPaymentMethodProcessorNoDB.java
index 5da756d..226a34b 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/TestPaymentMethodProcessorNoDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/TestPaymentMethodProcessorNoDB.java
@@ -59,11 +59,11 @@ public class TestPaymentMethodProcessorNoDB extends PaymentTestSuiteNoDB {
         Mockito.when(account.getId()).thenReturn(accountId);
         Mockito.when(account.getExternalKey()).thenReturn(accountId.toString());
 
-        Assert.assertEquals(paymentMethodProcessor.getPaymentMethods(account.getId(), false, properties, internalCallContext).size(), 0);
+        Assert.assertEquals(paymentMethodProcessor.getPaymentMethods(false, properties, internalCallContext).size(), 0);
 
         // The first call should create the payment method
         final ExternalPaymentProviderPlugin providerPlugin = paymentMethodProcessor.createPaymentMethodAndGetExternalPaymentProviderPlugin(UUID.randomUUID().toString(), account, properties, callContext, internalCallContext);
-        final List<PaymentMethod> paymentMethods = paymentMethodProcessor.getPaymentMethods(account.getId(), false, properties, internalCallContext);
+        final List<PaymentMethod> paymentMethods = paymentMethodProcessor.getPaymentMethods(false, properties, internalCallContext);
         Assert.assertEquals(paymentMethods.size(), 1);
         Assert.assertEquals(paymentMethods.get(0).getPluginName(), ExternalPaymentProviderPlugin.PLUGIN_NAME);
         Assert.assertEquals(paymentMethods.get(0).getAccountId(), account.getId());
@@ -74,7 +74,7 @@ public class TestPaymentMethodProcessorNoDB extends PaymentTestSuiteNoDB {
             final ExternalPaymentProviderPlugin foundProviderPlugin = paymentMethodProcessor.createPaymentMethodAndGetExternalPaymentProviderPlugin(UUID.randomUUID().toString(), account, properties, callContext, internalCallContext);
             Assert.assertNotNull(foundProviderPlugin);
 
-            final List<PaymentMethod> foundPaymentMethods = paymentMethodProcessor.getPaymentMethods(account.getId(), false, properties, internalCallContext);
+            final List<PaymentMethod> foundPaymentMethods = paymentMethodProcessor.getPaymentMethods(false, properties, internalCallContext);
             Assert.assertEquals(foundPaymentMethods.size(), 1);
             Assert.assertEquals(foundPaymentMethods.get(0).getPluginName(), ExternalPaymentProviderPlugin.PLUGIN_NAME);
             Assert.assertEquals(foundPaymentMethods.get(0).getAccountId(), account.getId());
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index d479336..3f314c8 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -50,6 +50,7 @@ public class MockPaymentDao implements PaymentDao {
     private final Map<UUID, PaymentAttemptModelDao> attempts = new HashMap<UUID, PaymentAttemptModelDao>();
 
     private final MockNonEntityDao mockNonEntityDao;
+    private final List<PaymentMethodModelDao> paymentMethods = new LinkedList<PaymentMethodModelDao>();
 
     @Inject
     public MockPaymentDao(final MockNonEntityDao mockNonEntityDao) {
@@ -59,6 +60,7 @@ public class MockPaymentDao implements PaymentDao {
     public void reset() {
         synchronized (this) {
             payments.clear();
+            paymentMethods.clear();
             transactions.clear();
             attempts.clear();
         }
@@ -66,7 +68,7 @@ public class MockPaymentDao implements PaymentDao {
 
     @Override
     public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, Long offset, Long limit) {
-        final List<PaymentTransactionModelDao> result=  ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
+        final List<PaymentTransactionModelDao> result = ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
             @Override
             public boolean apply(final PaymentTransactionModelDao input) {
                 return Iterables.any(transactionStatuses, new Predicate<TransactionStatus>() {
@@ -311,11 +313,11 @@ public class MockPaymentDao implements PaymentDao {
         }
     }
 
-    private final List<PaymentMethodModelDao> paymentMethods = new LinkedList<PaymentMethodModelDao>();
-
     @Override
     public PaymentMethodModelDao insertPaymentMethod(final PaymentMethodModelDao paymentMethod, final InternalCallContext context) {
         synchronized (this) {
+            paymentMethod.setAccountRecordId(context.getAccountRecordId());
+            paymentMethod.setTenantRecordId(context.getTenantRecordId());
             paymentMethods.add(paymentMethod);
             return paymentMethod;
         }
@@ -346,11 +348,11 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentMethodModelDao> getPaymentMethods(final UUID accountId, final InternalTenantContext context) {
+    public List<PaymentMethodModelDao> getPaymentMethods(final InternalTenantContext context) {
         synchronized (this) {
             final List<PaymentMethodModelDao> result = new ArrayList<PaymentMethodModelDao>();
             for (final PaymentMethodModelDao cur : paymentMethods) {
-                if (cur.getAccountId().equals(accountId)) {
+                if (cur.getAccountRecordId().equals(context.getAccountRecordId())) {
                     result.add(cur);
                 }
             }
@@ -383,7 +385,7 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentMethodModelDao> refreshPaymentMethods(final UUID accountId, final String pluginName, final List<PaymentMethodModelDao> paymentMethods, final InternalCallContext context) {
+    public List<PaymentMethodModelDao> refreshPaymentMethods(final String pluginName, final List<PaymentMethodModelDao> paymentMethods, final InternalCallContext context) {
         return ImmutableList.<PaymentMethodModelDao>of();
     }
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 8bc6634..91a98e1 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -228,7 +228,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         assertEquals(savedMethod.getPluginName(), pluginName);
         assertEquals(savedMethod.isActive(), isActive);
 
-        final List<PaymentMethodModelDao> result = paymentDao.getPaymentMethods(accountId, internalCallContext);
+        final List<PaymentMethodModelDao> result = paymentDao.getPaymentMethods(internalCallContext);
         assertEquals(result.size(), 1);
         savedMethod = result.get(0);
         assertEquals(savedMethod.getId(), paymentMethodId);
diff --git a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
index 9f16097..1b8eecb 100644
--- a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
@@ -30,6 +30,7 @@ import org.killbill.billing.payment.core.PaymentProcessor;
 import org.killbill.billing.payment.core.PluginControlPaymentProcessor;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginControlPaymentAutomatonRunner;
+import org.killbill.billing.payment.dao.MockPaymentDao;
 import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.glue.TestPaymentModuleNoDB;
 import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
@@ -104,6 +105,7 @@ public abstract class PaymentTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
     public void beforeMethod() throws Exception {
         eventBus.start();
         paymentExecutors.initialize();
+        ((MockPaymentDao) paymentDao).reset();
         Profiling.resetPerThreadProfilingData();
     }
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
index d9a8c92..521911c 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
@@ -34,7 +34,6 @@ import org.killbill.billing.invoice.api.Invoice;
 import org.killbill.billing.payment.api.Payment;
 import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.PluginProperty;
-import org.killbill.billing.payment.dao.MockPaymentDao;
 import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
 import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
 import org.killbill.billing.payment.invoice.InvoicePaymentControlPluginApi;
@@ -70,7 +69,6 @@ public class TestRetryService extends PaymentTestSuiteNoDB {
         setDefaultPollInterval(Duration.ONE_HUNDRED_MILLISECONDS);
         Awaitility.setDefaultPollDelay(Duration.SAME_AS_POLL_INTERVAL);
 
-        ((MockPaymentDao) paymentDao).reset();
         mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
         mockPaymentProviderPlugin.clear();
         retryService.initialize();

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index ff2e049..d1d92d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
         <version>0.87-SNAPSHOT</version>
     </parent>
     <artifactId>killbill</artifactId>
-    <version>0.16.3-SNAPSHOT</version>
+    <version>0.16.4-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>killbill</name>
     <description>Library for managing recurring subscriptions and the associated billing</description>
diff --git a/profiles/killbill/pom.xml b/profiles/killbill/pom.xml
index a698349..2bd7aba 100644
--- a/profiles/killbill/pom.xml
+++ b/profiles/killbill/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>killbill-profiles</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-profiles-killbill</artifactId>
diff --git a/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestInvoice.java b/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestInvoice.java
index 7c72920..1ba519f 100644
--- a/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestInvoice.java
+++ b/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestInvoice.java
@@ -118,7 +118,7 @@ public class TestInvoice extends TestJaxrsBase {
                                   "            <tr>\n" +
                                   "                <td />\n" +
                                   "                <td align=right>invoiceNumber</td>\n" +
-                                  "                <td>1</td>\n" +
+                                  "                <td>" + invoiceJson.getInvoiceNumber() + "</td>\n" +
                                   "            </tr>\n" +
                                   "            <tr>\n" +
                                   "                <td>companyName</td>\n" +
diff --git a/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPayment.java b/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPayment.java
index b76434b..b46915f 100644
--- a/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPayment.java
+++ b/profiles/killbill/src/test/java/org/killbill/billing/jaxrs/TestPayment.java
@@ -33,6 +33,7 @@ import org.killbill.billing.client.model.PaymentTransaction;
 import org.killbill.billing.client.model.Payments;
 import org.killbill.billing.client.model.PluginProperty;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
 import org.killbill.billing.payment.plugin.api.PaymentPluginStatus;
@@ -47,6 +48,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 
 public class TestPayment extends TestJaxrsBase {
 
@@ -139,6 +141,185 @@ public class TestPayment extends TestJaxrsBase {
     }
 
     @Test(groups = "slow")
+    public void testAuthorizeCompletionUsingPaymentId() throws Exception {
+        final Account account = createAccountWithDefaultPaymentMethod();
+        final UUID paymentMethodId = account.getPaymentMethodId();
+        final BigDecimal amount = BigDecimal.TEN;
+
+        final String pending = PaymentPluginStatus.PENDING.toString();
+        final ImmutableMap<String, String> pendingPluginProperties = ImmutableMap.<String, String>of(MockPaymentProviderPlugin.PLUGIN_PROPERTY_PAYMENT_PLUGIN_STATUS_OVERRIDE, pending);
+
+        final ImmutableMap<String, String> pluginProperties = ImmutableMap.of();
+
+        TransactionType transactionType = TransactionType.AUTHORIZE;
+        final String paymentExternalKey = UUID.randomUUID().toString();
+        final String authTransactionExternalKey = UUID.randomUUID().toString();
+
+        final Payment initialPayment = createVerifyTransaction(account, paymentMethodId, paymentExternalKey, authTransactionExternalKey, transactionType, pending, amount, BigDecimal.ZERO, pendingPluginProperties, 1);
+
+        // Complete operation: first, only specify the payment id
+        final PaymentTransaction completeTransactionByPaymentId = new PaymentTransaction();
+        completeTransactionByPaymentId.setPaymentId(initialPayment.getPaymentId());
+        final Payment completedPaymentByPaymentId = killBillClient.completePayment(completeTransactionByPaymentId, pluginProperties, createdBy, reason, comment);
+        verifyPayment(account, paymentMethodId, completedPaymentByPaymentId, paymentExternalKey, authTransactionExternalKey, transactionType.toString(), TransactionStatus.SUCCESS.name(), amount, amount, BigDecimal.ZERO, BigDecimal.ZERO, 1, 1);
+    }
+
+
+    @Test(groups = "slow")
+    public void testAuthorizeCompletionUsingPaymentIdAndTransactionId() throws Exception {
+        final Account account = createAccountWithDefaultPaymentMethod();
+        final UUID paymentMethodId = account.getPaymentMethodId();
+        final BigDecimal amount = BigDecimal.TEN;
+
+        final String pending = PaymentPluginStatus.PENDING.toString();
+        final ImmutableMap<String, String> pendingPluginProperties = ImmutableMap.<String, String>of(MockPaymentProviderPlugin.PLUGIN_PROPERTY_PAYMENT_PLUGIN_STATUS_OVERRIDE, pending);
+
+        final ImmutableMap<String, String> pluginProperties = ImmutableMap.of();
+
+        TransactionType transactionType = TransactionType.AUTHORIZE;
+        final String paymentExternalKey = UUID.randomUUID().toString();
+        final String authTransactionExternalKey = UUID.randomUUID().toString();
+
+        final Payment initialPayment = createVerifyTransaction(account, paymentMethodId, paymentExternalKey, authTransactionExternalKey, transactionType, pending, amount, BigDecimal.ZERO, pendingPluginProperties, 1);
+
+
+        final PaymentTransaction completeTransactionByPaymentIdAndInvalidTransactionId = new PaymentTransaction();
+        completeTransactionByPaymentIdAndInvalidTransactionId.setPaymentId(initialPayment.getPaymentId());
+        completeTransactionByPaymentIdAndInvalidTransactionId.setTransactionId(UUID.randomUUID());
+        try {
+            killBillClient.completePayment(completeTransactionByPaymentIdAndInvalidTransactionId, pluginProperties, createdBy, reason, comment);
+            fail("Payment completion should fail when invalid transaction id has been provided" );
+        } catch (final KillBillClientException expected) {
+        }
+
+        final PaymentTransaction completeTransactionByPaymentIdAndTransactionId = new PaymentTransaction();
+        completeTransactionByPaymentIdAndTransactionId.setPaymentId(initialPayment.getPaymentId());
+        completeTransactionByPaymentIdAndTransactionId.setTransactionId(initialPayment.getTransactions().get(0).getTransactionId());
+        final Payment completedPaymentByPaymentId = killBillClient.completePayment(completeTransactionByPaymentIdAndTransactionId, pluginProperties, createdBy, reason, comment);
+        verifyPayment(account, paymentMethodId, completedPaymentByPaymentId, paymentExternalKey, authTransactionExternalKey, transactionType.toString(), TransactionStatus.SUCCESS.name(), amount, amount, BigDecimal.ZERO, BigDecimal.ZERO, 1, 1);
+    }
+
+    @Test(groups = "slow")
+    public void testAuthorizeCompletionUsingPaymentIdAndTransactionExternalKey() throws Exception {
+        final Account account = createAccountWithDefaultPaymentMethod();
+        final UUID paymentMethodId = account.getPaymentMethodId();
+        final BigDecimal amount = BigDecimal.TEN;
+
+        final String pending = PaymentPluginStatus.PENDING.toString();
+        final ImmutableMap<String, String> pendingPluginProperties = ImmutableMap.<String, String>of(MockPaymentProviderPlugin.PLUGIN_PROPERTY_PAYMENT_PLUGIN_STATUS_OVERRIDE, pending);
+
+        final ImmutableMap<String, String> pluginProperties = ImmutableMap.of();
+
+        TransactionType transactionType = TransactionType.AUTHORIZE;
+        final String paymentExternalKey = UUID.randomUUID().toString();
+        final String authTransactionExternalKey = UUID.randomUUID().toString();
+
+        final Payment initialPayment = createVerifyTransaction(account, paymentMethodId, paymentExternalKey, authTransactionExternalKey, transactionType, pending, amount, BigDecimal.ZERO, pendingPluginProperties, 1);
+
+        final PaymentTransaction completeTransactionByPaymentIdAndInvalidTransactionExternalKey = new PaymentTransaction();
+        completeTransactionByPaymentIdAndInvalidTransactionExternalKey.setPaymentId(initialPayment.getPaymentId());
+        completeTransactionByPaymentIdAndInvalidTransactionExternalKey.setTransactionExternalKey("bozo");
+        try {
+            killBillClient.completePayment(completeTransactionByPaymentIdAndInvalidTransactionExternalKey, pluginProperties, createdBy, reason, comment);
+            fail("Payment completion should fail when invalid transaction externalKey has been provided" );
+        } catch (final KillBillClientException expected) {
+        }
+
+        final PaymentTransaction completeTransactionByPaymentIdAndTransactionExternalKey = new PaymentTransaction();
+        completeTransactionByPaymentIdAndTransactionExternalKey.setPaymentId(initialPayment.getPaymentId());
+        completeTransactionByPaymentIdAndTransactionExternalKey.setTransactionExternalKey(authTransactionExternalKey);
+        final Payment completedPaymentByPaymentId = killBillClient.completePayment(completeTransactionByPaymentIdAndTransactionExternalKey, pluginProperties, createdBy, reason, comment);
+        verifyPayment(account, paymentMethodId, completedPaymentByPaymentId, paymentExternalKey, authTransactionExternalKey, transactionType.toString(), TransactionStatus.SUCCESS.name(), amount, amount, BigDecimal.ZERO, BigDecimal.ZERO, 1, 1);
+    }
+
+
+    @Test(groups = "slow")
+    public void testAuthorizeCompletionUsingPaymentIdAndTransactionType() throws Exception {
+        final Account account = createAccountWithDefaultPaymentMethod();
+        final UUID paymentMethodId = account.getPaymentMethodId();
+        final BigDecimal amount = BigDecimal.TEN;
+
+        final String pending = PaymentPluginStatus.PENDING.toString();
+        final ImmutableMap<String, String> pendingPluginProperties = ImmutableMap.<String, String>of(MockPaymentProviderPlugin.PLUGIN_PROPERTY_PAYMENT_PLUGIN_STATUS_OVERRIDE, pending);
+
+        final ImmutableMap<String, String> pluginProperties = ImmutableMap.of();
+
+        TransactionType transactionType = TransactionType.AUTHORIZE;
+        final String paymentExternalKey = UUID.randomUUID().toString();
+        final String authTransactionExternalKey = UUID.randomUUID().toString();
+
+        final Payment initialPayment = createVerifyTransaction(account, paymentMethodId, paymentExternalKey, authTransactionExternalKey, transactionType, pending, amount, BigDecimal.ZERO, pendingPluginProperties, 1);
+
+
+        final PaymentTransaction completeTransactionByPaymentIdAndInvalidTransactionType = new PaymentTransaction();
+        completeTransactionByPaymentIdAndInvalidTransactionType.setPaymentId(initialPayment.getPaymentId());
+        completeTransactionByPaymentIdAndInvalidTransactionType.setTransactionType(TransactionType.CAPTURE.name());
+        try {
+            killBillClient.completePayment(completeTransactionByPaymentIdAndInvalidTransactionType, pluginProperties, createdBy, reason, comment);
+            fail("Payment completion should fail when invalid transaction type has been provided" );
+        } catch (final KillBillClientException expected) {
+        }
+
+        final PaymentTransaction completeTransactionByPaymentIdAndTransactionType = new PaymentTransaction();
+        completeTransactionByPaymentIdAndTransactionType.setPaymentId(initialPayment.getPaymentId());
+        completeTransactionByPaymentIdAndTransactionType.setTransactionType(transactionType.name());
+        final Payment completedPaymentByPaymentId = killBillClient.completePayment(completeTransactionByPaymentIdAndTransactionType, pluginProperties, createdBy, reason, comment);
+        verifyPayment(account, paymentMethodId, completedPaymentByPaymentId, paymentExternalKey, authTransactionExternalKey, transactionType.toString(), TransactionStatus.SUCCESS.name(), amount, amount, BigDecimal.ZERO, BigDecimal.ZERO, 1, 1);
+    }
+
+    @Test(groups = "slow")
+    public void testAuthorizeCompletionUsingExternalKey() throws Exception {
+
+        final Account account = createAccountWithDefaultPaymentMethod();
+        final UUID paymentMethodId = account.getPaymentMethodId();
+        final BigDecimal amount = BigDecimal.TEN;
+
+        final String pending = PaymentPluginStatus.PENDING.toString();
+        final ImmutableMap<String, String> pendingPluginProperties = ImmutableMap.<String, String>of(MockPaymentProviderPlugin.PLUGIN_PROPERTY_PAYMENT_PLUGIN_STATUS_OVERRIDE, pending);
+
+        final ImmutableMap<String, String> pluginProperties = ImmutableMap.of();
+
+        TransactionType transactionType = TransactionType.AUTHORIZE;
+        final String paymentExternalKey = UUID.randomUUID().toString();
+        final String authTransactionExternalKey = UUID.randomUUID().toString();
+
+        final Payment initialPayment = createVerifyTransaction(account, paymentMethodId, paymentExternalKey, authTransactionExternalKey, transactionType, pending, amount, BigDecimal.ZERO, pendingPluginProperties, 1);
+
+        final PaymentTransaction completeTransactionWithTypeAndKey = new PaymentTransaction();
+        completeTransactionWithTypeAndKey.setPaymentId(initialPayment.getPaymentId());
+        completeTransactionWithTypeAndKey.setTransactionExternalKey(authTransactionExternalKey);
+        final Payment completedPaymentByPaymentId = killBillClient.completePayment(completeTransactionWithTypeAndKey, pluginProperties, createdBy, reason, comment);
+        verifyPayment(account, paymentMethodId, completedPaymentByPaymentId, paymentExternalKey, authTransactionExternalKey, transactionType.toString(), TransactionStatus.SUCCESS.name(), amount, amount, BigDecimal.ZERO, BigDecimal.ZERO, 1, 1);
+    }
+
+
+    @Test(groups = "slow")
+    public void testAuthorizeInvalidCompletionUsingPaymentId() throws Exception {
+        final Account account = createAccountWithDefaultPaymentMethod();
+        final UUID paymentMethodId = account.getPaymentMethodId();
+        final BigDecimal amount = BigDecimal.TEN;
+
+        final ImmutableMap<String, String> pluginProperties = ImmutableMap.of();
+
+        TransactionType transactionType = TransactionType.AUTHORIZE;
+        final String paymentExternalKey = UUID.randomUUID().toString();
+        final String authTransactionExternalKey = UUID.randomUUID().toString();
+
+        final Payment initialPayment = createVerifyTransaction(account, paymentMethodId, paymentExternalKey, authTransactionExternalKey, transactionType, TransactionStatus.SUCCESS.name(), amount, amount, pluginProperties, 1);
+
+        // The payment was already completed
+        final PaymentTransaction completeTransactionByPaymentId = new PaymentTransaction();
+        completeTransactionByPaymentId.setPaymentId(initialPayment.getPaymentId());
+        try {
+            killBillClient.completePayment(completeTransactionByPaymentId, pluginProperties, createdBy, reason, comment);
+            fail("Completion should not succeed, there is no PENDING payment transaction");
+        } catch (final KillBillClientException expected) {
+            // Invalid parameter paymentId: XXXX
+        }
+    }
+
+
+    @Test(groups = "slow")
     public void testCompletionForSubsequentTransaction() throws Exception {
         final Account account = createAccountWithDefaultPaymentMethod();
         final UUID paymentMethodId = account.getPaymentMethodId();
@@ -163,27 +344,7 @@ public class TestPayment extends TestJaxrsBase {
         final Payment refundPayment = killBillClient.refundPayment(refundTransaction, null, pluginProperties, createdBy, reason, comment);
         verifyPaymentWithPendingRefund(account, paymentMethodId, paymentExternalKey, purchaseTransactionExternalKey, purchaseAmount, refundTransactionExternalKey, refundPayment);
 
-        // We cannot complete using just the payment id as JAX-RS doesn't know which transaction to complete
-        try {
-            final PaymentTransaction completeTransactionByPaymentId = new PaymentTransaction();
-            completeTransactionByPaymentId.setPaymentId(refundPayment.getPaymentId());
-            killBillClient.completePayment(completeTransactionByPaymentId, pluginProperties, createdBy, reason, comment);
-            Assert.fail();
-        } catch (final KillBillClientException e) {
-            assertEquals(e.getMessage(), "PaymentTransactionJson transactionType and externalKey need to be set");
-        }
-
-        // We cannot complete using just the payment external key as JAX-RS doesn't know which transaction to complete
-        try {
-            final PaymentTransaction completeTransactionByPaymentExternalKey = new PaymentTransaction();
-            completeTransactionByPaymentExternalKey.setPaymentExternalKey(refundPayment.getPaymentExternalKey());
-            killBillClient.completePayment(completeTransactionByPaymentExternalKey, pluginProperties, createdBy, reason, comment);
-            Assert.fail();
-        } catch (final KillBillClientException e) {
-            assertEquals(e.getMessage(), "PaymentTransactionJson transactionType and externalKey need to be set");
-        }
 
-        // Finally, it should work if we specify the payment id and transaction external key
         final PaymentTransaction completeTransactionWithTypeAndKey = new PaymentTransaction();
         completeTransactionWithTypeAndKey.setPaymentId(refundPayment.getPaymentId());
         completeTransactionWithTypeAndKey.setTransactionExternalKey(refundTransactionExternalKey);
diff --git a/profiles/killpay/pom.xml b/profiles/killpay/pom.xml
index 829b07a..f9da86c 100644
--- a/profiles/killpay/pom.xml
+++ b/profiles/killpay/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>killbill-profiles</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-profiles-killpay</artifactId>

profiles/pom.xml 2(+1 -1)

diff --git a/profiles/pom.xml b/profiles/pom.xml
index 9bf3c9f..d2544f3 100644
--- a/profiles/pom.xml
+++ b/profiles/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-profiles</artifactId>
diff --git a/subscription/pom.xml b/subscription/pom.xml
index 5ae804d..5da6278 100644
--- a/subscription/pom.xml
+++ b/subscription/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-subscription</artifactId>

tenant/pom.xml 2(+1 -1)

diff --git a/tenant/pom.xml b/tenant/pom.xml
index 9662fc8..6a61b24 100644
--- a/tenant/pom.xml
+++ b/tenant/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-tenant</artifactId>

usage/pom.xml 2(+1 -1)

diff --git a/usage/pom.xml b/usage/pom.xml
index d316e77..3e6a370 100644
--- a/usage/pom.xml
+++ b/usage/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-usage</artifactId>

util/pom.xml 32(+31 -1)

diff --git a/util/pom.xml b/util/pom.xml
index 0e250e7..04d9e7b 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.16.3-SNAPSHOT</version>
+        <version>0.16.4-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-util</artifactId>
@@ -111,6 +111,12 @@
             <artifactId>shiro-guice</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.flywaydb</groupId>
+            <artifactId>flyway-core</artifactId>
+            <version>4.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.jdbi</groupId>
             <artifactId>jdbi</artifactId>
         </dependency>
@@ -253,6 +259,30 @@
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>assemble-migrator</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>killbill</finalName>
+                            <archive>
+                                <manifest>
+                                    <mainClass>org.killbill.billing.util.migration.Migrator</mainClass>
+                                </manifest>
+                            </archive>
+                        </configuration>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptor>src/main/assembly/migrator.xml</descriptor>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
diff --git a/util/src/main/assembly/migrator.xml b/util/src/main/assembly/migrator.xml
new file mode 100644
index 0000000..0e7eefe
--- /dev/null
+++ b/util/src/main/assembly/migrator.xml
@@ -0,0 +1,28 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+    <id>flyway</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>test</scope>
+        </dependencySet>
+    </dependencySets>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.directory}/test-classes</directory>
+            <outputDirectory>/</outputDirectory>
+            <includes>
+                <include>**/*.class</include>
+            </includes>
+            <useDefaultExcludes>true</useDefaultExcludes>
+        </fileSet>
+    </fileSets>
+</assembly>
diff --git a/util/src/test/java/org/flywaydb/core/FlywayWithDryRun.java b/util/src/test/java/org/flywaydb/core/FlywayWithDryRun.java
new file mode 100644
index 0000000..0cf27a2
--- /dev/null
+++ b/util/src/test/java/org/flywaydb/core/FlywayWithDryRun.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2016 Groupon, Inc
+ * Copyright 2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.flywaydb.core;
+
+import java.sql.Connection;
+import java.util.List;
+
+import org.flywaydb.core.api.FlywayException;
+import org.flywaydb.core.api.callback.FlywayCallback;
+import org.flywaydb.core.api.resolver.MigrationResolver;
+import org.flywaydb.core.internal.dbsupport.DbSupport;
+import org.flywaydb.core.internal.dbsupport.Schema;
+import org.flywaydb.core.internal.dbsupport.SqlStatement;
+import org.flywaydb.core.internal.dbsupport.Table;
+import org.flywaydb.core.internal.metadatatable.MetaDataTable;
+import org.flywaydb.core.internal.util.PlaceholderReplacer;
+import org.killbill.billing.util.migration.DbMigrateWithDryRun;
+
+public class FlywayWithDryRun extends Flyway {
+
+    private final List<SqlStatement> sqlStatements;
+
+    public FlywayWithDryRun(final List<SqlStatement> sqlStatements) {
+        this.sqlStatements = sqlStatements;
+    }
+
+    // Note: we assume the schemas have already been created and baseline() has already been called
+    public int dryRunMigrate() throws FlywayException {
+        final PlaceholderReplacer placeholderReplacer = new PlaceholderReplacer(getPlaceholders(),
+                                                                                getPlaceholderPrefix(),
+                                                                                getPlaceholderSuffix());
+        return execute(new Command<Integer>() {
+            public Integer execute(final Connection connectionMetaDataTable,
+                                   final Connection connectionUserObjects,
+                                   final MigrationResolver migrationResolver,
+                                   final MetaDataTable metaDataTable,
+                                   final DbSupport dbSupport,
+                                   final Schema[] schemas,
+                                   final FlywayCallback[] flywayCallbacks) {
+                final Table metaDataDBTable = schemas[0].getTable(getTable());
+
+                final DbMigrateWithDryRun dbMigrate = new DbMigrateWithDryRun(sqlStatements,
+                                                                              placeholderReplacer,
+                                                                              getEncoding(),
+                                                                              metaDataDBTable,
+                                                                              connectionMetaDataTable,
+                                                                              connectionUserObjects,
+                                                                              dbSupport,
+                                                                              metaDataTable,
+                                                                              schemas[0],
+                                                                              migrationResolver,
+                                                                              getTarget(),
+                                                                              isIgnoreFutureMigrations(),
+                                                                              false,
+                                                                              isOutOfOrder(),
+                                                                              flywayCallbacks);
+                return dbMigrate.dryRunMigrate();
+            }
+        });
+    }
+}
diff --git a/util/src/test/java/org/killbill/billing/util/migration/CapturingMetaDataTable.java b/util/src/test/java/org/killbill/billing/util/migration/CapturingMetaDataTable.java
new file mode 100644
index 0000000..164a58a
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/migration/CapturingMetaDataTable.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2016 Groupon, Inc
+ * Copyright 2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.migration;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.flywaydb.core.api.MigrationVersion;
+import org.flywaydb.core.internal.dbsupport.DbSupport;
+import org.flywaydb.core.internal.dbsupport.SqlStatement;
+import org.flywaydb.core.internal.dbsupport.Table;
+import org.flywaydb.core.internal.metadatatable.AppliedMigration;
+import org.flywaydb.core.internal.metadatatable.MetaDataTableImpl;
+
+public class CapturingMetaDataTable extends MetaDataTableImpl {
+
+    private final List<SqlStatement> sqlStatements;
+    private final DbSupport dbSupport;
+    private final Table table;
+
+    /**
+     * Creates a new instance of the metadata table support.
+     *
+     * @param sqlStatements The current list of all pending migrations.
+     * @param dbSupport     Database-specific functionality.
+     * @param table         The metadata table used by flyway.
+     */
+    public CapturingMetaDataTable(final List<SqlStatement> sqlStatements, final DbSupport dbSupport, final Table table) {
+        super(dbSupport, table);
+        this.sqlStatements = sqlStatements;
+        this.dbSupport = dbSupport;
+        this.table = table;
+    }
+
+    @Override
+    public void addAppliedMigration(final AppliedMigration appliedMigration) {
+        final MigrationVersion version = appliedMigration.getVersion();
+        final String versionStr = version == null ? null : version.toString();
+        final int calculateInstalledRank;
+        try {
+            calculateInstalledRank = calculateInstalledRank();
+        } catch (final SQLException e) {
+            throw new RuntimeException(e);
+        }
+
+        final String sql = new StringBuilder().append("INSERT INTO ")
+                                              .append(table)
+                                              .append(" (")
+                                              .append(dbSupport.quote("installed_rank")).append(",")
+                                              .append(dbSupport.quote("version")).append(",")
+                                              .append(dbSupport.quote("description")).append(",")
+                                              .append(dbSupport.quote("type")).append(",")
+                                              .append(dbSupport.quote("script")).append(",")
+                                              .append(dbSupport.quote("checksum")).append(",")
+                                              .append(dbSupport.quote("installed_by")).append(",")
+                                              .append(dbSupport.quote("execution_time")).append(",")
+                                              .append(dbSupport.quote("success"))
+                                              .append(")")
+                                              .append(" VALUES (")
+                                              .append(calculateInstalledRank + appliedMigration.getInstalledRank()).append(",")
+                                              .append("'").append(versionStr).append("',")
+                                              .append("'").append(appliedMigration.getDescription()).append("',")
+                                              .append("'").append(appliedMigration.getType().name()).append("',")
+                                              .append("'").append(appliedMigration.getScript()).append("',")
+                                              .append(appliedMigration.getChecksum()).append(",")
+                                              .append(dbSupport.getCurrentUserFunction()).append(",")
+                                              .append(appliedMigration.getExecutionTime()).append(",")
+                                              .append(appliedMigration.isSuccess())
+                                              .append(")")
+                                              .toString();
+
+        sqlStatements.add(new SqlStatement(0, sql, false));
+    }
+
+    /**
+     * Calculates the installed rank for the new migration to be inserted.
+     *
+     * @return The installed rank.
+     */
+    private int calculateInstalledRank() throws SQLException {
+        final int currentMax = dbSupport.getJdbcTemplate().queryForInt("SELECT MAX(" + dbSupport.quote("installed_rank") + ")" + " FROM " + table);
+        return currentMax + 1;
+    }
+}
diff --git a/util/src/test/java/org/killbill/billing/util/migration/CapturingSqlMigrationExecutor.java b/util/src/test/java/org/killbill/billing/util/migration/CapturingSqlMigrationExecutor.java
new file mode 100644
index 0000000..6360b75
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/migration/CapturingSqlMigrationExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2016 Groupon, Inc
+ * Copyright 2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.migration;
+
+import java.sql.Connection;
+import java.util.List;
+
+import org.flywaydb.core.api.resolver.MigrationExecutor;
+import org.flywaydb.core.internal.dbsupport.DbSupport;
+import org.flywaydb.core.internal.dbsupport.SqlScript;
+import org.flywaydb.core.internal.dbsupport.SqlStatement;
+import org.flywaydb.core.internal.util.PlaceholderReplacer;
+import org.flywaydb.core.internal.util.scanner.Resource;
+
+public class CapturingSqlMigrationExecutor implements MigrationExecutor {
+
+    private final List<SqlStatement> sqlStatements;
+    private final DbSupport dbSupport;
+    private final PlaceholderReplacer placeholderReplacer;
+    private final Resource sqlScriptResource;
+    private final String encoding;
+
+    /**
+     * Creates a new sql script migration based on this sql script.
+     *
+     * @param sqlStatements       The current list of all pending migrations.
+     * @param dbSupport           The database-specific support.
+     * @param sqlScriptResource   The resource containing the sql script.
+     * @param placeholderReplacer The placeholder replacer to apply to sql migration scripts.
+     * @param encoding            The encoding of this Sql migration.
+     */
+    public CapturingSqlMigrationExecutor(final List<SqlStatement> sqlStatements,
+                                         final DbSupport dbSupport,
+                                         final Resource sqlScriptResource,
+                                         final PlaceholderReplacer placeholderReplacer,
+                                         final String encoding) {
+        this.sqlStatements = sqlStatements;
+        this.dbSupport = dbSupport;
+        this.sqlScriptResource = sqlScriptResource;
+        this.encoding = encoding;
+        this.placeholderReplacer = placeholderReplacer;
+    }
+
+    @Override
+    public void execute(final Connection connection) {
+        final SqlScript sqlScript = new SqlScript(dbSupport, sqlScriptResource, placeholderReplacer, encoding);
+        for (final SqlStatement sqlStatement : sqlScript.getSqlStatements()) {
+            sqlStatements.add(sqlStatement);
+        }
+    }
+
+    @Override
+    public boolean executeInTransaction() {
+        return true;
+    }
+}
diff --git a/util/src/test/java/org/killbill/billing/util/migration/DbMigrateWithDryRun.java b/util/src/test/java/org/killbill/billing/util/migration/DbMigrateWithDryRun.java
new file mode 100644
index 0000000..7b81a39
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/migration/DbMigrateWithDryRun.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2016 Groupon, Inc
+ * Copyright 2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.migration;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.flywaydb.core.api.FlywayException;
+import org.flywaydb.core.api.MigrationInfo;
+import org.flywaydb.core.api.MigrationVersion;
+import org.flywaydb.core.api.callback.FlywayCallback;
+import org.flywaydb.core.api.resolver.MigrationExecutor;
+import org.flywaydb.core.api.resolver.MigrationResolver;
+import org.flywaydb.core.internal.command.DbMigrate;
+import org.flywaydb.core.internal.dbsupport.DbSupport;
+import org.flywaydb.core.internal.dbsupport.DbSupportFactory;
+import org.flywaydb.core.internal.dbsupport.Schema;
+import org.flywaydb.core.internal.dbsupport.SqlStatement;
+import org.flywaydb.core.internal.dbsupport.Table;
+import org.flywaydb.core.internal.info.MigrationInfoImpl;
+import org.flywaydb.core.internal.info.MigrationInfoServiceImpl;
+import org.flywaydb.core.internal.metadatatable.AppliedMigration;
+import org.flywaydb.core.internal.metadatatable.MetaDataTable;
+import org.flywaydb.core.internal.util.PlaceholderReplacer;
+import org.flywaydb.core.internal.util.jdbc.TransactionCallback;
+import org.flywaydb.core.internal.util.jdbc.TransactionTemplate;
+import org.flywaydb.core.internal.util.logging.Log;
+import org.flywaydb.core.internal.util.logging.LogFactory;
+import org.flywaydb.core.internal.util.scanner.filesystem.FileSystemResource;
+
+public class DbMigrateWithDryRun extends DbMigrate {
+
+    private static final Log LOG = LogFactory.getLog(DbMigrateWithDryRun.class);
+
+    private final List<SqlStatement> sqlStatements;
+    private final PlaceholderReplacer placeholderReplacer;
+    private final String encoding;
+    private final MigrationVersion target;
+    private final DbSupport dbSupport;
+    private final MetaDataTable metaDataTableForDryRun;
+    private final Schema schema;
+    private final MigrationResolver migrationResolver;
+    private final Connection connectionMetaDataTable;
+    private final Connection connectionUserObjects;
+    private final boolean outOfOrder;
+    private final FlywayCallback[] callbacks;
+    private final DbSupport dbSupportUserObjects;
+
+    /**
+     * Creates a new database migrator.
+     *
+     * @param sqlStatements               The current list of all pending migrations.
+     * @param placeholderReplacer         The placeholder replacer to apply to sql migration scripts.
+     * @param encoding                    The encoding of Sql migrations.
+     * @param metaDataDBTable             The database metadata DB Table.
+     * @param connectionMetaDataTable     The connection to use.
+     * @param connectionUserObjects       The connection to use to perform the actual database migrations.
+     * @param dbSupport                   Database-specific functionality.
+     * @param metaDataTable               The database metadata table.
+     * @param migrationResolver           The migration resolver.
+     * @param target                      The target version of the migration.
+     * @param ignoreFutureMigrations      Flag whether to ignore future migrations or not.
+     * @param ignoreFailedFutureMigration Flag whether to ignore failed future migrations or not.
+     * @param outOfOrder                  Allows migrations to be run "out of order".
+     */
+    public DbMigrateWithDryRun(final List<SqlStatement> sqlStatements,
+                               final PlaceholderReplacer placeholderReplacer,
+                               final String encoding,
+                               final Table metaDataDBTable,
+                               final Connection connectionMetaDataTable,
+                               final Connection connectionUserObjects,
+                               final DbSupport dbSupport,
+                               final MetaDataTable metaDataTable,
+                               final Schema schema,
+                               final MigrationResolver migrationResolver,
+                               final MigrationVersion target,
+                               final boolean ignoreFutureMigrations,
+                               final boolean ignoreFailedFutureMigration,
+                               final boolean outOfOrder,
+                               final FlywayCallback[] callbacks) {
+        super(connectionMetaDataTable, connectionUserObjects, dbSupport, metaDataTable, schema, migrationResolver, target, ignoreFutureMigrations, ignoreFailedFutureMigration, outOfOrder, callbacks);
+        this.sqlStatements = sqlStatements;
+        this.placeholderReplacer = placeholderReplacer;
+        this.encoding = encoding;
+        this.connectionMetaDataTable = connectionMetaDataTable;
+        this.connectionUserObjects = connectionUserObjects;
+        this.dbSupport = dbSupport;
+        this.schema = schema;
+        this.migrationResolver = migrationResolver;
+        this.target = target;
+        this.outOfOrder = outOfOrder;
+        this.callbacks = callbacks;
+
+        this.dbSupportUserObjects = DbSupportFactory.createDbSupport(connectionUserObjects, false);
+
+        // PIERRE: change MetaDataTable to capture the SQL
+        this.metaDataTableForDryRun = new CapturingMetaDataTable(sqlStatements, dbSupport, metaDataDBTable);
+    }
+
+    public int dryRunMigrate() throws FlywayException {
+        try {
+            for (final FlywayCallback callback : callbacks) {
+                new TransactionTemplate(connectionUserObjects).execute(new TransactionCallback<Object>() {
+                    @Override
+                    public Object doInTransaction() throws SQLException {
+                        dbSupportUserObjects.changeCurrentSchemaTo(schema);
+                        callback.beforeMigrate(connectionUserObjects);
+                        return null;
+                    }
+                });
+            }
+
+            // PIERRE: perform a single query to the metadata table
+            final MigrationInfoServiceImpl infoService = new MigrationInfoServiceImpl(migrationResolver, metaDataTableForDryRun, target, outOfOrder, true, true);
+            infoService.refresh();
+
+            final MigrationInfoImpl[] pendingMigrations = infoService.pending();
+            new TransactionTemplate(connectionMetaDataTable, false).execute(new TransactionCallback<Boolean>() {
+                public Boolean doInTransaction() {
+                    int i = 1;
+                    for (final MigrationInfoImpl migrationInfo : pendingMigrations) {
+                        applyMigration(i, migrationInfo);
+                        i++;
+                    }
+
+                    return true;
+                }
+            });
+
+            for (final FlywayCallback callback : callbacks) {
+                new TransactionTemplate(connectionUserObjects).execute(new TransactionCallback<Object>() {
+                    @Override
+                    public Object doInTransaction() throws SQLException {
+                        dbSupportUserObjects.changeCurrentSchemaTo(schema);
+                        callback.afterMigrate(connectionUserObjects);
+                        return null;
+                    }
+                });
+            }
+
+            return pendingMigrations.length;
+        } finally {
+            dbSupportUserObjects.restoreCurrentSchema();
+        }
+    }
+
+    private void applyMigration(final int installedRnk, final MigrationInfoImpl migration) {
+        final MigrationVersion version = migration.getVersion();
+        final String migrationText;
+        if (version != null) {
+            migrationText = "schema " + schema + " to version " + version + " - " + migration.getDescription();
+        } else {
+            migrationText = "schema " + schema + " with repeatable migration " + migration.getDescription();
+        }
+        LOG.info("Migrating " + migrationText);
+
+        // PIERRE: override the executor to capture the SQL
+        final FileSystemResource sqlScriptResource = new FileSystemResource(migration.getResolvedMigration().getPhysicalLocation());
+        final MigrationExecutor migrationExecutor = new CapturingSqlMigrationExecutor(sqlStatements,
+                                                                                      dbSupport,
+                                                                                      sqlScriptResource,
+                                                                                      placeholderReplacer,
+                                                                                      encoding);
+        try {
+            doMigrate(migration, migrationExecutor, migrationText);
+        } catch (final SQLException e) {
+            throw new FlywayException("Unable to apply migration", e);
+        }
+
+        final AppliedMigration appliedMigration = new AppliedMigration(installedRnk,
+                                                                       version,
+                                                                       migration.getDescription(),
+                                                                       migration.getType(),
+                                                                       migration.getScript(),
+                                                                       migration.getResolvedMigration().getChecksum(),
+                                                                       null,
+                                                                       null,
+                                                                       -1,
+                                                                       true);
+        metaDataTableForDryRun.addAppliedMigration(appliedMigration);
+    }
+
+    private void doMigrate(final MigrationInfo migration, final MigrationExecutor migrationExecutor, final String migrationText) throws SQLException {
+        for (final FlywayCallback callback : callbacks) {
+            dbSupportUserObjects.changeCurrentSchemaTo(schema);
+            callback.beforeEachMigrate(connectionUserObjects, migration);
+        }
+
+        dbSupportUserObjects.changeCurrentSchemaTo(schema);
+        migrationExecutor.execute(connectionUserObjects);
+        LOG.debug("Successfully completed migration of " + migrationText);
+
+        for (final FlywayCallback callback : callbacks) {
+            dbSupportUserObjects.changeCurrentSchemaTo(schema);
+            callback.afterEachMigrate(connectionUserObjects, migration);
+        }
+    }
+}
diff --git a/util/src/test/java/org/killbill/billing/util/migration/Migrator.java b/util/src/test/java/org/killbill/billing/util/migration/Migrator.java
new file mode 100644
index 0000000..0b5b6e8
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/migration/Migrator.java
@@ -0,0 +1,540 @@
+/*
+ * Copyright 2016 Groupon, Inc
+ * Copyright 2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.migration;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.flywaydb.core.FlywayWithDryRun;
+import org.flywaydb.core.api.FlywayException;
+import org.flywaydb.core.internal.dbsupport.SqlStatement;
+import org.flywaydb.core.internal.info.MigrationInfoDumper;
+import org.flywaydb.core.internal.util.ClassUtils;
+import org.flywaydb.core.internal.util.FileCopyUtils;
+import org.flywaydb.core.internal.util.StringUtils;
+import org.flywaydb.core.internal.util.VersionPrinter;
+import org.flywaydb.core.internal.util.logging.Log;
+import org.flywaydb.core.internal.util.logging.LogFactory;
+import org.flywaydb.core.internal.util.logging.console.ConsoleLog.Level;
+import org.flywaydb.core.internal.util.logging.console.ConsoleLogCreator;
+import org.flywaydb.core.internal.util.scanner.classpath.ClassPathResource;
+
+// Copied over from org.flywaydb.commandline.Main (not easily extensible unfortunately) to support dry-run
+public class Migrator {
+
+    /**
+     * The property name for the directory containing a list of jars to load on the classpath.
+     */
+    private static final String PROPERTY_JAR_DIRS = "flyway.jarDirs";
+    private static Log LOG;
+
+    /**
+     * Initializes the logging.
+     *
+     * @param level The minimum level to log at.
+     */
+    private static void initLogging(final Level level) {
+        LogFactory.setLogCreator(new ConsoleLogCreator(level));
+        LOG = LogFactory.getLog(Migrator.class);
+    }
+
+    /**
+     * Main method.
+     *
+     * @param args The command-line arguments.
+     */
+    public static void main(final String[] args) {
+        final Level logLevel = getLogLevel(args);
+        initLogging(logLevel);
+
+        try {
+            if (isPrintVersionAndExit(args)) {
+                printVersion();
+                System.exit(0);
+            }
+
+            final List<String> operations = determineOperations(args);
+            if (operations.isEmpty()) {
+                printUsage();
+                return;
+            }
+
+            final Properties properties = new Properties();
+            initializeDefaults(properties);
+            loadConfiguration(properties, args);
+            overrideConfiguration(properties, args);
+            dumpConfiguration(properties);
+
+            loadJdbcDrivers();
+            loadJavaMigrationsFromJarDirs(properties);
+
+            final List<SqlStatement> sqlStatements = new LinkedList<SqlStatement>();
+            final FlywayWithDryRun flyway = new FlywayWithDryRun(sqlStatements);
+            filterProperties(properties);
+            flyway.configure(properties);
+
+            for (final String operation : operations) {
+                executeOperation(flyway, operation, sqlStatements);
+            }
+        } catch (final Exception e) {
+            if (logLevel == Level.DEBUG) {
+                LOG.error("Unexpected error", e);
+            } else {
+                if (e instanceof FlywayException) {
+                    LOG.error(e.getMessage());
+                } else {
+                    LOG.error(e.toString());
+                }
+            }
+            System.exit(1);
+        }
+    }
+
+    private static boolean isPrintVersionAndExit(final String[] args) {
+        for (final String arg : args) {
+            if ("-v".equals(arg)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Executes this operation on this Flyway instance.
+     *
+     * @param flyway        The Flyway instance.
+     * @param operation     The operation to execute.
+     * @param sqlStatements The current list of all pending migrations.
+     */
+    private static void executeOperation(final FlywayWithDryRun flyway, final String operation, final Iterable<SqlStatement> sqlStatements) {
+        if ("clean".equals(operation)) {
+            flyway.clean();
+        } else if ("baseline".equals(operation)) {
+            flyway.baseline();
+        } else if ("migrate".equals(operation)) {
+            flyway.migrate();
+        } else if ("dryRunMigrate".equals(operation)) {
+            flyway.dryRunMigrate();
+
+            final StringBuilder stringBuilder = new StringBuilder("BEGIN;\n");
+            for (final SqlStatement sqlStatement : sqlStatements) {
+                stringBuilder.append(sqlStatement.getSql())
+                             .append(";\n");
+            }
+            stringBuilder.append("COMMIT;");
+            LOG.info("\n" + stringBuilder.toString());
+        } else if ("validate".equals(operation)) {
+            flyway.validate();
+        } else if ("info".equals(operation)) {
+            LOG.info("\n" + MigrationInfoDumper.dumpToAsciiTable(flyway.info().all()));
+        } else if ("repair".equals(operation)) {
+            flyway.repair();
+        } else {
+            LOG.error("Invalid operation: " + operation);
+            printUsage();
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Checks the desired log level.
+     *
+     * @param args The command-line arguments.
+     * @return The desired log level.
+     */
+    private static Level getLogLevel(final String[] args) {
+        for (final String arg : args) {
+            if ("-X".equals(arg)) {
+                return Level.DEBUG;
+            }
+            if ("-q".equals(arg)) {
+                return Level.WARN;
+            }
+        }
+        return Level.INFO;
+    }
+
+    /**
+     * Initializes the properties with the default configuration for the command-line tool.
+     *
+     * @param properties The properties object to initialize.
+     */
+    private static void initializeDefaults(final Properties properties) {
+        properties.put("flyway.locations", "filesystem:" + new File(getInstallationDir(), "sql").getAbsolutePath());
+        properties.put(PROPERTY_JAR_DIRS, new File(getInstallationDir(), "jars").getAbsolutePath());
+    }
+
+    /**
+     * Filters there properties to remove the Flyway Commandline-specific ones.
+     *
+     * @param properties The properties to filter.
+     */
+    private static void filterProperties(final Properties properties) {
+        properties.remove(PROPERTY_JAR_DIRS);
+        properties.remove("flyway.configFile");
+        properties.remove("flyway.configFileEncoding");
+    }
+
+    /**
+     * Prints the version number on the console.
+     *
+     * @throws IOException when the version could not be read.
+     */
+    private static void printVersion() throws IOException {
+        final String version = new ClassPathResource("org/flywaydb/core/internal/version.txt", VersionPrinter.class.getClassLoader()).loadAsString("UTF-8");
+        LOG.info("Flyway " + version + " for Kill Bill");
+
+        LOG.debug("Java " + System.getProperty("java.version") + " (" + System.getProperty("java.vendor") + ")");
+        LOG.debug(System.getProperty("os.name") + " " + System.getProperty("os.version") + " " + System.getProperty("os.arch") + "\n");
+    }
+
+    /**
+     * Prints the usage instructions on the console.
+     */
+    private static void printUsage() {
+        LOG.info("Usage");
+        LOG.info("=====");
+        LOG.info("");
+        LOG.info("flyway [options] command");
+        LOG.info("");
+        LOG.info("By default, the configuration will be read from conf/flyway.conf.");
+        LOG.info("Options passed from the command-line override the configuration.");
+        LOG.info("");
+        LOG.info("Commands");
+        LOG.info("--------");
+        LOG.info("migrate        : Migrates the database");
+        LOG.info("dryRunMigrate  : Migrates the database (dry-run)");
+        LOG.info("clean          : Drops all objects in the configured schemas");
+        LOG.info("info           : Prints the information about applied, current and pending migrations");
+        LOG.info("validate       : Validates the applied migrations against the ones on the classpath");
+        LOG.info("baseline       : Baselines an existing database at the baselineVersion");
+        LOG.info("repair         : Repairs the metadata table");
+        LOG.info("");
+        LOG.info("Options (Format: -key=value)");
+        LOG.info("-------");
+        LOG.info("driver                       : Fully qualified classname of the jdbc driver");
+        LOG.info("url                          : Jdbc url to use to connect to the database");
+        LOG.info("user                         : User to use to connect to the database");
+        LOG.info("password                     : Password to use to connect to the database");
+        LOG.info("schemas                      : Comma-separated list of the schemas managed by Flyway");
+        LOG.info("table                        : Name of Flyway's metadata table");
+        LOG.info("locations                    : Classpath locations to scan recursively for migrations");
+        LOG.info("resolvers                    : Comma-separated list of custom MigrationResolvers");
+        LOG.info("skipDefaultResolvers         : Skips default resolvers (jdbc, sql and Spring-jdbc)");
+        LOG.info("sqlMigrationPrefix           : File name prefix for sql migrations");
+        LOG.info("repeatableSqlMigrationPrefix : File name prefix for repeatable sql migrations");
+        LOG.info("sqlMigrationSeparator        : File name separator for sql migrations");
+        LOG.info("sqlMigrationSuffix           : File name suffix for sql migrations");
+        LOG.info("encoding                     : Encoding of sql migrations");
+        LOG.info("placeholderReplacement       : Whether placeholders should be replaced");
+        LOG.info("placeholders                 : Placeholders to replace in sql migrations");
+        LOG.info("placeholderPrefix            : Prefix of every placeholder");
+        LOG.info("placeholderSuffix            : Suffix of every placeholder");
+        LOG.info("target                       : Target version up to which Flyway should use migrations");
+        LOG.info("outOfOrder                   : Allows migrations to be run \"out of order\"");
+        LOG.info("callbacks                    : Comma-separated list of FlywayCallback classes");
+        LOG.info("skipDefaultCallbacks         : Skips default callbacks (sql)");
+        LOG.info("validateOnMigrate            : Validate when running migrate");
+        LOG.info("ignoreFutureMigrations       : Allow future migrations when validating");
+        LOG.info("cleanOnValidationError       : Automatically clean on a validation error");
+        LOG.info("cleanDisabled                : Whether to disable clean");
+        LOG.info("baselineVersion              : Version to tag schema with when executing baseline");
+        LOG.info("baselineDescription          : Description to tag schema with when executing baseline");
+        LOG.info("baselineOnMigrate            : Baseline on migrate against uninitialized non-empty schema");
+        LOG.info("configFile                   : Config file to use (default: conf/flyway.properties)");
+        LOG.info("configFileEncoding           : Encoding of the config file (default: UTF-8)");
+        LOG.info("jarDirs                      : Dirs for Jdbc drivers & Java migrations (default: jars)");
+        LOG.info("");
+        LOG.info("Add -X to print debug output");
+        LOG.info("Add -q to suppress all output, except for errors and warnings");
+        LOG.info("Add -v to print the Flyway version and exit");
+        LOG.info("");
+        LOG.info("Example");
+        LOG.info("-------");
+        LOG.info("flyway -user=myuser -password=s3cr3t -url=jdbc:h2:mem -placeholders.abc=def migrate");
+        LOG.info("");
+        LOG.info("More info at https://flywaydb.org/documentation/commandline");
+    }
+
+    /**
+     * Loads all the driver jars contained in the drivers folder. (For Jdbc drivers)
+     *
+     * @throws IOException When the jars could not be loaded.
+     */
+    private static void loadJdbcDrivers() throws IOException {
+        final File driversDir = new File(getInstallationDir(), "drivers");
+        final File[] files = driversDir.listFiles(new FilenameFilter() {
+            public boolean accept(final File dir, final String name) {
+                return name.endsWith(".jar");
+            }
+        });
+
+        // see javadoc of listFiles(): null if given path is not a real directory
+        if (files == null) {
+            return;
+        }
+
+        for (final File file : files) {
+            addJarOrDirectoryToClasspath(file.getPath());
+        }
+    }
+
+    /**
+     * Loads all the jars contained in the jars folder. (For Java Migrations)
+     * This will also indirectly load custom driver jars.
+     *
+     * @param properties The configured properties.
+     * @throws IOException When the jars could not be loaded.
+     */
+    private static void loadJavaMigrationsFromJarDirs(final Properties properties) throws IOException {
+        String jarDirs = properties.getProperty(PROPERTY_JAR_DIRS);
+        if (!StringUtils.hasLength(jarDirs)) {
+            return;
+        }
+
+        jarDirs = jarDirs.replace(File.pathSeparator, ",");
+        final String[] dirs = StringUtils.tokenizeToStringArray(jarDirs, ",");
+
+        for (final String dirName : dirs) {
+            final File dir = new File(dirName);
+            final File[] files = dir.listFiles(new FilenameFilter() {
+                public boolean accept(final File dir, final String name) {
+                    return name.endsWith(".jar");
+                }
+            });
+
+            // see javadoc of listFiles(): null if given path is not a real directory
+            if (files == null) {
+                continue;
+            }
+
+            for (final File file : files) {
+                addJarOrDirectoryToClasspath(file.getPath());
+            }
+        }
+    }
+
+    /**
+     * Adds a jar or a directory with this name to the classpath.
+     *
+     * @param name The name of the jar or directory to add.
+     * @throws IOException when the jar or directory could not be found.
+     */
+    private static void addJarOrDirectoryToClasspath(final String name) throws IOException {
+        LOG.debug("Adding location to classpath: " + name);
+
+        try {
+            final URL url = new File(name).toURI().toURL();
+            final URLClassLoader sysloader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+            final Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+            method.setAccessible(true);
+            method.invoke(sysloader, url);
+        } catch (final Exception e) {
+            throw new FlywayException("Unable to load " + name, e);
+        }
+    }
+
+    /**
+     * Loads the configuration from the various possible locations.
+     *
+     * @param properties The properties object to load to configuration into.
+     * @param args       The command-line arguments passed in.
+     */
+    private static void loadConfiguration(final Properties properties, final String[] args) {
+        final String encoding = determineConfigurationFileEncoding(args);
+
+        loadConfigurationFile(properties, getInstallationDir() + "/conf/flyway.conf", encoding, false);
+        loadConfigurationFile(properties, System.getProperty("user.home") + "/flyway.conf", encoding, false);
+        loadConfigurationFile(properties, "flyway.conf", encoding, false);
+
+        final String configFile = determineConfigurationFileArgument(args);
+        if (configFile != null) {
+            loadConfigurationFile(properties, configFile, encoding, true);
+        }
+    }
+
+    /**
+     * Loads the configuration from the configuration file. If a configuration file is specified using the -configfile
+     * argument it will be used, otherwise the default config file (conf/flyway.properties) will be loaded.
+     *
+     * @param properties    The properties object to load to configuration into.
+     * @param file          The configuration file to load.
+     * @param encoding      The encoding of the configuration file.
+     * @param failIfMissing Whether to fail if the file is missing.
+     * @return Whether the file was loaded successfully.
+     * @throws FlywayException when the configuration file could not be loaded.
+     */
+    private static boolean loadConfigurationFile(final Properties properties, final String file, final String encoding, final boolean failIfMissing) throws FlywayException {
+        final File configFile = new File(file);
+        final String errorMessage = "Unable to load config file: " + configFile.getAbsolutePath();
+
+        if (!configFile.isFile() || !configFile.canRead()) {
+            if (!failIfMissing) {
+                LOG.debug(errorMessage);
+                return false;
+            }
+            throw new FlywayException(errorMessage);
+        }
+
+        LOG.debug("Loading config file: " + configFile.getAbsolutePath());
+        try {
+            final String contents = FileCopyUtils.copyToString(new InputStreamReader(new FileInputStream(configFile), encoding));
+            properties.load(new StringReader(contents.replace("\\", "\\\\")));
+            return true;
+        } catch (final IOException e) {
+            throw new FlywayException(errorMessage, e);
+        }
+    }
+
+    /**
+     * Dumps the configuration to the console when debug output is activated.
+     *
+     * @param properties The configured properties.
+     */
+    private static void dumpConfiguration(final Properties properties) {
+        LOG.debug("Using configuration:");
+        for (final Map.Entry<Object, Object> entry : properties.entrySet()) {
+            String value = entry.getValue().toString();
+            value = "flyway.password".equals(entry.getKey()) ? StringUtils.trimOrPad("", value.length(), '*') : value;
+            LOG.debug(entry.getKey() + " -> " + value);
+        }
+    }
+
+    /**
+     * Determines the file to use for loading the configuration.
+     *
+     * @param args The command-line arguments passed in.
+     * @return The path of the configuration file on disk.
+     */
+    private static String determineConfigurationFileArgument(final String[] args) {
+        for (final String arg : args) {
+            if (isPropertyArgument(arg) && "configFile".equals(getArgumentProperty(arg))) {
+                return getArgumentValue(arg);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @return The installation directory of the Flyway Command-line tool.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private static String getInstallationDir() {
+        final String path = ClassUtils.getLocationOnDisk(Migrator.class);
+        return new File(path).getParentFile().getParentFile().getAbsolutePath();
+    }
+
+    /**
+     * Determines the encoding to use for loading the configuration.
+     *
+     * @param args The command-line arguments passed in.
+     * @return The encoding. (default: UTF-8)
+     */
+    private static String determineConfigurationFileEncoding(final String[] args) {
+        for (final String arg : args) {
+            if (isPropertyArgument(arg) && "configFileEncoding".equals(getArgumentProperty(arg))) {
+                return getArgumentValue(arg);
+            }
+        }
+
+        return "UTF-8";
+    }
+
+    /**
+     * Overrides the configuration from the config file with the properties passed in directly from the command-line.
+     *
+     * @param properties The properties to override.
+     * @param args       The command-line arguments that were passed in.
+     */
+    private static void overrideConfiguration(final Properties properties, final String[] args) {
+        for (final String arg : args) {
+            if (isPropertyArgument(arg)) {
+                properties.put("flyway." + getArgumentProperty(arg), getArgumentValue(arg));
+            }
+        }
+    }
+
+    /**
+     * Checks whether this command-line argument tries to set a property.
+     *
+     * @param arg The command-line argument to check.
+     * @return {@code true} if it does, {@code false} if not.
+     */
+    private static boolean isPropertyArgument(final String arg) {
+        return arg.startsWith("-") && arg.contains("=");
+    }
+
+    /**
+     * Retrieves the property this command-line argument tries to assign.
+     *
+     * @param arg The command-line argument to check, typically in the form -key=value.
+     * @return The property.
+     */
+    private static String getArgumentProperty(final String arg) {
+        final int index = arg.indexOf("=");
+
+        return arg.substring(1, index);
+    }
+
+    /**
+     * Retrieves the value this command-line argument tries to assign.
+     *
+     * @param arg The command-line argument to check, typically in the form -key=value.
+     * @return The value or an empty string if no value is assigned.
+     */
+    private static String getArgumentValue(final String arg) {
+        final int index = arg.indexOf("=");
+
+        if ((index < 0) || (index == arg.length())) {
+            return "";
+        }
+
+        return arg.substring(index + 1);
+    }
+
+    /**
+     * Determine the operations Flyway should execute.
+     *
+     * @param args The command-line arguments passed in.
+     * @return The operations. An empty list if none.
+     */
+    private static List<String> determineOperations(final String[] args) {
+        final List<String> operations = new ArrayList<String>();
+
+        for (final String arg : args) {
+            if (!arg.startsWith("-")) {
+                operations.add(arg);
+            }
+        }
+
+        return operations;
+    }
+}