/*
* Copyright 2012 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.viewer.hdfs;
import azkaban.security.commons.HadoopSecurityManager;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableName;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
/**
* Our forked version of hadoop sequence file.
* it is to deal with sequence file possibly keep open connections and snap up
* ports
*/
@SuppressWarnings("deprecation")
public class AzkabanSequenceFileReader {
// Log under this class's own category. The logger used to be registered under
// HadoopSecurityManager, which misattributed every message emitted by the reader.
private static final Logger LOG = Logger.getLogger(AzkabanSequenceFileReader.class);
/** From this header version on, class names are read as Text and a block-compression flag follows. */
private static final byte BLOCK_COMPRESS_VERSION = (byte) 4;
/** From this header version on, a compressed file names its codec class in the header. */
private static final byte CUSTOM_COMPRESS_VERSION = (byte) 5;
/** From this header version on, the header carries a Metadata section. */
private static final byte VERSION_WITH_METADATA = (byte) 6;
private static final int SYNC_ESCAPE = -1; // "length" of sync entries
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
/** The number of bytes between sync points. */
public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
/** Magic bytes "SEQ" followed by the newest header version this reader accepts. */
private static final byte[] VERSION = new byte[]{(byte) 'S', (byte) 'E',
    (byte) 'Q', VERSION_WITH_METADATA};

/** Not instantiable: this class only hosts the nested reader types. */
private AzkabanSequenceFileReader() {
}
/**
* The compression type used to compress key/value pairs in a sequence file.
*
* <p>
* NOTE(review): nothing in this file references this enum, and no
* <code>Writer</code> class is declared here, so the former
* <code>@see</code> link could not resolve. Confirm whether external callers
* still rely on this type.
*/
public static enum CompressionType {
/** Do not compress records. */
NONE,
/** Compress values only, each separately. */
RECORD,
/** Compress sequences of records together in blocks. */
BLOCK
}
/** A {@link ValueBytes} that holds the raw bytes of a value which needs no decompression. */
private static class UncompressedBytes implements ValueBytes {
  private byte[] data;
  private int dataSize;

  private UncompressedBytes() {
    this.dataSize = 0;
    this.data = null;
  }

  /**
   * Replaces the held value with the next {@code length} bytes of {@code in}.
   * While the read is in flight the reported size is -1.
   */
  private void reset(final DataInputStream in, final int length) throws IOException {
    this.data = new byte[length];
    this.dataSize = -1;
    in.readFully(this.data);
    // The freshly allocated array is exactly 'length' bytes long.
    this.dataSize = length;
  }

  /** Returns the number of bytes currently held. */
  @Override
  public int getSize() {
    return dataSize;
  }

  /** Copies the held bytes to {@code outStream} unchanged. */
  @Override
  public void writeUncompressedBytes(final DataOutputStream outStream)
      throws IOException {
    outStream.write(data, 0, dataSize);
  }

  /** Always fails: this holder has no compressed representation. */
  @Override
  public void writeCompressedBytes(final DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
    final String reason = "UncompressedBytes cannot be compressed!";
    throw new IllegalArgumentException(reason);
  }
} // UncompressedBytes
/**
 * A {@link ValueBytes} that holds the still-compressed bytes of a value and
 * can decompress them on demand with the given codec.
 */
private static class CompressedBytes implements ValueBytes {
  DataInputBuffer rawData = null;
  CompressionCodec codec = null;
  CompressionInputStream decompressedStream = null;
  private byte[] data;
  private int dataSize;

  private CompressedBytes(final CompressionCodec codec) {
    this.codec = codec;
    this.dataSize = 0;
    this.data = null;
  }

  /**
   * Replaces the held value with the next {@code length} bytes of {@code in}.
   * While the read is in flight the reported size is -1.
   */
  private void reset(final DataInputStream in, final int length) throws IOException {
    this.data = new byte[length];
    this.dataSize = -1;
    in.readFully(this.data);
    // The freshly allocated array is exactly 'length' bytes long.
    this.dataSize = length;
  }

  /** Returns the number of compressed bytes currently held. */
  @Override
  public int getSize() {
    return dataSize;
  }

  /** Decompresses the held bytes and writes the result to {@code outStream}. */
  @Override
  public void writeUncompressedBytes(final DataOutputStream outStream)
      throws IOException {
    // Create the decompression stream on first use; afterwards just reset it.
    if (decompressedStream != null) {
      decompressedStream.resetState();
    } else {
      rawData = new DataInputBuffer();
      decompressedStream = codec.createInputStream(rawData);
    }
    rawData.reset(data, 0, dataSize);

    final byte[] chunk = new byte[8192];
    for (int count = decompressedStream.read(chunk, 0, 8192);
        count != -1;
        count = decompressedStream.read(chunk, 0, 8192)) {
      outStream.write(chunk, 0, count);
    }
  }

  /** Copies the held bytes to {@code outStream} without decompressing them. */
  @Override
  public void writeCompressedBytes(final DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
    outStream.write(data, 0, dataSize);
  }
} // CompressedBytes
/** Reads key/value pairs from a sequence-format file. */
public static class Reader implements Closeable {
// Path of the file being read; used in error messages and toString().
private final Path file;
// Scratch buffer holding one whole record for the non-block-compressed
// next(Writable) / next(Object) paths.
private final DataOutputBuffer outBuf = new DataOutputBuffer();
// Sync marker taken from the file header (only read when version > 1).
private final byte[] sync = new byte[SYNC_HASH_SIZE];
// Scratch space for marker bytes read from the stream, compared against sync.
private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
// Stream offset at which reading stops: position after the initial seek + requested length.
private final long end;
private final Configuration conf;
// Always true: a block's value buffers are only read once a value is requested.
private final boolean lazyDecompress = true;
// Underlying file stream; closed by close().
private FSDataInputStream in;
// Header version byte (fourth byte of the file).
private byte version;
private String keyClassName;
private String valClassName;
// Key/value classes, resolved on first use by getKeyClass()/getValueClass().
@SuppressWarnings("rawtypes")
private Class keyClass;
@SuppressWarnings("rawtypes")
private Class valClass;
// Codec instantiated from the header; stays null when the file is not compressed.
private CompressionCodec codec = null;
private Metadata metadata = null;
// Whether the most recent read passed a sync marker.
private boolean syncSeen;
// Key length and total record length of the record most recently read
// (non-block-compressed paths).
private int keyLength;
private int recordLength;
// Header flags: values are compressed / records are block-compressed.
private boolean decompress;
private boolean blockCompressed;
// Bookkeeping for the current block of a block-compressed file.
private int noBufferedRecords = 0;
private boolean valuesDecompressed = true;
private int noBufferedKeys = 0;
private int noBufferedValues = 0;
// Buffer / decompressing filter / data stream / pooled decompressor for the
// block's key lengths.
private DataInputBuffer keyLenBuffer = null;
private CompressionInputStream keyLenInFilter = null;
private DataInputStream keyLenIn = null;
private Decompressor keyLenDecompressor = null;
// Same four pieces for the block's keys.
private DataInputBuffer keyBuffer = null;
private CompressionInputStream keyInFilter = null;
private DataInputStream keyIn = null;
private Decompressor keyDecompressor = null;
// Same four pieces for the block's value lengths.
private DataInputBuffer valLenBuffer = null;
private CompressionInputStream valLenInFilter = null;
private DataInputStream valLenIn = null;
private Decompressor valLenDecompressor = null;
// Same four pieces for values; valBuffer also backs key reads when the file
// is not block-compressed.
private DataInputBuffer valBuffer = null;
private CompressionInputStream valInFilter = null;
private DataInputStream valIn = null;
private Decompressor valDecompressor = null;
// Deserializers opened by init(); left null for temporary readers.
@SuppressWarnings("rawtypes")
private Deserializer keyDeserializer;
@SuppressWarnings("rawtypes")
private Deserializer valDeserializer;
/** Open the named file. */
public Reader(final FileSystem fs, final Path file, final Configuration conf)
throws IOException {
this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
}
private Reader(final FileSystem fs, final Path file, final int bufferSize,
final Configuration conf, final boolean tempReader) throws IOException {
this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
}
private Reader(final FileSystem fs, final Path file, final int bufferSize, final long start,
final long length, final Configuration conf, final boolean tempReader) throws IOException {
this.file = file;
try {
this.in = openFile(fs, file, bufferSize, length);
this.conf = conf;
seek(start);
this.end = this.in.getPos() + length;
init(tempReader);
} catch (final IOException e) {
if (this.in != null) {
this.in.close();
}
throw e;
}
}
/**
 * Opens {@code file} on {@code fs} with the given buffer size. Subclasses may
 * override this to return a specialized {@link FSDataInputStream}; this
 * default implementation does not use {@code length}.
 */
protected FSDataInputStream openFile(final FileSystem fs, final Path file,
    final int bufferSize, final long length) throws IOException {
  final FSDataInputStream stream = fs.open(file, bufferSize);
  return stream;
}
/**
* Reads the file header from the input stream and, unless this is a temporary
* reader, sets up the decompression streams and the key/value deserializers.
*
* <p>
* Header fields are consumed in this order: the three magic bytes and the
* version byte, the key and value class names, the "compressed" flag
* (version &gt; 2), the "block-compressed" flag (version &gt;= 4), the codec
* class name (compressed files with version &gt;= 5), the metadata
* (version &gt;= 6) and the sync marker (version &gt; 1).
*
* @param tempReader <code>true</code> if we are constructing a temporary
* reader and hence do not initialize every component;
* <code>false</code> otherwise.
* @throws IOException if the magic bytes do not match, the version is newer
* than this reader supports, or the stream cannot be read
* @throws IllegalArgumentException if the codec class named in the header
* cannot be found
*/
private void init(final boolean tempReader) throws IOException {
final byte[] versionBlock = new byte[VERSION.length];
this.in.readFully(versionBlock);
// The first three bytes must be the magic "SEQ".
if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1])
|| (versionBlock[2] != VERSION[2])) {
throw new IOException(this.file + " not a SequenceFile");
}
// Set 'version'
this.version = versionBlock[3];
if (this.version > VERSION[3]) {
throw new VersionMismatchException(VERSION[3], this.version);
}
// Class names are stored as UTF8 before version 4 and as Text afterwards.
if (this.version < BLOCK_COMPRESS_VERSION) {
final UTF8 className = new UTF8();
className.readFields(this.in);
this.keyClassName = className.toString(); // key class name
className.readFields(this.in);
this.valClassName = className.toString(); // val class name
} else {
this.keyClassName = Text.readString(this.in);
this.valClassName = Text.readString(this.in);
}
if (this.version > 2) { // if version > 2
this.decompress = this.in.readBoolean(); // is compressed?
} else {
this.decompress = false;
}
if (this.version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
this.blockCompressed = this.in.readBoolean(); // is block-compressed?
} else {
this.blockCompressed = false;
}
// if version >= 5
// setup the compression codec
if (this.decompress) {
if (this.version >= CUSTOM_COMPRESS_VERSION) {
final String codecClassname = Text.readString(this.in);
try {
final Class<? extends CompressionCodec> codecClass =
this.conf.getClassByName(codecClassname).asSubclass(
CompressionCodec.class);
this.codec = ReflectionUtils.newInstance(codecClass, this.conf);
} catch (final ClassNotFoundException cnfe) {
throw new IllegalArgumentException("Unknown codec: "
+ codecClassname, cnfe);
}
} else {
// Older compressed files do not name a codec; fall back to DefaultCodec.
this.codec = new DefaultCodec();
((Configurable) this.codec).setConf(this.conf);
}
}
this.metadata = new Metadata();
if (this.version >= VERSION_WITH_METADATA) { // if version >= 6
this.metadata.readFields(this.in);
}
if (this.version > 1) { // if version > 1
this.in.readFully(this.sync); // read sync bytes
}
// Initialize... *not* if we are constructing a temporary Reader
if (!tempReader) {
this.valBuffer = new DataInputBuffer();
if (this.decompress) {
this.valDecompressor = CodecPool.getDecompressor(this.codec);
this.valInFilter = this.codec.createInputStream(this.valBuffer, this.valDecompressor);
this.valIn = new DataInputStream(this.valInFilter);
} else {
// Uncompressed values are read straight from the buffer.
this.valIn = this.valBuffer;
}
// Block-compressed files keep key lengths, keys and value lengths in
// separately compressed buffers, each with its own pooled decompressor.
if (this.blockCompressed) {
this.keyLenBuffer = new DataInputBuffer();
this.keyBuffer = new DataInputBuffer();
this.valLenBuffer = new DataInputBuffer();
this.keyLenDecompressor = CodecPool.getDecompressor(this.codec);
this.keyLenInFilter =
this.codec.createInputStream(this.keyLenBuffer, this.keyLenDecompressor);
this.keyLenIn = new DataInputStream(this.keyLenInFilter);
this.keyDecompressor = CodecPool.getDecompressor(this.codec);
this.keyInFilter = this.codec.createInputStream(this.keyBuffer, this.keyDecompressor);
this.keyIn = new DataInputStream(this.keyInFilter);
this.valLenDecompressor = CodecPool.getDecompressor(this.codec);
this.valLenInFilter =
this.codec.createInputStream(this.valLenBuffer, this.valLenDecompressor);
this.valLenIn = new DataInputStream(this.valLenInFilter);
}
final SerializationFactory serializationFactory =
new SerializationFactory(this.conf);
this.keyDeserializer =
getDeserializer(serializationFactory, getKeyClass());
// Without block compression the key bytes sit at the front of valBuffer
// (uncompressed); otherwise keys come from the decompressed key stream.
if (!this.blockCompressed) {
this.keyDeserializer.open(this.valBuffer);
} else {
this.keyDeserializer.open(this.keyIn);
}
this.valDeserializer =
getDeserializer(serializationFactory, getValueClass());
this.valDeserializer.open(this.valIn);
}
}
/** Looks up the deserializer that {@code sf} provides for class {@code c}. */
@SuppressWarnings({ "unchecked", "rawtypes" })
private Deserializer getDeserializer(final SerializationFactory sf, final Class c) {
  final Deserializer found = sf.getDeserializer(c);
  return found;
}
/**
 * Closes the file: returns the pooled decompressors, closes both
 * deserializers and closes the input stream.
 *
 * <p>The steps are chained with {@code finally} so that a failure while
 * closing a deserializer can no longer prevent the input stream from being
 * closed and leave its connection open.
 *
 * @throws IOException if closing a deserializer or the stream fails
 */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool and forget them, so a repeated
  // close() does not return the same instance twice.
  CodecPool.returnDecompressor(this.keyLenDecompressor);
  CodecPool.returnDecompressor(this.keyDecompressor);
  CodecPool.returnDecompressor(this.valLenDecompressor);
  CodecPool.returnDecompressor(this.valDecompressor);
  this.keyLenDecompressor = this.keyDecompressor = null;
  this.valLenDecompressor = this.valDecompressor = null;
  try {
    if (this.keyDeserializer != null) {
      this.keyDeserializer.close();
    }
  } finally {
    try {
      if (this.valDeserializer != null) {
        this.valDeserializer.close();
      }
    } finally {
      // Always close the input stream, whatever happened above.
      if (this.in != null) {
        this.in.close();
      }
    }
  }
}
/**
 * Returns the key class name exactly as it was read from the file header.
 *
 * @return the key class name
 */
public String getKeyClassName() {
  return keyClassName;
}
/**
 * Returns the class of keys in this file, resolving it from the key class
 * name on first use and remembering the result.
 *
 * @return the key class
 * @throws RuntimeException wrapping the {@link IOException} raised when the
 *     class cannot be resolved
 */
public synchronized Class<?> getKeyClass() {
  if (this.keyClass != null) {
    return this.keyClass;
  }
  try {
    this.keyClass = WritableName.getClass(getKeyClassName(), this.conf);
  } catch (final IOException e) {
    throw new RuntimeException(e);
  }
  return this.keyClass;
}
/**
 * Returns the value class name exactly as it was read from the file header.
 *
 * @return the value class name
 */
public String getValueClassName() {
  return valClassName;
}
/**
 * Returns the class of values in this file, resolving it from the value class
 * name on first use and remembering the result.
 *
 * @return the value class
 * @throws RuntimeException wrapping the {@link IOException} raised when the
 *     class cannot be resolved
 */
public synchronized Class<?> getValueClass() {
  if (this.valClass != null) {
    return this.valClass;
  }
  try {
    this.valClass = WritableName.getClass(getValueClassName(), this.conf);
  } catch (final IOException e) {
    throw new RuntimeException(e);
  }
  return this.valClass;
}
/**
 * Tells whether the header marked this file's values as compressed.
 *
 * @return {@code true} if values are compressed
 */
public boolean isCompressed() {
  return decompress;
}
/**
 * Tells whether the header marked this file's records as block-compressed.
 *
 * @return {@code true} if records are block-compressed
 */
public boolean isBlockCompressed() {
  return blockCompressed;
}
/**
 * Returns the compression codec used for the data in this file.
 *
 * @return the codec, or {@code null} when the file is not compressed
 */
public CompressionCodec getCompressionCodec() {
  return codec;
}
/**
 * Returns the metadata object of the file.
 *
 * @return the metadata; it holds no header data when the file version
 *     predates the metadata section
 */
public Metadata getMetadata() {
  return metadata;
}
/**
 * Returns the configuration this reader was created with.
 *
 * @return the configuration
 */
Configuration getConf() {
  return conf;
}
/**
 * Reads one length-prefixed compressed buffer from the file into
 * {@code buffer} and resets {@code filter} so that decompression starts
 * afresh on the new data.
 */
private synchronized void readBuffer(final DataInputBuffer buffer,
    final CompressionInputStream filter) throws IOException {
  // Stage the bytes in a temporary buffer first.
  final DataOutputBuffer staging = new DataOutputBuffer();
  try {
    final int byteCount = WritableUtils.readVInt(in);
    staging.write(in, byteCount);
    // Point 'buffer' at the bytes that were just staged.
    final byte[] stagedBytes = staging.getData();
    buffer.reset(stagedBytes, 0, staging.getLength());
  } finally {
    staging.close();
  }
  // Reset the codec
  filter.resetState();
}
/**
* Read the next 'compressed' block: verify its sync marker, read the record
* count, and load the key-length and key buffers. Because lazy decompression
* is always on, the value buffers are left unread in the stream until
* {@link #seekToCurrentValue()} needs them.
*
* @throws IOException if the sync marker does not match or the stream cannot
* be read (an EOFException here is how callers detect end of file)
*/
private synchronized void readBlock() throws IOException {
// Check if we need to throw away a whole block of
// 'values' due to 'lazy decompression'
if (this.lazyDecompress && !this.valuesDecompressed) {
// Two length-prefixed buffers (value lengths, then values) are still
// pending from the previous block; seek past each of them.
this.in.seek(WritableUtils.readVInt(this.in) + this.in.getPos());
this.in.seek(WritableUtils.readVInt(this.in) + this.in.getPos());
}
// Reset internal states
this.noBufferedKeys = 0;
this.noBufferedValues = 0;
this.noBufferedRecords = 0;
this.valuesDecompressed = false;
// Process sync
if (this.sync != null) {
// The int is read and discarded without being checked — presumably the
// sync escape that precedes the marker hash.
this.in.readInt();
this.in.readFully(this.syncCheck); // read syncCheck
if (!Arrays.equals(this.sync, this.syncCheck)) // check it
throw new IOException("File is corrupt!");
}
this.syncSeen = true;
// Read number of records in this block
this.noBufferedRecords = WritableUtils.readVInt(this.in);
// Read key lengths and keys
readBuffer(this.keyLenBuffer, this.keyLenInFilter);
readBuffer(this.keyBuffer, this.keyInFilter);
this.noBufferedKeys = this.noBufferedRecords;
// Read value lengths and values
// (not reached while lazyDecompress is the constant true)
if (!this.lazyDecompress) {
readBuffer(this.valLenBuffer, this.valLenInFilter);
readBuffer(this.valBuffer, this.valInFilter);
this.noBufferedValues = this.noBufferedRecords;
this.valuesDecompressed = true;
}
}
/**
* Position valLenIn/valIn to the 'value' corresponding to the 'current' key.
*
* <p>
* For files that are not block-compressed this rewinds the value buffer to
* its mark (and resets the decompression filter when values are compressed).
* For block-compressed files it loads the block's value buffers on first use
* and then skips the values belonging to keys that were read without their
* values.
*
* @throws IOException if the value buffers cannot be read or fewer bytes than
* expected could be skipped
*/
private synchronized void seekToCurrentValue() throws IOException {
if (!this.blockCompressed) {
if (this.decompress) {
this.valInFilter.resetState();
}
this.valBuffer.reset();
} else {
// Check if this is the first value in the 'block' to be read
if (this.lazyDecompress && !this.valuesDecompressed) {
// Read the value lengths and values
readBuffer(this.valLenBuffer, this.valLenInFilter);
readBuffer(this.valBuffer, this.valInFilter);
this.noBufferedValues = this.noBufferedRecords;
this.valuesDecompressed = true;
}
// Calculate the no. of bytes to skip
// Note: 'current' key has already been read!
int skipValBytes = 0;
// noBufferedKeys was already decremented for the current key, so the
// current value is the one at count noBufferedKeys + 1.
final int currentKey = this.noBufferedKeys + 1;
// Sum the lengths of every buffered value that precedes the current one.
for (int i = this.noBufferedValues; i > currentKey; --i) {
skipValBytes += WritableUtils.readVInt(this.valLenIn);
--this.noBufferedValues;
}
// Skip to the 'val' corresponding to 'current' key
if (skipValBytes > 0) {
if (this.valIn.skipBytes(skipValBytes) != skipValBytes) {
throw new IOException("Failed to seek to " + currentKey
+ "(th) value!");
}
}
}
}
/**
 * Reads the value that belongs to the most recently read key into
 * {@code val}.
 *
 * @param val the object to fill; if it is {@link Configurable} it first
 *     receives this reader's configuration
 * @throws IOException if reading fails or, for files that are not
 *     block-compressed, the value did not consume the whole record
 */
public synchronized void getCurrentValue(final Writable val) throws IOException {
  if (val instanceof Configurable) {
    final Configurable configurable = (Configurable) val;
    configurable.setConf(this.conf);
  }

  // Position stream to 'current' value
  seekToCurrentValue();

  if (this.blockCompressed) {
    final int valLength = WritableUtils.readVInt(this.valLenIn);
    val.readFields(this.valIn);
    // One buffered value has been consumed.
    --this.noBufferedValues;
    // Sanity check
    if (valLength < 0) {
      LOG.debug(val + " is a zero-length value");
    }
    return;
  }

  val.readFields(this.valIn);
  final boolean leftover = this.valIn.read() > 0;
  if (leftover) {
    LOG.info("available bytes: " + this.valIn.available());
    throw new IOException(val + " read "
        + (this.valBuffer.getPosition() - this.keyLength) + " bytes, should read "
        + (this.valBuffer.getLength() - this.keyLength));
  }
}
/**
 * Deserializes the value that belongs to the most recently read key.
 *
 * @param val an object the deserializer may reuse; if it is
 *     {@link Configurable} it first receives this reader's configuration
 * @return the deserialized value, as returned by the value deserializer
 * @throws IOException if reading fails or, for files that are not
 *     block-compressed, the value did not consume the whole record
 */
public synchronized Object getCurrentValue(Object val) throws IOException {
  if (val instanceof Configurable) {
    final Configurable configurable = (Configurable) val;
    configurable.setConf(this.conf);
  }

  // Position stream to 'current' value
  seekToCurrentValue();

  if (this.blockCompressed) {
    final int valLength = WritableUtils.readVInt(this.valLenIn);
    val = deserializeValue(val);
    // One buffered value has been consumed.
    --this.noBufferedValues;
    // Sanity check
    if (valLength < 0) {
      LOG.debug(val + " is a zero-length value");
    }
    return val;
  }

  val = deserializeValue(val);
  final boolean leftover = this.valIn.read() > 0;
  if (leftover) {
    LOG.info("available bytes: " + this.valIn.available());
    throw new IOException(val + " read "
        + (this.valBuffer.getPosition() - this.keyLength) + " bytes, should read "
        + (this.valBuffer.getLength() - this.keyLength));
  }
  return val;
}
/** Runs the value deserializer, letting it reuse {@code val} if it chooses to. */
@SuppressWarnings("unchecked")
private Object deserializeValue(final Object val) throws IOException {
  final Object result = this.valDeserializer.deserialize(val);
  return result;
}
/**
 * Reads the next key in the file into {@code key}, skipping its value.
 *
 * @param key the object to fill; its class must be exactly the file's key class
 * @return {@code true} if another entry exists, {@code false} at end of file
 * @throws IOException if {@code key} has the wrong class, or the key did not
 *     consume exactly the recorded number of bytes
 */
public synchronized boolean next(final Writable key) throws IOException {
  if (key.getClass() != getKeyClass()) {
    throw new IOException("wrong key class: " + key.getClass().getName()
        + " is not " + this.keyClass);
  }

  if (this.blockCompressed) {
    // Reset syncSeen
    this.syncSeen = false;
    if (this.noBufferedKeys == 0) {
      try {
        readBlock();
      } catch (final EOFException eof) {
        return false;
      }
    }
    final int bufferedKeyLength = WritableUtils.readVInt(this.keyLenIn);
    // Sanity check
    if (bufferedKeyLength < 0) {
      return false;
    }
    // Read another compressed 'key'
    key.readFields(this.keyIn);
    --this.noBufferedKeys;
    return true;
  }

  // Load the whole record, then parse the key from its front.
  this.outBuf.reset();
  this.keyLength = next(this.outBuf);
  if (this.keyLength < 0) {
    return false;
  }
  this.valBuffer.reset(this.outBuf.getData(), this.outBuf.getLength());
  key.readFields(this.valBuffer);
  // Remember where the value starts so it can be re-read later.
  this.valBuffer.mark(0);
  if (this.valBuffer.getPosition() != this.keyLength) {
    throw new IOException(key + " read " + this.valBuffer.getPosition()
        + " bytes, should read " + this.keyLength);
  }
  return true;
}
/**
 * Reads the next key/value pair in the file into {@code key} and {@code val}.
 *
 * @param key the object that receives the key
 * @param val the object that receives the value; its class must be exactly
 *     the file's value class
 * @return {@code true} if such a pair exists, {@code false} at end of file
 * @throws IOException if {@code val} has the wrong class or reading fails
 */
public synchronized boolean next(final Writable key, final Writable val)
    throws IOException {
  if (val.getClass() != getValueClass()) {
    throw new IOException("wrong value class: " + val + " is not "
        + this.valClass);
  }
  if (!next(key)) {
    return false;
  }
  getCurrentValue(val);
  return true;
}
/**
 * Reads the length of the next record, stepping over a sync entry if one
 * comes first, and updates the sync-seen flag accordingly.
 *
 * @return the length of the next record, or -1 if there is no next record
 * @throws IOException if a sync entry does not match the file's sync marker
 *     or the stream cannot be read
 */
private synchronized int readRecordLength() throws IOException {
  if (this.in.getPos() >= this.end) {
    return -1;
  }
  final int firstInt = this.in.readInt();
  final boolean atSyncEntry =
      this.version > 1 && this.sync != null && firstInt == SYNC_ESCAPE;
  if (!atSyncEntry) {
    this.syncSeen = false;
    return firstInt;
  }

  // Process a sync entry: verify the marker, then read the real length.
  this.in.readFully(this.syncCheck);
  if (!Arrays.equals(this.sync, this.syncCheck)) {
    throw new IOException("File is corrupt!");
  }
  this.syncSeen = true;
  if (this.in.getPos() >= this.end) {
    return -1;
  }
  return this.in.readInt();
}
/**
 * Reads the next key/value pair in the file into {@code buffer}. The length
 * of the value may be computed by calling {@code buffer.getLength()} before
 * and after calls to this method.
 *
 * <p>On a checksum failure the bad region is skipped (when the configuration
 * allows it) and the read is retried.
 *
 * @param buffer the buffer that receives the record bytes
 * @return the length of the key read, or -1 if at end of file
 * @throws IOException if the file is block-compressed or reading fails
 * @deprecated Call {@link #nextRaw(DataOutputBuffer, ValueBytes)}.
 */
public synchronized int next(final DataOutputBuffer buffer) throws IOException {
  // Unsupported for block-compressed sequence files
  if (this.blockCompressed) {
    throw new IOException(
        "Unsupported call for block-compressed"
            + " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
  }
  try {
    final int recordLen = readRecordLength();
    if (recordLen == -1) {
      return -1;
    }
    final int keyLen = this.in.readInt();
    buffer.write(this.in, recordLen);
    return keyLen;
  } catch (final ChecksumException e) { // checksum failure
    handleChecksumException(e);
    return next(buffer);
  }
}
/**
 * Creates a holder suitable for the raw values of this file: a
 * compressed-bytes holder for record-compressed files, an uncompressed one
 * for files that are uncompressed or block-compressed.
 *
 * @return a new, empty value holder
 */
public ValueBytes createValueBytes() {
  final boolean recordCompressed = this.decompress && !this.blockCompressed;
  if (recordCompressed) {
    return new CompressedBytes(this.codec);
  }
  return new UncompressedBytes();
}
/**
 * Reads the next record in 'raw' form.
 *
 * @param key the buffer into which the key bytes are written
 * @param val the 'raw' value holder; must be of the kind returned by
 *     {@link #createValueBytes()}
 * @return the total record length, or -1 for end of file
 * @throws IOException if reading fails or a negative key length is found
 */
public synchronized int nextRaw(final DataOutputBuffer key, final ValueBytes val)
    throws IOException {
  if (this.blockCompressed) {
    // Reset syncSeen
    this.syncSeen = false;

    // Read 'key', loading a new block when the current one is used up.
    if (this.noBufferedKeys == 0) {
      if (this.in.getPos() >= this.end) {
        return -1;
      }
      try {
        readBlock();
      } catch (final EOFException eof) {
        return -1;
      }
    }
    final int rawKeyLength = WritableUtils.readVInt(this.keyLenIn);
    if (rawKeyLength < 0) {
      throw new IOException("zero length key found!");
    }
    key.write(this.keyIn, rawKeyLength);
    --this.noBufferedKeys;

    // Read raw 'value'
    seekToCurrentValue();
    final int rawValLength = WritableUtils.readVInt(this.valLenIn);
    final UncompressedBytes rawValue = (UncompressedBytes) val;
    rawValue.reset(this.valIn, rawValLength);
    --this.noBufferedValues;
    return rawKeyLength + rawValLength;
  }

  final int recordLen = readRecordLength();
  if (recordLen == -1) {
    return -1;
  }
  final int keyLen = this.in.readInt();
  final int valLen = recordLen - keyLen;
  key.write(this.in, keyLen);
  if (this.decompress) {
    final CompressedBytes compressed = (CompressedBytes) val;
    compressed.reset(this.in, valLen);
  } else {
    final UncompressedBytes plain = (UncompressedBytes) val;
    plain.reset(this.in, valLen);
  }
  return recordLen;
}
/**
 * Read 'raw' keys.
 *
 * <p>This method is synchronized like {@link #nextRaw} and
 * {@link #nextRawValue}: it updates the same reader state (record and key
 * lengths, block counters, the sync-seen flag) and may load a new block, so
 * it must not interleave with the other read methods.
 *
 * @param key - The buffer into which the key is read
 * @return Returns the key length or -1 for end of file
 * @throws IOException if reading fails or a negative key length is found
 */
public synchronized int nextRawKey(final DataOutputBuffer key) throws IOException {
  if (!this.blockCompressed) {
    // Remember both lengths; nextRawValue derives the value length from them.
    this.recordLength = readRecordLength();
    if (this.recordLength == -1) {
      return -1;
    }
    this.keyLength = this.in.readInt();
    key.write(this.in, this.keyLength);
    return this.keyLength;
  } else {
    // Reset syncSeen
    this.syncSeen = false;
    // Read 'key', loading a new block when the current one is used up.
    if (this.noBufferedKeys == 0) {
      if (this.in.getPos() >= this.end) {
        return -1;
      }
      try {
        readBlock();
      } catch (final EOFException eof) {
        return -1;
      }
    }
    final int keyLength = WritableUtils.readVInt(this.keyLenIn);
    if (keyLength < 0) {
      throw new IOException("zero length key found!");
    }
    key.write(this.keyIn, keyLength);
    --this.noBufferedKeys;
    return keyLength;
  }
}
/**
 * Reads the next key in the file through the key deserializer, skipping its
 * value.
 *
 * @param key an object the deserializer may reuse, or {@code null}; when
 *     non-null its class must be exactly the file's key class
 * @return the deserialized key, or {@code null} at end of file
 * @throws IOException if {@code key} has the wrong class, or the key did not
 *     consume exactly the recorded number of bytes
 */
public synchronized Object next(Object key) throws IOException {
  final boolean wrongClass = key != null && key.getClass() != getKeyClass();
  if (wrongClass) {
    throw new IOException("wrong key class: " + key.getClass().getName()
        + " is not " + this.keyClass);
  }

  if (this.blockCompressed) {
    // Reset syncSeen
    this.syncSeen = false;
    if (this.noBufferedKeys == 0) {
      try {
        readBlock();
      } catch (final EOFException eof) {
        return null;
      }
    }
    final int bufferedKeyLength = WritableUtils.readVInt(this.keyLenIn);
    // Sanity check
    if (bufferedKeyLength < 0) {
      return null;
    }
    // Read another compressed 'key'
    key = deserializeKey(key);
    --this.noBufferedKeys;
    return key;
  }

  // Load the whole record, then deserialize the key from its front.
  this.outBuf.reset();
  this.keyLength = next(this.outBuf);
  if (this.keyLength < 0) {
    return null;
  }
  this.valBuffer.reset(this.outBuf.getData(), this.outBuf.getLength());
  key = deserializeKey(key);
  // Remember where the value starts so it can be re-read later.
  this.valBuffer.mark(0);
  if (this.valBuffer.getPosition() != this.keyLength) {
    throw new IOException(key + " read " + this.valBuffer.getPosition()
        + " bytes, should read " + this.keyLength);
  }
  return key;
}
/** Runs the key deserializer, letting it reuse {@code key} if it chooses to. */
@SuppressWarnings("unchecked")
private Object deserializeKey(final Object key) throws IOException {
  final Object result = this.keyDeserializer.deserialize(key);
  return result;
}
/**
 * Reads the 'raw' value that belongs to the key most recently read with
 * {@link #nextRawKey}.
 *
 * @param val the 'raw' value holder; must be of the kind returned by
 *     {@link #createValueBytes()}
 * @return the value length
 * @throws IOException if reading fails
 */
public synchronized int nextRawValue(final ValueBytes val) throws IOException {
  // Position stream to current value
  seekToCurrentValue();

  if (this.blockCompressed) {
    final int bufferedValLength = WritableUtils.readVInt(this.valLenIn);
    final UncompressedBytes rawValue = (UncompressedBytes) val;
    rawValue.reset(this.valIn, bufferedValLength);
    --this.noBufferedValues;
    return bufferedValLength;
  }

  // Whatever follows the key in the record is the value.
  final int valLen = this.recordLength - this.keyLength;
  if (this.decompress) {
    final CompressedBytes compressed = (CompressedBytes) val;
    compressed.reset(this.in, valLen);
  } else {
    final UncompressedBytes plain = (UncompressedBytes) val;
    plain.reset(this.in, valLen);
  }
  return valLen;
}
/**
 * Reacts to a checksum failure. When {@code io.skip.checksum.errors} is set,
 * logs a warning and moves to the next sync mark past the bad region (sized
 * by {@code io.bytes.per.checksum}, default 512); otherwise rethrows
 * {@code e}.
 */
private void handleChecksumException(final ChecksumException e)
    throws IOException {
  final boolean skipErrors = this.conf.getBoolean("io.skip.checksum.errors", false);
  if (!skipErrors) {
    throw e;
  }
  LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
  sync(getPosition() + this.conf.getInt("io.bytes.per.checksum", 512));
}
/**
 * Sets the current byte position in the input file.
 *
 * <p>The position is expected to be a record boundary; to move to an
 * arbitrary position use {@link Reader#sync(long)} instead.
 *
 * <p>For block-compressed files the buffered keys are discarded so that the
 * next read loads a fresh block, and the values are marked as already
 * decompressed so that the block read does not try to skip pending value
 * buffers at the new position.
 */
public synchronized void seek(final long position) throws IOException {
  this.in.seek(position);
  if (!this.blockCompressed) {
    return;
  }
  // trigger block read
  this.noBufferedKeys = 0;
  this.valuesDecompressed = true;
}
/**
* Seek to the next sync mark past a given position. If no complete sync entry
* can start between the position and the end of the readable range, the
* reader is moved to the end instead.
*
* @param position byte offset from which to start searching
* @throws IOException if the stream cannot be read, or a checksum failure
* occurs and skipping checksum errors is not enabled
*/
public synchronized void sync(final long position) throws IOException {
if (position + SYNC_SIZE >= this.end) {
seek(this.end);
return;
}
try {
seek(position + 4); // skip escape
this.in.readFully(this.syncCheck);
final int syncLen = this.sync.length;
// syncCheck is used as a circular buffer over the most recent syncLen
// bytes; at step i the oldest byte is at index i % syncLen.
for (int i = 0; this.in.getPos() < this.end; i++) {
int j = 0;
for (; j < syncLen; j++) {
if (this.sync[j] != this.syncCheck[(i + j) % syncLen])
break;
}
if (j == syncLen) {
this.in.seek(this.in.getPos() - SYNC_SIZE); // position before sync
return;
}
// No match: overwrite the oldest byte with the next byte of the stream.
this.syncCheck[i % syncLen] = this.in.readByte();
}
} catch (final ChecksumException e) { // checksum failure
handleChecksumException(e);
}
}
/**
 * Tells whether the previous call to next passed a sync mark.
 *
 * @return {@code true} iff a sync mark was passed
 */
public boolean syncSeen() {
  return syncSeen;
}
/**
 * Returns the current byte position in the input file.
 *
 * @return the position reported by the underlying stream
 * @throws IOException if the stream cannot report its position
 */
public synchronized long getPosition() throws IOException {
  final long position = this.in.getPos();
  return position;
}
/**
 * Returns the name of the file.
 *
 * @return the string form of the path this reader was opened on
 */
@Override
public String toString() {
  final String name = this.file.toString();
  return name;
}
}
} // SequenceFile