killbill-memoizeit
Changes
payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java 13(+5 -8)
Details
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index 3b06de1..2c21d9c 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -22,12 +22,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
-import javax.inject.Named;
import org.joda.time.DateTime;
import org.killbill.billing.events.PaymentInternalEvent;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.glue.DefaultPaymentService;
-import org.killbill.billing.payment.glue.PaymentModule;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
@@ -49,27 +48,28 @@ public class Janitor {
public static final String QUEUE_NAME = "janitor";
private final NotificationQueueService notificationQueueService;
- private final ScheduledExecutorService janitorExecutor;
private final PaymentConfig paymentConfig;
private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+ private final PaymentExecutors paymentExecutors;
private NotificationQueue janitorQueue;
+ private ScheduledExecutorService janitorExecutor;
private volatile boolean isStopped;
@Inject
public Janitor(final PaymentConfig paymentConfig,
final NotificationQueueService notificationQueueService,
- @Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
+ final PaymentExecutors paymentExecutors,
final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
final IncompletePaymentTransactionTask incompletePaymentTransactionTask) {
this.notificationQueueService = notificationQueueService;
- this.janitorExecutor = janitorExecutor;
+ this.paymentExecutors = paymentExecutors;
this.paymentConfig = paymentConfig;
this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
- this.isStopped = false;
+
}
public void initialize() throws NotificationQueueAlreadyExists {
@@ -95,10 +95,10 @@ public class Janitor {
}
public void start() {
- if (isStopped) {
- log.warn("Janitor is not a restartable service, and was already started, aborting");
- return;
- }
+
+ this.isStopped = false;
+
+ janitorExecutor = paymentExecutors.getJanitorExecutorService();
janitorQueue.startQueue();
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentExecutors.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentExecutors.java
new file mode 100644
index 0000000..bd4ca08
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentExecutors.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.commons.concurrent.Executors;
+import org.killbill.commons.concurrent.WithProfilingThreadPoolExecutor;
+
+public class PaymentExecutors {
+
+ private static final long TIMEOUT_EXECUTOR_SEC = 3L;
+
+ private static final String PLUGIN_THREAD_PREFIX = "Plugin-th-";
+ private static final String PAYMENT_PLUGIN_TH_GROUP_NAME = "pay-plugin-grp";
+
+ public static final String JANITOR_EXECUTOR_NAMED = "JanitorExecutor";
+ public static final String PLUGIN_EXECUTOR_NAMED = "PluginExecutor";
+
+ private final PaymentConfig paymentConfig;
+
+ private volatile ExecutorService pluginExecutorService;
+ private volatile ScheduledExecutorService janitorExecutorService;
+
+ @Inject
+ public PaymentExecutors(PaymentConfig paymentConfig) {
+ this.paymentConfig = paymentConfig;
+
+ }
+
+ public void initialize() {
+ this.pluginExecutorService = createPluginExecutorService();
+ this.janitorExecutorService = createJanitorExecutorService();
+ }
+
+
+ public void stop() throws InterruptedException {
+ pluginExecutorService.shutdownNow();
+ janitorExecutorService.shutdownNow();
+
+ pluginExecutorService.awaitTermination(TIMEOUT_EXECUTOR_SEC, TimeUnit.SECONDS);
+ pluginExecutorService = null;
+
+ janitorExecutorService.awaitTermination(TIMEOUT_EXECUTOR_SEC, TimeUnit.SECONDS);
+ janitorExecutorService = null;
+ }
+
+ public ExecutorService getPluginExecutorService() {
+ return pluginExecutorService;
+ }
+
+ public ScheduledExecutorService getJanitorExecutorService() {
+ return janitorExecutorService;
+ }
+
+ private ExecutorService createPluginExecutorService() {
+ return new WithProfilingThreadPoolExecutor(paymentConfig.getPaymentPluginThreadNb(),
+ paymentConfig.getPaymentPluginThreadNb(),
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactory() {
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread th = new Thread(new ThreadGroup(PAYMENT_PLUGIN_TH_GROUP_NAME), r);
+ th.setName(PLUGIN_THREAD_PREFIX + th.getId());
+ return th;
+ }
+ });
+
+ }
+
+ private ScheduledExecutorService createJanitorExecutorService() {
+ return Executors.newSingleThreadScheduledExecutor("PaymentJanitor");
+ }
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentGatewayProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentGatewayProcessor.java
index dbc7541..cf9e41f 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentGatewayProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentGatewayProcessor.java
@@ -19,7 +19,6 @@ package org.killbill.billing.payment.core;
import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
@@ -47,13 +46,8 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
-import com.google.inject.name.Named;
-
-import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
// We don't take any lock here because the call needs to be re-entrant
// from the plugin: for example, the BitPay plugin will create the payment during the
@@ -65,8 +59,6 @@ public class PaymentGatewayProcessor extends ProcessorBase {
private final PluginDispatcher<HostedPaymentPageFormDescriptor> paymentPluginFormDispatcher;
private final PluginDispatcher<GatewayNotification> paymentPluginNotificationDispatcher;
- private static final Logger log = LoggerFactory.getLogger(PaymentGatewayProcessor.class);
-
@Inject
public PaymentGatewayProcessor(final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
final AccountInternalApi accountUserApi,
@@ -75,13 +67,13 @@ public class PaymentGatewayProcessor extends ProcessorBase {
final PaymentDao paymentDao,
final GlobalLocker locker,
final PaymentConfig paymentConfig,
- @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
+ final PaymentExecutors executors,
final InternalCallContextFactory internalCallContextFactory,
final Clock clock) {
- super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
+ super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
final long paymentPluginTimeoutSec = TimeUnit.SECONDS.convert(paymentConfig.getPaymentPluginTimeout().getPeriod(), paymentConfig.getPaymentPluginTimeout().getUnit());
- this.paymentPluginFormDispatcher = new PluginDispatcher<HostedPaymentPageFormDescriptor>(paymentPluginTimeoutSec, executor);
- this.paymentPluginNotificationDispatcher = new PluginDispatcher<GatewayNotification>(paymentPluginTimeoutSec, executor);
+ this.paymentPluginFormDispatcher = new PluginDispatcher<HostedPaymentPageFormDescriptor>(paymentPluginTimeoutSec, executors);
+ this.paymentPluginNotificationDispatcher = new PluginDispatcher<GatewayNotification>(paymentPluginTimeoutSec, executors);
}
public GatewayNotification processNotification(final String notification, final String pluginName, final Iterable<PluginProperty> properties, final CallContext callContext) throws PaymentApiException {
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentMethodProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentMethodProcessor.java
index 4babea1..102c3d2 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentMethodProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentMethodProcessor.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -52,6 +51,7 @@ import org.killbill.billing.payment.provider.DefaultNoOpPaymentMethodPlugin;
import org.killbill.billing.payment.provider.DefaultPaymentMethodInfoPlugin;
import org.killbill.billing.payment.provider.ExternalPaymentProviderPlugin;
import org.killbill.billing.tag.TagInternalApi;
+import org.killbill.billing.util.UUIDs;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.TenantContext;
@@ -69,10 +69,7 @@ import com.google.common.base.Objects;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
-import com.google.inject.name.Named;
-import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
-import org.killbill.billing.util.UUIDs;
import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPagination;
import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPaginationFromPlugins;
@@ -90,12 +87,12 @@ public class PaymentMethodProcessor extends ProcessorBase {
final TagInternalApi tagUserApi,
final GlobalLocker locker,
final PaymentConfig paymentConfig,
- @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
+ final PaymentExecutors executors,
final InternalCallContextFactory internalCallContextFactory,
final Clock clock) {
- super(pluginRegistry, accountInternalApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
+ super(pluginRegistry, accountInternalApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
final long paymentPluginTimeoutSec = TimeUnit.SECONDS.convert(paymentConfig.getPaymentPluginTimeout().getPeriod(), paymentConfig.getPaymentPluginTimeout().getUnit());
- this.uuidPluginNotificationDispatcher = new PluginDispatcher<UUID>(paymentPluginTimeoutSec, executor);
+ this.uuidPluginNotificationDispatcher = new PluginDispatcher<UUID>(paymentPluginTimeoutSec, executors);
}
public UUID addPaymentMethod(final String paymentMethodExternalKey, final String paymentPluginServiceName, final Account account,
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index 3326bd3..7b39cf1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -75,9 +74,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
-import com.google.inject.name.Named;
-import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPagination;
import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPaginationFromPlugins;
@@ -98,11 +95,10 @@ public class PaymentProcessor extends ProcessorBase {
final PaymentDao paymentDao,
final InternalCallContextFactory internalCallContextFactory,
final GlobalLocker locker,
- @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
final PaymentAutomatonRunner paymentAutomatonRunner,
final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
final Clock clock) {
- super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
+ super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
this.paymentAutomatonRunner = paymentAutomatonRunner;
this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PluginControlPaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PluginControlPaymentProcessor.java
index fd66fe9..96a6eda 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PluginControlPaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PluginControlPaymentProcessor.java
@@ -21,7 +21,6 @@ import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -55,9 +54,6 @@ import org.killbill.commons.locker.GlobalLocker;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
-import com.google.inject.name.Named;
-
-import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
public class PluginControlPaymentProcessor extends ProcessorBase {
@@ -73,12 +69,11 @@ public class PluginControlPaymentProcessor extends ProcessorBase {
final TagInternalApi tagUserApi,
final PaymentDao paymentDao,
final GlobalLocker locker,
- @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
final InternalCallContextFactory internalCallContextFactory,
final PluginControlPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
final PaymentControlStateMachineHelper paymentControlStateMachineHelper,
final Clock clock) {
- super(pluginRegistry, accountInternalApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
+ super(pluginRegistry, accountInternalApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
this.paymentControlStateMachineHelper = paymentControlStateMachineHelper;
this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
index 5a23cab..e657269 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
@@ -37,11 +37,8 @@ import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PaymentApiException;
-import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.dao.PaymentDao;
import org.killbill.billing.payment.dao.PaymentMethodModelDao;
-import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
-import org.killbill.billing.payment.dispatcher.CallableWithRequestData;
import org.killbill.billing.payment.dispatcher.PluginDispatcher;
import org.killbill.billing.payment.dispatcher.PluginDispatcher.PluginDispatcherReturnType;
import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
@@ -57,15 +54,12 @@ import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLock;
import org.killbill.commons.locker.GlobalLocker;
import org.killbill.commons.locker.LockFailedException;
-import org.killbill.commons.request.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Objects;
-import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
public abstract class ProcessorBase {
@@ -74,7 +68,6 @@ public abstract class ProcessorBase {
protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
protected final AccountInternalApi accountInternalApi;
protected final GlobalLocker locker;
- protected final ExecutorService executor;
protected final PaymentDao paymentDao;
protected final InternalCallContextFactory internalCallContextFactory;
protected final TagInternalApi tagInternalApi;
@@ -88,7 +81,6 @@ public abstract class ProcessorBase {
final PaymentDao paymentDao,
final TagInternalApi tagInternalApi,
final GlobalLocker locker,
- final ExecutorService executor,
final InternalCallContextFactory internalCallContextFactory,
final InvoiceInternalApi invoiceApi,
final Clock clock) {
@@ -96,7 +88,6 @@ public abstract class ProcessorBase {
this.accountInternalApi = accountInternalApi;
this.paymentDao = paymentDao;
this.locker = locker;
- this.executor = executor;
this.tagInternalApi = tagInternalApi;
this.internalCallContextFactory = internalCallContextFactory;
this.invoiceApi = invoiceApi;
@@ -168,6 +159,7 @@ public abstract class ProcessorBase {
// TODO Rename - there is no lock!
public interface WithAccountLockCallback<PluginDispatcherReturnType, ExceptionType extends Exception> {
+
public PluginDispatcherReturnType doOperation() throws ExceptionType;
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentAutomatonRunner.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentAutomatonRunner.java
index 308e6a2..d0f56a0 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentAutomatonRunner.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentAutomatonRunner.java
@@ -19,7 +19,6 @@ package org.killbill.billing.payment.core.sm;
import java.math.BigDecimal;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -36,7 +35,6 @@ import org.killbill.automaton.State;
import org.killbill.automaton.State.EnteringStateCallback;
import org.killbill.automaton.State.LeavingStateCallback;
import org.killbill.automaton.StateMachine;
-import org.killbill.automaton.StateMachineConfig;
import org.killbill.billing.ErrorCode;
import org.killbill.billing.account.api.Account;
import org.killbill.billing.callcontext.InternalCallContext;
@@ -45,6 +43,7 @@ import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.PluginProperty;
import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.core.sm.payments.AuthorizeCompleted;
import org.killbill.billing.payment.core.sm.payments.AuthorizeInitiated;
import org.killbill.billing.payment.core.sm.payments.AuthorizeOperation;
@@ -69,7 +68,6 @@ import org.killbill.billing.payment.core.sm.payments.VoidOperation;
import org.killbill.billing.payment.dao.PaymentDao;
import org.killbill.billing.payment.dao.PaymentModelDao;
import org.killbill.billing.payment.dispatcher.PluginDispatcher;
-import org.killbill.billing.payment.glue.PaymentModule;
import org.killbill.billing.payment.invoice.InvoicePaymentControlPluginApi;
import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
import org.killbill.billing.util.callcontext.CallContext;
@@ -82,9 +80,6 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import com.google.inject.name.Named;
-
-import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
public class PaymentAutomatonRunner {
@@ -97,13 +92,12 @@ public class PaymentAutomatonRunner {
private final PersistentBus eventBus;
@Inject
- public PaymentAutomatonRunner(@javax.inject.Named(PaymentModule.STATE_MACHINE_PAYMENT) final StateMachineConfig stateMachineConfig,
- final PaymentConfig paymentConfig,
+ public PaymentAutomatonRunner(final PaymentConfig paymentConfig,
final PaymentDao paymentDao,
final GlobalLocker locker,
final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
final Clock clock,
- @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
+ final PaymentExecutors executors,
final PersistentBus eventBus,
final PaymentStateMachineHelper paymentSMHelper) {
this.paymentSMHelper = paymentSMHelper;
@@ -114,7 +108,7 @@ public class PaymentAutomatonRunner {
this.eventBus = eventBus;
final long paymentPluginTimeoutSec = TimeUnit.SECONDS.convert(paymentConfig.getPaymentPluginTimeout().getPeriod(), paymentConfig.getPaymentPluginTimeout().getUnit());
- this.paymentPluginDispatcher = new PluginDispatcher<OperationResult>(paymentPluginTimeoutSec, executor);
+ this.paymentPluginDispatcher = new PluginDispatcher<OperationResult>(paymentPluginTimeoutSec, executors);
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java
index f3ca0ca..4f58817 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java
@@ -20,7 +20,6 @@ package org.killbill.billing.payment.core.sm;
import java.math.BigDecimal;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -32,16 +31,17 @@ import org.killbill.automaton.OperationException;
import org.killbill.automaton.State;
import org.killbill.automaton.State.EnteringStateCallback;
import org.killbill.automaton.State.LeavingStateCallback;
-import org.killbill.automaton.StateMachineConfig;
import org.killbill.billing.ErrorCode;
import org.killbill.billing.account.api.Account;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.control.plugin.api.PaymentControlPluginApi;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.Payment;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.PluginProperty;
import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.core.PaymentProcessor;
import org.killbill.billing.payment.core.sm.control.AuthorizeControlOperation;
import org.killbill.billing.payment.core.sm.control.CaptureControlOperation;
@@ -57,10 +57,8 @@ import org.killbill.billing.payment.core.sm.control.PurchaseControlOperation;
import org.killbill.billing.payment.core.sm.control.RefundControlOperation;
import org.killbill.billing.payment.core.sm.control.VoidControlOperation;
import org.killbill.billing.payment.dao.PaymentDao;
-import org.killbill.billing.payment.glue.PaymentModule;
import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
import org.killbill.billing.payment.retry.BaseRetryService.RetryServiceScheduler;
-import org.killbill.billing.control.plugin.api.PaymentControlPluginApi;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.bus.api.PersistentBus;
@@ -70,7 +68,6 @@ import org.killbill.commons.locker.GlobalLocker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
-import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
import static org.killbill.billing.payment.glue.PaymentModule.RETRYABLE_NAMED;
public class PluginControlPaymentAutomatonRunner extends PaymentAutomatonRunner {
@@ -82,11 +79,11 @@ public class PluginControlPaymentAutomatonRunner extends PaymentAutomatonRunner
private final ControlPluginRunner controlPluginRunner;
@Inject
- public PluginControlPaymentAutomatonRunner(@Named(PaymentModule.STATE_MACHINE_PAYMENT) final StateMachineConfig stateMachineConfig, final PaymentDao paymentDao, final GlobalLocker locker, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
+ public PluginControlPaymentAutomatonRunner(final PaymentDao paymentDao, final GlobalLocker locker, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
final OSGIServiceRegistration<PaymentControlPluginApi> paymentControlPluginRegistry, final Clock clock, final PaymentProcessor paymentProcessor, @Named(RETRYABLE_NAMED) final RetryServiceScheduler retryServiceScheduler,
- final PaymentConfig paymentConfig, @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor, final PaymentStateMachineHelper paymentSMHelper, final PaymentControlStateMachineHelper paymentControlStateMachineHelper,
+ final PaymentConfig paymentConfig, final PaymentExecutors executors, final PaymentStateMachineHelper paymentSMHelper, final PaymentControlStateMachineHelper paymentControlStateMachineHelper,
final ControlPluginRunner controlPluginRunner, final PersistentBus eventBus) {
- super(stateMachineConfig, paymentConfig, paymentDao, locker, pluginRegistry, clock, executor, eventBus, paymentSMHelper);
+ super(paymentConfig, paymentDao, locker, pluginRegistry, clock, executors, eventBus, paymentSMHelper);
this.paymentProcessor = paymentProcessor;
this.paymentControlPluginRegistry = paymentControlPluginRegistry;
this.retryServiceScheduler = retryServiceScheduler;
diff --git a/payment/src/main/java/org/killbill/billing/payment/dispatcher/PluginDispatcher.java b/payment/src/main/java/org/killbill/billing/payment/dispatcher/PluginDispatcher.java
index f8b6ca5..bda52e0 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dispatcher/PluginDispatcher.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dispatcher/PluginDispatcher.java
@@ -20,11 +20,13 @@ package org.killbill.billing.payment.dispatcher;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.commons.profiling.Profiling;
import org.killbill.commons.profiling.ProfilingData;
import org.killbill.commons.request.Request;
@@ -34,11 +36,11 @@ public class PluginDispatcher<ReturnType> {
private final TimeUnit DEEFAULT_PLUGIN_TIMEOUT_UNIT = TimeUnit.SECONDS;
private final long timeoutSeconds;
- private final ExecutorService executor;
+ private final PaymentExecutors paymentExecutors;
- public PluginDispatcher(final long timeoutSeconds, final ExecutorService executor) {
+ public PluginDispatcher(final long timeoutSeconds, final PaymentExecutors paymentExecutors) {
this.timeoutSeconds = timeoutSeconds;
- this.executor = executor;
+ this.paymentExecutors = paymentExecutors;
}
// TODO Once we switch fully to automata, should this throw PaymentPluginApiException instead?
@@ -49,10 +51,12 @@ public class PluginDispatcher<ReturnType> {
public ReturnType dispatchWithTimeout(final Callable<PluginDispatcherReturnType<ReturnType>> task, final long timeout, final TimeUnit unit)
throws TimeoutException, ExecutionException, InterruptedException {
+ final ExecutorService pluginExecutor = paymentExecutors.getPluginExecutorService();
+
// Wrap existing callable to keep the original requestId
final Callable<PluginDispatcherReturnType<ReturnType>> callableWithRequestData = new CallableWithRequestData(Request.getPerThreadRequestData(), task);
- final Future<PluginDispatcherReturnType<ReturnType>> future = executor.submit(callableWithRequestData);
+ final Future<PluginDispatcherReturnType<ReturnType>> future = pluginExecutor.submit(callableWithRequestData);
final PluginDispatcherReturnType<ReturnType> pluginDispatcherResult = future.get(timeout, unit);
if (pluginDispatcherResult instanceof WithProfilingPluginDispatcherReturnType) {
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
index d003f74..93603b1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
@@ -21,6 +21,7 @@ package org.killbill.billing.payment.glue;
import org.killbill.billing.payment.api.PaymentApi;
import org.killbill.billing.payment.api.PaymentService;
import org.killbill.billing.payment.bus.PaymentBusEventHandler;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.invoice.PaymentTagHandler;
import org.killbill.billing.payment.core.janitor.Janitor;
import org.killbill.billing.payment.retry.DefaultRetryService;
@@ -46,6 +47,7 @@ public class DefaultPaymentService implements PaymentService {
private final PaymentApi api;
private final DefaultRetryService retryService;
private final Janitor janitor;
+ private final PaymentExecutors paymentExecutors;
@Inject
public DefaultPaymentService(final PaymentBusEventHandler paymentBusEventHandler,
@@ -53,13 +55,15 @@ public class DefaultPaymentService implements PaymentService {
final PaymentApi api,
final DefaultRetryService retryService,
final PersistentBus eventBus,
- final Janitor janitor) {
+ final Janitor janitor,
+ final PaymentExecutors paymentExecutors) {
this.paymentBusEventHandler = paymentBusEventHandler;
this.tagHandler = tagHandler;
this.eventBus = eventBus;
this.api = api;
this.retryService = retryService;
this.janitor = janitor;
+ this.paymentExecutors = paymentExecutors;
}
@Override
@@ -75,6 +79,7 @@ public class DefaultPaymentService implements PaymentService {
} catch (final PersistentBus.EventBusException e) {
log.error("Unable to register with the EventBus!", e);
}
+ paymentExecutors.initialize();
retryService.initialize();
janitor.initialize();
}
@@ -95,6 +100,12 @@ public class DefaultPaymentService implements PaymentService {
}
retryService.stop();
janitor.stop();
+ try {
+ paymentExecutors.stop();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("PaymentService got interrupted", e);
+ }
}
@Override
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index b70276b..b249c8b 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -18,16 +18,11 @@
package org.killbill.billing.payment.glue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
import javax.inject.Provider;
import org.killbill.automaton.DefaultStateMachineConfig;
import org.killbill.automaton.StateMachineConfig;
+import org.killbill.billing.control.plugin.api.PaymentControlPluginApi;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.AdminPaymentApi;
import org.killbill.billing.payment.api.DefaultAdminPaymentApi;
@@ -37,6 +32,7 @@ import org.killbill.billing.payment.api.PaymentApi;
import org.killbill.billing.payment.api.PaymentGatewayApi;
import org.killbill.billing.payment.api.PaymentService;
import org.killbill.billing.payment.bus.PaymentBusEventHandler;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.core.PaymentGatewayProcessor;
import org.killbill.billing.payment.core.PaymentMethodProcessor;
import org.killbill.billing.payment.core.PaymentProcessor;
@@ -58,10 +54,8 @@ import org.killbill.billing.payment.retry.DefaultRetryService;
import org.killbill.billing.payment.retry.DefaultRetryService.DefaultRetryServiceScheduler;
import org.killbill.billing.payment.retry.RetryService;
import org.killbill.billing.platform.api.KillbillConfigSource;
-import org.killbill.billing.control.plugin.api.PaymentControlPluginApi;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.billing.util.glue.KillBillModule;
-import org.killbill.commons.concurrent.WithProfilingThreadPoolExecutor;
import org.killbill.xmlloader.XMLLoader;
import org.skife.config.ConfigurationObjectFactory;
@@ -73,11 +67,7 @@ import com.google.inject.name.Names;
public class PaymentModule extends KillBillModule {
- private static final String PLUGIN_THREAD_PREFIX = "Plugin-th-";
- private static final String PAYMENT_PLUGIN_TH_GROUP_NAME = "pay-plugin-grp";
- public static final String JANITOR_EXECUTOR_NAMED = "JanitorExecutor";
- public static final String PLUGIN_EXECUTOR_NAMED = "PluginExecutor";
public static final String RETRYABLE_NAMED = "Retryable";
public static final String STATE_MACHINE_RETRY = "RetryStateMachine";
@@ -102,9 +92,6 @@ public class PaymentModule extends KillBillModule {
}
protected void installJanitor() {
- final ScheduledExecutorService janitorExecutor = org.killbill.commons.concurrent.Executors.newSingleThreadScheduledExecutor("PaymentJanitor");
- bind(ScheduledExecutorService.class).annotatedWith(Names.named(JANITOR_EXECUTOR_NAMED)).toInstance(janitorExecutor);
-
bind(IncompletePaymentTransactionTask.class).asEagerSingleton();
bind(IncompletePaymentAttemptTask.class).asEagerSingleton();
bind(Janitor.class).asEagerSingleton();
@@ -136,22 +123,6 @@ public class PaymentModule extends KillBillModule {
}
protected void installProcessors(final PaymentConfig paymentConfig) {
-
- final ExecutorService pluginExecutorService = new WithProfilingThreadPoolExecutor(paymentConfig.getPaymentPluginThreadNb(),
- paymentConfig.getPaymentPluginThreadNb(),
- 0L,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactory() {
-
- @Override
- public Thread newThread(final Runnable r) {
- final Thread th = new Thread(new ThreadGroup(PAYMENT_PLUGIN_TH_GROUP_NAME), r);
- th.setName(PLUGIN_THREAD_PREFIX + th.getId());
- return th;
- }
- });
- bind(ExecutorService.class).annotatedWith(Names.named(PLUGIN_EXECUTOR_NAMED)).toInstance(pluginExecutorService);
bind(PaymentProcessor.class).asEagerSingleton();
bind(PluginControlPaymentProcessor.class).asEagerSingleton();
bind(PaymentGatewayProcessor.class).asEagerSingleton();
@@ -173,6 +144,7 @@ public class PaymentModule extends KillBillModule {
bind(PaymentBusEventHandler.class).asEagerSingleton();
bind(PaymentTagHandler.class).asEagerSingleton();
bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
+ bind(PaymentExecutors.class).asEagerSingleton();
installPaymentProviderPlugins(paymentConfig);
installPaymentDao();
installProcessors(paymentConfig);
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryablePaymentAutomatonRunner.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryablePaymentAutomatonRunner.java
index 14f6758..07c8031 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryablePaymentAutomatonRunner.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryablePaymentAutomatonRunner.java
@@ -20,7 +20,6 @@ package org.killbill.billing.payment.core.sm;
import java.math.BigDecimal;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -28,23 +27,22 @@ import javax.inject.Named;
import org.killbill.automaton.Operation.OperationCallback;
import org.killbill.automaton.OperationResult;
-import org.killbill.automaton.StateMachineConfig;
import org.killbill.billing.account.api.Account;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.control.plugin.api.PaymentControlPluginApi;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.PluginProperty;
import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.core.PaymentProcessor;
import org.killbill.billing.payment.core.sm.control.ControlPluginRunner;
import org.killbill.billing.payment.core.sm.control.PaymentStateControlContext;
import org.killbill.billing.payment.dao.PaymentDao;
import org.killbill.billing.payment.dispatcher.PluginDispatcher;
-import org.killbill.billing.payment.glue.PaymentModule;
import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
import org.killbill.billing.payment.retry.BaseRetryService.RetryServiceScheduler;
-import org.killbill.billing.control.plugin.api.PaymentControlPluginApi;
import org.killbill.billing.tag.TagInternalApi;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.config.PaymentConfig;
@@ -52,7 +50,6 @@ import org.killbill.bus.api.PersistentBus;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;
-import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
import static org.killbill.billing.payment.glue.PaymentModule.RETRYABLE_NAMED;
public class MockRetryablePaymentAutomatonRunner extends PluginControlPaymentAutomatonRunner {
@@ -61,10 +58,10 @@ public class MockRetryablePaymentAutomatonRunner extends PluginControlPaymentAut
private PaymentStateControlContext context;
@Inject
- public MockRetryablePaymentAutomatonRunner(@Named(PaymentModule.STATE_MACHINE_PAYMENT) final StateMachineConfig stateMachineConfig, @Named(PaymentModule.STATE_MACHINE_RETRY) final StateMachineConfig retryStateMachine, final PaymentDao paymentDao, final GlobalLocker locker, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final OSGIServiceRegistration<PaymentControlPluginApi> retryPluginRegistry, final Clock clock, final TagInternalApi tagApi, final PaymentProcessor paymentProcessor,
- @Named(RETRYABLE_NAMED) final RetryServiceScheduler retryServiceScheduler, final PaymentConfig paymentConfig, @com.google.inject.name.Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
+ public MockRetryablePaymentAutomatonRunner(final PaymentDao paymentDao, final GlobalLocker locker, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final OSGIServiceRegistration<PaymentControlPluginApi> retryPluginRegistry, final Clock clock, final TagInternalApi tagApi, final PaymentProcessor paymentProcessor,
+ @Named(RETRYABLE_NAMED) final RetryServiceScheduler retryServiceScheduler, final PaymentConfig paymentConfig, final PaymentExecutors executors,
final PaymentStateMachineHelper paymentSMHelper, final PaymentControlStateMachineHelper retrySMHelper, final ControlPluginRunner controlPluginRunner, final PersistentBus eventBus) {
- super(stateMachineConfig, paymentDao, locker, pluginRegistry, retryPluginRegistry, clock, paymentProcessor, retryServiceScheduler, paymentConfig, executor, paymentSMHelper, retrySMHelper, controlPluginRunner, eventBus);
+ super(paymentDao, locker, pluginRegistry, retryPluginRegistry, clock, paymentProcessor, retryServiceScheduler, paymentConfig, executors, paymentSMHelper, retrySMHelper, controlPluginRunner, eventBus);
}
@Override
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPaymentOperation.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPaymentOperation.java
index 586e759..7e4e1b6 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPaymentOperation.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPaymentOperation.java
@@ -19,7 +19,6 @@ package org.killbill.billing.payment.core.sm;
import java.math.BigDecimal;
import java.util.UUID;
-import java.util.concurrent.Executors;
import javax.annotation.Nullable;
@@ -104,7 +103,7 @@ public class TestPaymentOperation extends PaymentTestSuiteNoDB {
private void setUp(final PaymentPluginStatus paymentPluginStatus) throws Exception {
final GlobalLocker locker = new MemoryGlobalLocker();
- final PluginDispatcher<OperationResult> paymentPluginDispatcher = new PluginDispatcher<OperationResult>(1, Executors.newCachedThreadPool());
+ final PluginDispatcher<OperationResult> paymentPluginDispatcher = new PluginDispatcher<OperationResult>(1, paymentExecutors);
paymentStateContext = new PaymentStateContext(true,
UUID.randomUUID(),
null, null,
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPluginOperation.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPluginOperation.java
index f004f02..9c73489 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPluginOperation.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPluginOperation.java
@@ -184,7 +184,7 @@ public class TestPluginOperation extends PaymentTestSuiteNoDB {
}
private PaymentOperation getPluginOperation(final boolean shouldLockAccount, final int timeoutSeconds) throws PaymentApiException {
- final PluginDispatcher<OperationResult> paymentPluginDispatcher = new PluginDispatcher<OperationResult>(timeoutSeconds, Executors.newCachedThreadPool());
+ final PluginDispatcher<OperationResult> paymentPluginDispatcher = new PluginDispatcher<OperationResult>(timeoutSeconds, paymentExecutors);
final PaymentStateContext paymentStateContext = new PaymentStateContext(true, UUID.randomUUID(),
null, null,
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
index 433806e..b796607 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
@@ -20,7 +20,6 @@ package org.killbill.billing.payment.core.sm;
import java.math.BigDecimal;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import javax.inject.Named;
@@ -32,6 +31,7 @@ import org.killbill.billing.ErrorCode;
import org.killbill.billing.account.api.Account;
import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.control.plugin.api.PaymentControlPluginApi;
import org.killbill.billing.osgi.api.OSGIServiceDescriptor;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.PaymentTestSuiteNoDB;
@@ -39,6 +39,7 @@ import org.killbill.billing.payment.api.PaymentApiException;
import org.killbill.billing.payment.api.PluginProperty;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.core.PaymentProcessor;
import org.killbill.billing.payment.core.PluginControlPaymentProcessor;
import org.killbill.billing.payment.core.sm.control.ControlPluginRunner;
@@ -53,7 +54,6 @@ import org.killbill.billing.payment.glue.PaymentModule;
import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
import org.killbill.billing.payment.provider.MockPaymentControlProviderPlugin;
import org.killbill.billing.payment.retry.BaseRetryService.RetryServiceScheduler;
-import org.killbill.billing.control.plugin.api.PaymentControlPluginApi;
import org.killbill.billing.tag.TagInternalApi;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.dao.NonEntityDao;
@@ -72,7 +72,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
-import static org.killbill.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
import static org.killbill.billing.payment.glue.PaymentModule.RETRYABLE_NAMED;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -103,8 +102,7 @@ public class TestRetryablePayment extends PaymentTestSuiteNoDB {
@Named(RETRYABLE_NAMED)
private RetryServiceScheduler retryServiceScheduler;
@Inject
- @Named(PLUGIN_EXECUTOR_NAMED)
- private ExecutorService executor;
+ private PaymentExecutors executors;
@Inject
private PaymentStateMachineHelper paymentSMHelper;
@Inject
@@ -158,8 +156,6 @@ public class TestRetryablePayment extends PaymentTestSuiteNoDB {
this.utcNow = clock.getUTCNow();
runner = new MockRetryablePaymentAutomatonRunner(
- stateMachineConfig,
- retryStateMachineConfig,
paymentDao,
locker,
pluginRegistry,
@@ -169,7 +165,7 @@ public class TestRetryablePayment extends PaymentTestSuiteNoDB {
paymentProcessor,
retryServiceScheduler,
paymentConfig,
- executor,
+ paymentExecutors,
paymentSMHelper,
retrySMHelper,
controlPluginRunner,
@@ -177,18 +173,18 @@ public class TestRetryablePayment extends PaymentTestSuiteNoDB {
paymentStateContext =
new PaymentStateControlContext(ImmutableList.<String>of(MockPaymentControlProviderPlugin.PLUGIN_NAME),
- true,
- null,
- paymentExternalKey,
- paymentTransactionExternalKey,
- TransactionType.AUTHORIZE,
- account,
- paymentMethodId,
- amount,
- currency,
- emptyProperties,
- internalCallContext,
- callContext);
+ true,
+ null,
+ paymentExternalKey,
+ paymentTransactionExternalKey,
+ TransactionType.AUTHORIZE,
+ account,
+ paymentMethodId,
+ amount,
+ currency,
+ emptyProperties,
+ internalCallContext,
+ callContext);
mockRetryAuthorizeOperationCallback =
new MockRetryAuthorizeOperationCallback(locker,
@@ -205,7 +201,6 @@ public class TestRetryablePayment extends PaymentTestSuiteNoDB {
tagApi,
paymentDao,
locker,
- executor,
internalCallContextFactory,
runner,
retrySMHelper,
diff --git a/payment/src/test/java/org/killbill/billing/payment/dispatcher/TestPluginDispatcher.java b/payment/src/test/java/org/killbill/billing/payment/dispatcher/TestPluginDispatcher.java
index aef5eec..86365e5 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dispatcher/TestPluginDispatcher.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dispatcher/TestPluginDispatcher.java
@@ -33,9 +33,9 @@ import org.testng.annotations.Test;
public class TestPluginDispatcher extends PaymentTestSuiteNoDB {
- private final PluginDispatcher<Void> voidPluginDispatcher = new PluginDispatcher<Void>(10, Executors.newSingleThreadExecutor());
+ private final PluginDispatcher<Void> voidPluginDispatcher = new PluginDispatcher<Void>(10, paymentExecutors);
- private final PluginDispatcher<String> stringPluginDispatcher = new PluginDispatcher<String>(1, Executors.newSingleThreadExecutor());
+ private final PluginDispatcher<String> stringPluginDispatcher = new PluginDispatcher<String>(1, paymentExecutors);
@Test(groups = "fast")
public void testDispatchWithTimeout() throws TimeoutException, PaymentApiException {
diff --git a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
index 9c19560..8c45a5a 100644
--- a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
@@ -24,6 +24,7 @@ import org.killbill.billing.invoice.api.InvoiceInternalApi;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PaymentApi;
import org.killbill.billing.payment.api.PaymentGatewayApi;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.core.PaymentMethodProcessor;
import org.killbill.billing.payment.core.PaymentProcessor;
import org.killbill.billing.payment.core.PluginControlPaymentProcessor;
@@ -82,6 +83,8 @@ public abstract class PaymentTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
protected DefaultRetryService retryService;
@Inject
protected CacheControllerDispatcher cacheControllerDispatcher;
+ @Inject
+ protected PaymentExecutors paymentExecutors;
@Override
protected KillbillConfigSource getConfigSource() {
diff --git a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
index 1ab3f64..aefb209 100644
--- a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
@@ -24,6 +24,7 @@ import org.killbill.billing.invoice.api.InvoiceInternalApi;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.PaymentApi;
import org.killbill.billing.payment.api.PaymentGatewayApi;
+import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.core.PaymentProcessor;
import org.killbill.billing.payment.core.PaymentMethodProcessor;
import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
@@ -70,6 +71,8 @@ public abstract class PaymentTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
protected PaymentDao paymentDao;
@Inject
protected TestPaymentHelper testHelper;
+ @Inject
+ protected PaymentExecutors paymentExecutors;
@Override
protected KillbillConfigSource getConfigSource() {
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index 7ed69f5..8303dc6 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -114,6 +114,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
protected void beforeClass() throws Exception {
super.beforeClass();
mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
+ paymentExecutors.initialize();
janitor.initialize();
janitor.start();
}
@@ -121,6 +122,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
@AfterClass(groups = "slow")
protected void afterClass() throws Exception {
janitor.stop();
+ paymentExecutors.stop();
}
@BeforeMethod(groups = "slow")