killbill-aplcache

usage: add file backed buffer Signed-off-by: Pierre-Alexandre

7/28/2012 6:41:53 PM

Details

pom.xml 10(+10 -0)

diff --git a/pom.xml b/pom.xml
index d323ef7..03f99e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -222,6 +222,11 @@
                 <version>2.0.0</version>
             </dependency>
             <dependency>
+                <groupId>com.fasterxml.jackson.dataformat</groupId>
+                <artifactId>jackson-dataformat-smile</artifactId>
+                <version>2.0.1</version>
+            </dependency>
+            <dependency>
                 <groupId>com.fasterxml.jackson.datatype</groupId>
                 <artifactId>jackson-datatype-joda</artifactId>
                 <version>2.0.1</version>
@@ -232,6 +237,11 @@
                 <version>2.0.0</version>
             </dependency>
             <dependency>
+                <groupId>com.fasterxml.util</groupId>
+                <artifactId>low-gc-membuffers</artifactId>
+                <version>0.9.0</version>
+            </dependency>
+            <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>12.0</version>

usage/pom.xml 16(+16 -0)

diff --git a/usage/pom.xml b/usage/pom.xml
index 1c1183a..f97c4fa 100644
--- a/usage/pom.xml
+++ b/usage/pom.xml
@@ -26,6 +26,22 @@
             <artifactId>jdbi</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-joda</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-smile</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.util</groupId>
+            <artifactId>low-gc-membuffers</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.inject</groupId>
             <artifactId>guice</artifactId>
             <scope>provided</scope>
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/FileBackedBuffer.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/FileBackedBuffer.java
new file mode 100644
index 0000000..9040b0d
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/FileBackedBuffer.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
+import com.fasterxml.util.membuf.MemBuffersForBytes;
+import com.fasterxml.util.membuf.StreamyBytesMemBuffer;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Backing buffer for a single TimelineSourceEventAccumulator that spools to disk
+ */
+public class FileBackedBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(FileBackedBuffer.class);
+
+    private static final SmileFactory smileFactory = new SmileFactory();
+    private static final ObjectMapper smileObjectMapper = new ObjectMapper(smileFactory);
+
+    static {
+        // Disable all magic for now as we don't write the Smile header (we share the same smileGenerator
+        // across multiple backend files)
+        smileFactory.configure(SmileGenerator.Feature.CHECK_SHARED_NAMES, false);
+        smileFactory.configure(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES, false);
+    }
+
+    private final String basePath;
+    private final String prefix;
+    private final boolean deleteFilesOnClose;
+    private final AtomicLong samplesforTimestampWritten = new AtomicLong();
+    private final Object recyclingMonitor = new Object();
+
+    private final StreamyBytesMemBuffer inputBuffer;
+    private StreamyBytesPersistentOutputStream out = null;
+    private SmileGenerator smileGenerator;
+
+    public FileBackedBuffer(final String basePath, final String prefix, final int segmentsSize, final int maxNbSegments) throws IOException {
+        this(basePath, prefix, true, segmentsSize, maxNbSegments);
+    }
+
+    public FileBackedBuffer(final String basePath, final String prefix, final boolean deleteFilesOnClose, final int segmentsSize, final int maxNbSegments) throws IOException {
+        this.basePath = basePath;
+        this.prefix = prefix;
+        this.deleteFilesOnClose = deleteFilesOnClose;
+
+        final MemBuffersForBytes bufs = new MemBuffersForBytes(segmentsSize, 1, maxNbSegments);
+        inputBuffer = bufs.createStreamyBuffer(1, maxNbSegments);
+
+        recycle();
+    }
+
+    public boolean append(final SourceSamplesForTimestamp sourceSamplesForTimestamp) {
+        try {
+            synchronized (recyclingMonitor) {
+                smileObjectMapper.writeValue(smileGenerator, sourceSamplesForTimestamp);
+                samplesforTimestampWritten.incrementAndGet();
+                return true;
+            }
+        } catch (IOException e) {
+            log.warn("Unable to backup samples", e);
+            return false;
+        }
+    }
+
+    /**
+     * Discard in-memory and on-disk data
+     */
+    public void discard() {
+        try {
+            recycle();
+        } catch (IOException e) {
+            log.warn("Exception discarding buffer", e);
+        }
+    }
+
+    private void recycle() throws IOException {
+        synchronized (recyclingMonitor) {
+            if (out != null && !out.isEmpty()) {
+                out.close();
+            }
+
+            out = new StreamyBytesPersistentOutputStream(basePath, prefix, inputBuffer, deleteFilesOnClose);
+            smileGenerator = smileFactory.createJsonGenerator(out, JsonEncoding.UTF8);
+            // Drop the Smile header
+            smileGenerator.flush();
+            out.reset();
+
+            samplesforTimestampWritten.set(0);
+        }
+    }
+
+    //@MonitorableManaged(description = "Return the approximate size of bytes on disk for samples not yet in the database", monitored = true, monitoringType = {MonitoringType.VALUE})
+    public long getBytesOnDisk() {
+        return out.getBytesOnDisk();
+    }
+
+    //@MonitorableManaged(description = "Return the approximate size of bytes in memory for samples not yet in the database", monitored = true, monitoringType = {MonitoringType.VALUE})
+    public long getBytesInMemory() {
+        return out.getBytesInMemory();
+    }
+
+    //@MonitorableManaged(description = "Return the approximate size of bytes available in memory (before spilling over to disk) for samples not yet in the database", monitored = true, monitoringType = {MonitoringType.VALUE})
+    public long getInMemoryAvailableSpace() {
+        return out.getInMemoryAvailableSpace();
+    }
+
+    @VisibleForTesting
+    public long getFilesCreated() {
+        return out.getCreatedFiles().size();
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/Replayer.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/Replayer.java
new file mode 100644
index 0000000..5896216
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/Replayer.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.fasterxml.jackson.dataformat.smile.SmileParser;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+public class Replayer {
+
+    private static final Logger log = LoggerFactory.getLogger(Replayer.class);
+    private static final SmileFactory smileFactory = new SmileFactory();
+    private static final ObjectMapper smileMapper = new ObjectMapper(smileFactory);
+
+    static {
+        smileFactory.configure(SmileParser.Feature.REQUIRE_HEADER, false);
+        smileFactory.setCodec(smileMapper);
+    }
+
+    @VisibleForTesting
+    public static final Ordering<File> FILE_ORDERING = new Ordering<File>() {
+        @Override
+        public int compare(@Nullable final File left, @Nullable final File right) {
+            if (left == null || right == null) {
+                throw new NullPointerException();
+            }
+
+            // Order by the nano time
+            return left.getAbsolutePath().compareTo(right.getAbsolutePath());
+        }
+    };
+
+    private final String path;
+    private AtomicBoolean shuttingDown = new AtomicBoolean();
+
+    public Replayer(final String path) {
+        this.path = path;
+    }
+
+    // This method is only used by test code
+    public List<SourceSamplesForTimestamp> readAll() {
+        final List<SourceSamplesForTimestamp> samples = new ArrayList<SourceSamplesForTimestamp>();
+
+        readAll(true, null, new Function<SourceSamplesForTimestamp, Void>() {
+            @Override
+            public Void apply(@Nullable final SourceSamplesForTimestamp input) {
+                if (input != null) {
+                    samples.add(input);
+                }
+                return null;
+            }
+        });
+
+        return samples;
+    }
+
+    public void initiateShutdown() {
+        shuttingDown.set(true);
+    }
+
+    public int readAll(final boolean deleteFiles, @Nullable final DateTime minStartTime, final Function<SourceSamplesForTimestamp, Void> fn) {
+        final Collection<File> files = ImmutableList.<File>copyOf(findCandidates());
+        int filesSkipped = 0;
+        for (final File file : FILE_ORDERING.sortedCopy(files)) {
+            try {
+                // Skip files whose last modification date is is earlier than the first start time.
+                if (minStartTime != null && file.lastModified() < minStartTime.getMillis()) {
+                    filesSkipped++;
+                    continue;
+                }
+                read(file, fn);
+                if (shuttingDown.get()) {
+                    break;
+                }
+
+                if (deleteFiles) {
+                    if (!file.delete()) {
+                        log.warn("Unable to delete file: {}", file.getAbsolutePath());
+                    }
+                }
+            } catch (IOException e) {
+                log.warn("Exception replaying file: {}", file.getAbsolutePath(), e);
+            }
+        }
+        return filesSkipped;
+    }
+
+    @VisibleForTesting
+    public void read(final File file, final Function<SourceSamplesForTimestamp, Void> fn) throws IOException {
+        final JsonParser smileParser = smileFactory.createJsonParser(file);
+        if (smileParser.nextToken() != JsonToken.START_ARRAY) {
+            return;
+        }
+
+        while (!shuttingDown.get() && smileParser.nextToken() != JsonToken.END_ARRAY) {
+            final SourceSamplesForTimestamp sourceSamplesForTimestamp = smileParser.readValueAs(SourceSamplesForTimestamp.class);
+            fn.apply(sourceSamplesForTimestamp);
+        }
+
+        smileParser.close();
+    }
+
+
+    public void purgeOldFiles(final DateTime purgeIfOlderDate) {
+        final File[] candidates = findCandidates();
+        for (final File file : candidates) {
+            if (file.lastModified() <= purgeIfOlderDate.getMillis()) {
+                if (!file.delete()) {
+                    log.warn("Unable to delete file: {}", file.getAbsolutePath());
+                }
+            }
+        }
+    }
+
+    private File[] findCandidates() {
+        final File root = new File(path);
+        final FilenameFilter filter = new FilenameFilter() {
+            @Override
+            public boolean accept(final File file, final String s) {
+                return s.endsWith("bin");
+            }
+        };
+
+        return root.listFiles(filter);
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/StreamyBytesPersistentOutputStream.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/StreamyBytesPersistentOutputStream.java
new file mode 100644
index 0000000..1256583
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/StreamyBytesPersistentOutputStream.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.dataformat.smile.SmileConstants;
+import com.fasterxml.util.membuf.StreamyBytesMemBuffer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Files;
+
+public class StreamyBytesPersistentOutputStream extends OutputStream {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamyBytesPersistentOutputStream.class);
+    private static final int BUF_SIZE = 0x1000; // 4K
+
+    private final String basePath;
+    private final String prefix;
+    private final StreamyBytesMemBuffer inputBuffer;
+    private final boolean deleteFilesOnClose;
+    private final List<String> createdFiles = new ArrayList<String>();
+
+    private long bytesOnDisk = 0L;
+
+    public StreamyBytesPersistentOutputStream(String basePath, final String prefix, final StreamyBytesMemBuffer inputBuffer, final boolean deleteFilesOnClose) {
+        if (!basePath.endsWith("/")) {
+            basePath += "/";
+        }
+        this.basePath = basePath;
+
+        this.prefix = prefix;
+        this.inputBuffer = inputBuffer;
+        this.deleteFilesOnClose = deleteFilesOnClose;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        final byte data = (byte) b;
+        write(new byte[]{data}, 0, 1);
+    }
+
+    @Override
+    public void write(final byte[] data, final int off, final int len) throws IOException {
+        if (!inputBuffer.tryAppend(data, off, len)) {
+            // Buffer full - need to flush
+            flushUnderlyingBufferAndReset();
+
+            if (!inputBuffer.tryAppend(data, off, len)) {
+                log.warn("Unable to append data: 1 byte lost");
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Cleanup volatile data
+        inputBuffer.clear();
+
+        // Cleanup persistent data
+        if (deleteFilesOnClose) {
+            for (final String path : createdFiles) {
+                log.info("Discarding file: {}", path);
+                if (!new File(path).delete()) {
+                    log.warn("Unable to discard file: {}", path);
+                }
+            }
+        }
+    }
+
+    private void flushUnderlyingBufferAndReset() {
+        synchronized (inputBuffer) {
+            if (inputBuffer.available() == 0) {
+                // Somebody beat you to it
+                return;
+            }
+
+            final String pathname = getFileName();
+            createdFiles.add(pathname);
+            log.debug("Flushing in-memory buffer to disk: {}", pathname);
+
+            try {
+                final File out = new File(pathname);
+                flushToFile(out);
+            } catch (IOException e) {
+                log.warn("Error flushing data", e);
+            } finally {
+                reset();
+            }
+        }
+    }
+
+    @VisibleForTesting
+    String getFileName() {
+        return basePath + "arecibo." + prefix + "." + System.nanoTime() + ".bin";
+    }
+
+    private void flushToFile(final File out) throws IOException {
+        final byte[] buf = new byte[BUF_SIZE];
+        FileOutputStream transfer = null;
+
+        int bytesTransferred = 0;
+        try {
+            transfer = Files.newOutputStreamSupplier(out).getOutput();
+
+            while (true) {
+                final int r = inputBuffer.readIfAvailable(buf);
+                if (r == 0) {
+                    break;
+                }
+                transfer.write(buf, 0, r);
+                bytesTransferred += r;
+            }
+        } finally {
+            if (transfer != null) {
+                try {
+                    transfer.write(SmileConstants.TOKEN_LITERAL_END_ARRAY);
+                    bytesTransferred++;
+                    bytesOnDisk += bytesTransferred;
+                } finally {
+                    transfer.flush();
+                }
+            }
+        }
+        log.debug("Saved {} bytes to disk", bytesTransferred);
+    }
+
+    public void reset() {
+        inputBuffer.clear();
+        try {
+            write(SmileConstants.TOKEN_LITERAL_START_ARRAY);
+        } catch (IOException e) {
+            // Not sure how to recover?
+        }
+    }
+
+    public List<String> getCreatedFiles() {
+        return createdFiles;
+    }
+
+    public long getBytesOnDisk() {
+        return bytesOnDisk;
+    }
+
+    public long getBytesInMemory() {
+        return inputBuffer.getTotalPayloadLength();
+    }
+
+    public long getInMemoryAvailableSpace() {
+        return inputBuffer.getMaximumAvailableSpace();
+    }
+
+    public boolean isEmpty() {
+        return inputBuffer.isEmpty();
+    }
+}