killbill-aplcache
Details
pom.xml 10(+10 -0)
diff --git a/pom.xml b/pom.xml
index d323ef7..03f99e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -222,6 +222,11 @@
<version>2.0.0</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-smile</artifactId>
+ <version>2.0.1</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.0.1</version>
@@ -232,6 +237,11 @@
<version>2.0.0</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.util</groupId>
+ <artifactId>low-gc-membuffers</artifactId>
+ <version>0.9.0</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>12.0</version>
usage/pom.xml 16(+16 -0)
diff --git a/usage/pom.xml b/usage/pom.xml
index 1c1183a..f97c4fa 100644
--- a/usage/pom.xml
+++ b/usage/pom.xml
@@ -26,6 +26,22 @@
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-joda</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-smile</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.util</groupId>
+ <artifactId>low-gc-membuffers</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>provided</scope>
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/FileBackedBuffer.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/FileBackedBuffer.java
new file mode 100644
index 0000000..9040b0d
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/FileBackedBuffer.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
+import com.fasterxml.util.membuf.MemBuffersForBytes;
+import com.fasterxml.util.membuf.StreamyBytesMemBuffer;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Backing buffer for a single TimelineSourceEventAccumulator that spools to disk
+ */
+public class FileBackedBuffer {
+
+ private static final Logger log = LoggerFactory.getLogger(FileBackedBuffer.class);
+
+ private static final SmileFactory smileFactory = new SmileFactory();
+ private static final ObjectMapper smileObjectMapper = new ObjectMapper(smileFactory);
+
+ static {
+ // Disable all magic for now as we don't write the Smile header (we share the same smileGenerator
+ // across multiple backend files)
+ smileFactory.configure(SmileGenerator.Feature.CHECK_SHARED_NAMES, false);
+ smileFactory.configure(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES, false);
+ }
+
+ private final String basePath;
+ private final String prefix;
+ private final boolean deleteFilesOnClose;
+ private final AtomicLong samplesforTimestampWritten = new AtomicLong();
+ private final Object recyclingMonitor = new Object();
+
+ private final StreamyBytesMemBuffer inputBuffer;
+ private StreamyBytesPersistentOutputStream out = null;
+ private SmileGenerator smileGenerator;
+
+ public FileBackedBuffer(final String basePath, final String prefix, final int segmentsSize, final int maxNbSegments) throws IOException {
+ this(basePath, prefix, true, segmentsSize, maxNbSegments);
+ }
+
+ public FileBackedBuffer(final String basePath, final String prefix, final boolean deleteFilesOnClose, final int segmentsSize, final int maxNbSegments) throws IOException {
+ this.basePath = basePath;
+ this.prefix = prefix;
+ this.deleteFilesOnClose = deleteFilesOnClose;
+
+ final MemBuffersForBytes bufs = new MemBuffersForBytes(segmentsSize, 1, maxNbSegments);
+ inputBuffer = bufs.createStreamyBuffer(1, maxNbSegments);
+
+ recycle();
+ }
+
+ public boolean append(final SourceSamplesForTimestamp sourceSamplesForTimestamp) {
+ try {
+ synchronized (recyclingMonitor) {
+ smileObjectMapper.writeValue(smileGenerator, sourceSamplesForTimestamp);
+ samplesforTimestampWritten.incrementAndGet();
+ return true;
+ }
+ } catch (IOException e) {
+ log.warn("Unable to backup samples", e);
+ return false;
+ }
+ }
+
+ /**
+ * Discard in-memory and on-disk data
+ */
+ public void discard() {
+ try {
+ recycle();
+ } catch (IOException e) {
+ log.warn("Exception discarding buffer", e);
+ }
+ }
+
+ private void recycle() throws IOException {
+ synchronized (recyclingMonitor) {
+ if (out != null && !out.isEmpty()) {
+ out.close();
+ }
+
+ out = new StreamyBytesPersistentOutputStream(basePath, prefix, inputBuffer, deleteFilesOnClose);
+ smileGenerator = smileFactory.createJsonGenerator(out, JsonEncoding.UTF8);
+ // Drop the Smile header
+ smileGenerator.flush();
+ out.reset();
+
+ samplesforTimestampWritten.set(0);
+ }
+ }
+
+ //@MonitorableManaged(description = "Return the approximate size of bytes on disk for samples not yet in the database", monitored = true, monitoringType = {MonitoringType.VALUE})
+ public long getBytesOnDisk() {
+ return out.getBytesOnDisk();
+ }
+
+ //@MonitorableManaged(description = "Return the approximate size of bytes in memory for samples not yet in the database", monitored = true, monitoringType = {MonitoringType.VALUE})
+ public long getBytesInMemory() {
+ return out.getBytesInMemory();
+ }
+
+ //@MonitorableManaged(description = "Return the approximate size of bytes available in memory (before spilling over to disk) for samples not yet in the database", monitored = true, monitoringType = {MonitoringType.VALUE})
+ public long getInMemoryAvailableSpace() {
+ return out.getInMemoryAvailableSpace();
+ }
+
+ @VisibleForTesting
+ public long getFilesCreated() {
+ return out.getCreatedFiles().size();
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/Replayer.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/Replayer.java
new file mode 100644
index 0000000..5896216
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/Replayer.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.fasterxml.jackson.dataformat.smile.SmileParser;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+public class Replayer {
+
+ private static final Logger log = LoggerFactory.getLogger(Replayer.class);
+ private static final SmileFactory smileFactory = new SmileFactory();
+ private static final ObjectMapper smileMapper = new ObjectMapper(smileFactory);
+
+ static {
+ smileFactory.configure(SmileParser.Feature.REQUIRE_HEADER, false);
+ smileFactory.setCodec(smileMapper);
+ }
+
+ @VisibleForTesting
+ public static final Ordering<File> FILE_ORDERING = new Ordering<File>() {
+ @Override
+ public int compare(@Nullable final File left, @Nullable final File right) {
+ if (left == null || right == null) {
+ throw new NullPointerException();
+ }
+
+ // Order by the nano time
+ return left.getAbsolutePath().compareTo(right.getAbsolutePath());
+ }
+ };
+
+ private final String path;
+ private AtomicBoolean shuttingDown = new AtomicBoolean();
+
+ public Replayer(final String path) {
+ this.path = path;
+ }
+
+ // This method is only used by test code
+ public List<SourceSamplesForTimestamp> readAll() {
+ final List<SourceSamplesForTimestamp> samples = new ArrayList<SourceSamplesForTimestamp>();
+
+ readAll(true, null, new Function<SourceSamplesForTimestamp, Void>() {
+ @Override
+ public Void apply(@Nullable final SourceSamplesForTimestamp input) {
+ if (input != null) {
+ samples.add(input);
+ }
+ return null;
+ }
+ });
+
+ return samples;
+ }
+
+ public void initiateShutdown() {
+ shuttingDown.set(true);
+ }
+
+ public int readAll(final boolean deleteFiles, @Nullable final DateTime minStartTime, final Function<SourceSamplesForTimestamp, Void> fn) {
+ final Collection<File> files = ImmutableList.<File>copyOf(findCandidates());
+ int filesSkipped = 0;
+ for (final File file : FILE_ORDERING.sortedCopy(files)) {
+ try {
+ // Skip files whose last modification date is is earlier than the first start time.
+ if (minStartTime != null && file.lastModified() < minStartTime.getMillis()) {
+ filesSkipped++;
+ continue;
+ }
+ read(file, fn);
+ if (shuttingDown.get()) {
+ break;
+ }
+
+ if (deleteFiles) {
+ if (!file.delete()) {
+ log.warn("Unable to delete file: {}", file.getAbsolutePath());
+ }
+ }
+ } catch (IOException e) {
+ log.warn("Exception replaying file: {}", file.getAbsolutePath(), e);
+ }
+ }
+ return filesSkipped;
+ }
+
+ @VisibleForTesting
+ public void read(final File file, final Function<SourceSamplesForTimestamp, Void> fn) throws IOException {
+ final JsonParser smileParser = smileFactory.createJsonParser(file);
+ if (smileParser.nextToken() != JsonToken.START_ARRAY) {
+ return;
+ }
+
+ while (!shuttingDown.get() && smileParser.nextToken() != JsonToken.END_ARRAY) {
+ final SourceSamplesForTimestamp sourceSamplesForTimestamp = smileParser.readValueAs(SourceSamplesForTimestamp.class);
+ fn.apply(sourceSamplesForTimestamp);
+ }
+
+ smileParser.close();
+ }
+
+
+ public void purgeOldFiles(final DateTime purgeIfOlderDate) {
+ final File[] candidates = findCandidates();
+ for (final File file : candidates) {
+ if (file.lastModified() <= purgeIfOlderDate.getMillis()) {
+ if (!file.delete()) {
+ log.warn("Unable to delete file: {}", file.getAbsolutePath());
+ }
+ }
+ }
+ }
+
+ private File[] findCandidates() {
+ final File root = new File(path);
+ final FilenameFilter filter = new FilenameFilter() {
+ @Override
+ public boolean accept(final File file, final String s) {
+ return s.endsWith("bin");
+ }
+ };
+
+ return root.listFiles(filter);
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/StreamyBytesPersistentOutputStream.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/StreamyBytesPersistentOutputStream.java
new file mode 100644
index 0000000..1256583
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/StreamyBytesPersistentOutputStream.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.dataformat.smile.SmileConstants;
+import com.fasterxml.util.membuf.StreamyBytesMemBuffer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Files;
+
+public class StreamyBytesPersistentOutputStream extends OutputStream {
+
+ private static final Logger log = LoggerFactory.getLogger(StreamyBytesPersistentOutputStream.class);
+ private static final int BUF_SIZE = 0x1000; // 4K
+
+ private final String basePath;
+ private final String prefix;
+ private final StreamyBytesMemBuffer inputBuffer;
+ private final boolean deleteFilesOnClose;
+ private final List<String> createdFiles = new ArrayList<String>();
+
+ private long bytesOnDisk = 0L;
+
+ public StreamyBytesPersistentOutputStream(String basePath, final String prefix, final StreamyBytesMemBuffer inputBuffer, final boolean deleteFilesOnClose) {
+ if (!basePath.endsWith("/")) {
+ basePath += "/";
+ }
+ this.basePath = basePath;
+
+ this.prefix = prefix;
+ this.inputBuffer = inputBuffer;
+ this.deleteFilesOnClose = deleteFilesOnClose;
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ final byte data = (byte) b;
+ write(new byte[]{data}, 0, 1);
+ }
+
+ @Override
+ public void write(final byte[] data, final int off, final int len) throws IOException {
+ if (!inputBuffer.tryAppend(data, off, len)) {
+ // Buffer full - need to flush
+ flushUnderlyingBufferAndReset();
+
+ if (!inputBuffer.tryAppend(data, off, len)) {
+ log.warn("Unable to append data: 1 byte lost");
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Cleanup volatile data
+ inputBuffer.clear();
+
+ // Cleanup persistent data
+ if (deleteFilesOnClose) {
+ for (final String path : createdFiles) {
+ log.info("Discarding file: {}", path);
+ if (!new File(path).delete()) {
+ log.warn("Unable to discard file: {}", path);
+ }
+ }
+ }
+ }
+
+ private void flushUnderlyingBufferAndReset() {
+ synchronized (inputBuffer) {
+ if (inputBuffer.available() == 0) {
+ // Somebody beat you to it
+ return;
+ }
+
+ final String pathname = getFileName();
+ createdFiles.add(pathname);
+ log.debug("Flushing in-memory buffer to disk: {}", pathname);
+
+ try {
+ final File out = new File(pathname);
+ flushToFile(out);
+ } catch (IOException e) {
+ log.warn("Error flushing data", e);
+ } finally {
+ reset();
+ }
+ }
+ }
+
+ @VisibleForTesting
+ String getFileName() {
+ return basePath + "arecibo." + prefix + "." + System.nanoTime() + ".bin";
+ }
+
+ private void flushToFile(final File out) throws IOException {
+ final byte[] buf = new byte[BUF_SIZE];
+ FileOutputStream transfer = null;
+
+ int bytesTransferred = 0;
+ try {
+ transfer = Files.newOutputStreamSupplier(out).getOutput();
+
+ while (true) {
+ final int r = inputBuffer.readIfAvailable(buf);
+ if (r == 0) {
+ break;
+ }
+ transfer.write(buf, 0, r);
+ bytesTransferred += r;
+ }
+ } finally {
+ if (transfer != null) {
+ try {
+ transfer.write(SmileConstants.TOKEN_LITERAL_END_ARRAY);
+ bytesTransferred++;
+ bytesOnDisk += bytesTransferred;
+ } finally {
+ transfer.flush();
+ }
+ }
+ }
+ log.debug("Saved {} bytes to disk", bytesTransferred);
+ }
+
+ public void reset() {
+ inputBuffer.clear();
+ try {
+ write(SmileConstants.TOKEN_LITERAL_START_ARRAY);
+ } catch (IOException e) {
+ // Not sure how to recover?
+ }
+ }
+
+ public List<String> getCreatedFiles() {
+ return createdFiles;
+ }
+
+ public long getBytesOnDisk() {
+ return bytesOnDisk;
+ }
+
+ public long getBytesInMemory() {
+ return inputBuffer.getTotalPayloadLength();
+ }
+
+ public long getInMemoryAvailableSpace() {
+ return inputBuffer.getMaximumAvailableSpace();
+ }
+
+ public boolean isEmpty() {
+ return inputBuffer.isEmpty();
+ }
+}