Details
diff --git a/api/src/main/java/com/ning/billing/meter/MeterService.java b/api/src/main/java/com/ning/billing/meter/MeterService.java
new file mode 100644
index 0000000..8e87107
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/meter/MeterService.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter;
+
+import com.ning.billing.lifecycle.KillbillService;
+
+public interface MeterService extends KillbillService {
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/DefaultMeterService.java b/meter/src/main/java/com/ning/billing/meter/DefaultMeterService.java
new file mode 100644
index 0000000..725da04
--- /dev/null
+++ b/meter/src/main/java/com/ning/billing/meter/DefaultMeterService.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter;
+
+import javax.inject.Inject;
+
+import com.ning.billing.lifecycle.LifecycleHandlerType;
+import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
+import com.ning.billing.meter.timeline.BackgroundDBChunkWriter;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.aggregator.TimelineAggregator;
+import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.config.MeterConfig;
+
+public class DefaultMeterService implements MeterService {
+
+ public static final String METER_SERVICE_NAME = "meter-service";
+
+ private final BackgroundDBChunkWriter backgroundDBChunkWriter;
+ private final TimelineEventHandler timelineEventHandler;
+ private final TimelineAggregator timelineAggregator;
+ private final InternalCallContext internalCallContext;
+ private final MeterConfig config;
+
+ @Inject
+ public DefaultMeterService(final BackgroundDBChunkWriter backgroundDBChunkWriter, final TimelineEventHandler timelineEventHandler, final InternalCallContextFactory internalCallContextFactory, final TimelineAggregator timelineAggregator, final MeterConfig config) {
+ this.backgroundDBChunkWriter = backgroundDBChunkWriter;
+ this.timelineEventHandler = timelineEventHandler;
+ this.timelineAggregator = timelineAggregator;
+ this.config = config;
+ this.internalCallContext = internalCallContextFactory.createInternalCallContext(InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID, null, "MeterService", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+ }
+
+ @Override
+ public String getName() {
+ return METER_SERVICE_NAME;
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ // Replay any log files that might not have been committed in the db-- should only occur if we crashed previously
+ timelineEventHandler.replay(config.getSpoolDir(), internalCallContext);
+ // Start the aggregation thread
+ timelineAggregator.runAggregationThread();
+ // Start the backgroundDBChunkWriter thread
+ backgroundDBChunkWriter.runBackgroundWriteThread();
+ // Start the purger thread to delete old log files
+ timelineEventHandler.startPurgeThread();
+
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+ public void stop() {
+ // Stop the aggregation thread
+ timelineAggregator.stopAggregationThread();
+ // . Depending on shutdown mode, commit in memory timeline accumulators
+ // . Will flush current direct buffer
+ // . Will stop the backgroundDBChunkWriter thread
+ //. Will stop the purger thread
+ timelineEventHandler.commitAndShutdown(internalCallContext);
+ }
+}
diff --git a/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java b/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
index 20c725f..e406802 100644
--- a/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
+++ b/meter/src/main/java/com/ning/billing/meter/glue/MeterModule.java
@@ -22,6 +22,8 @@ import org.skife.config.ConfigSource;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.config.SimplePropertyConfigSource;
+import com.ning.billing.meter.DefaultMeterService;
+import com.ning.billing.meter.MeterService;
import com.ning.billing.meter.api.MeterUserApi;
import com.ning.billing.meter.api.user.DefaultMeterUserApi;
import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
@@ -77,19 +79,20 @@ public class MeterModule extends AbstractModule {
bind(MeterUserApi.class).to(DefaultMeterUserApi.class).asEagerSingleton();
}
+
+ protected void installMeterService() {
+ bind(MeterService.class).to(DefaultMeterService.class).asEagerSingleton();
+ }
+
@Override
protected void configure() {
final MeterConfig config = installConfig();
+ installMeterService();
configureFileBackedBuffer(config);
configureDao();
configureTimelineObjects();
- // TODO
- //configureTimelineAggregator();
- //configureBackgroundDBChunkWriter();
- //configureReplayer();
-
installMeterUserApi();
}
}
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
index 486861f..e646ef7 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
@@ -98,7 +98,7 @@ public class Replayer {
}
public int readAll(final boolean deleteFiles, @Nullable final DateTime minStartTime, final Function<SourceSamplesForTimestamp, Void> fn) {
- final Collection<File> files = ImmutableList.<File>copyOf(findCandidates());
+ final List<File> files = findCandidates();
int filesSkipped = 0;
for (final File file : FILE_ORDERING.sortedCopy(files)) {
try {
@@ -141,7 +141,7 @@ public class Replayer {
public void purgeOldFiles(final DateTime purgeIfOlderDate) {
- final File[] candidates = findCandidates();
+ final List<File> candidates = findCandidates();
for (final File file : candidates) {
if (file.lastModified() <= purgeIfOlderDate.getMillis()) {
if (!file.delete()) {
@@ -151,7 +151,7 @@ public class Replayer {
}
}
- private File[] findCandidates() {
+ private List<File> findCandidates() {
final File root = new File(path);
final FilenameFilter filter = new FilenameFilter() {
@Override
@@ -160,6 +160,7 @@ public class Replayer {
}
};
- return root.listFiles(filter);
+ final File [] foundFiles = root.listFiles(filter);
+ return foundFiles == null ? ImmutableList.<File>of() : ImmutableList.<File>copyOf(foundFiles);
}
}
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
index 0d4349e..eea392d 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
@@ -469,14 +469,13 @@ public class TimelineEventHandler {
replayer.purgeOldFiles(purgeFilesIfBefore);
}
- public void startHandlerThreads() {
+ public void startPurgeThread() {
purgeThread.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
purgeFilesAndAccumulators();
}
- },
- config.getTimelineLength().getMillis(),
+ }, config.getTimelineLength().getMillis(),
config.getTimelineLength().getMillis(),
TimeUnit.MILLISECONDS);
}
diff --git a/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java b/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
index a1b3303..73e24ba 100644
--- a/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
+++ b/server/src/main/java/com/ning/billing/server/modules/KillbillServerModule.java
@@ -55,6 +55,7 @@ import com.ning.billing.util.glue.CustomFieldModule;
import com.ning.billing.util.glue.ExportModule;
import com.ning.billing.util.glue.GlobalLockerModule;
import com.ning.billing.util.glue.NotificationQueueModule;
+import com.ning.billing.util.glue.TagStoreModule;
import com.google.inject.AbstractModule;
@@ -123,6 +124,7 @@ public class KillbillServerModule extends AbstractModule {
install(new TenantModule());
install(new ExportModule());
install(new MeterModule());
+ install(new TagStoreModule());
installClock();
}
}