azkaban-aplcache
az-hdfs-viewer/build.gradle 51(+51 -0)
diff --git a/az-hdfs-viewer/build.gradle b/az-hdfs-viewer/build.gradle
new file mode 100644
index 0000000..359dbc8
--- /dev/null
+++ b/az-hdfs-viewer/build.gradle
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+apply plugin: 'distribution'
+
+dependencies {
+ compileOnly project(':az-core')
+ compileOnly project(":azkaban-common")
+ compileOnly project(":azkaban-web-server")
+ compileOnly project(":azkaban-hadoop-security-plugin")
+
+ compile deps.mongoDriver
+ compile deps.parquetAvro
+ compile deps.parquetBundle
+ compileOnly deps.hadoopCommon
+ compileOnly deps.hadoopMRClientCommon
+ compileOnly deps.hadoopMRClientCore
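+ // The Hadoop (and, below, Hive) artifacts are compileOnly: they are excluded
+ // from the bundled lib/ directory built by the distribution block and are
+ // presumably supplied by the runtime classpath of the deployment.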
+
+ testCompile deps.hadoopCommon
+ compileOnly deps.hiveMetastore
+ compileOnly(deps.hiveExecCore) {
+ exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
+ exclude group: 'eigenbase', module: 'eigenbase-properties'
+ }
+}
+
+distributions {
+ main {
+ contents {
+ from(jar) {
+ into 'lib'
+ }
+ from(configurations.runtime) {
+ into 'lib'
+ }
+ }
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AvroFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AvroFileViewer.java
new file mode 100644
index 0000000..7b63c69
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AvroFileViewer.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+/**
+ * This class implements a viewer for Avro files.
+ *
+ * @author lguo
+ */
+public class AvroFileViewer extends HdfsFileViewer {
+
+ private static Logger logger = Logger.getLogger(AvroFileViewer.class);
+ // Spend at most this many milliseconds pulling records, then stop.
+ private static final long STOP_TIME = 2000L;
+
+ private static final String VIEWER_NAME = "Avro";
+
+ @Override
+ public String getName() {
+ return VIEWER_NAME;
+ }
+
+ @Override
+ public Set<Capability> getCapabilities(FileSystem fs, Path path)
+ throws AccessControlException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("path:" + path.toUri().getPath());
+ }
+
+ DataFileStream<Object> avroDataStream = null;
+ try {
+ avroDataStream = getAvroDataStream(fs, path);
+ Schema schema = avroDataStream.getSchema();
+ return (schema != null) ? EnumSet.of(Capability.READ, Capability.SCHEMA)
+ : EnumSet.noneOf(Capability.class);
+ } catch (AccessControlException e) {
+ throw e;
+ } catch (IOException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(path.toUri().getPath() + " is not an avro file.");
+ logger.debug("Error in getting avro schema: ", e);
+ }
+ return EnumSet.noneOf(Capability.class);
+ } finally {
+ try {
+ if (avroDataStream != null) {
+ avroDataStream.close();
+ }
+ } catch (IOException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ @Override
+ public String getSchema(FileSystem fs, Path path) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("path:" + path.toUri().getPath());
+ }
+
+ DataFileStream<Object> avroDataStream = null;
+ try {
+ avroDataStream = getAvroDataStream(fs, path);
+ Schema schema = avroDataStream.getSchema();
+ return schema.toString(true);
+ } catch (IOException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(path.toUri().getPath() + " is not an avro file.");
+ logger.debug("Error in getting avro schema: ", e);
+ }
+ return null;
+ } finally {
+ try {
+ if (avroDataStream != null) {
+ avroDataStream.close();
+ }
+ } catch (IOException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ private DataFileStream<Object> getAvroDataStream(FileSystem fs, Path path)
+ throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("path:" + path.toUri().getPath());
+ }
+
+ GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
+ InputStream hdfsInputStream = null;
+ try {
+ hdfsInputStream = fs.open(path);
+ } catch (IOException e) {
+ if (hdfsInputStream != null) {
+ hdfsInputStream.close();
+ }
+ throw e;
+ }
+
+ DataFileStream<Object> avroDataFileStream = null;
+ try {
+ avroDataFileStream =
+ new DataFileStream<Object>(hdfsInputStream, avroReader);
+ } catch (IOException e) {
+ if (hdfsInputStream != null) {
+ hdfsInputStream.close();
+ }
+ throw e;
+ }
+
+ return avroDataFileStream;
+ }
+
+ @Override
+ public void displayFile(FileSystem fs, Path path, OutputStream outputStream,
+ int startLine, int endLine) throws IOException {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("display avro file:" + path.toUri().getPath());
+ }
+
+ DataFileStream<Object> avroDatastream = null;
+ JsonGenerator g = null;
+
+ try {
+ avroDatastream = getAvroDataStream(fs, path);
+ Schema schema = avroDatastream.getSchema();
+ DatumWriter<Object> avroWriter = new GenericDatumWriter<Object>(schema);
+
+ g = new JsonFactory().createJsonGenerator(
+ outputStream, JsonEncoding.UTF8);
+ g.useDefaultPrettyPrinter();
+ Encoder encoder = EncoderFactory.get().jsonEncoder(schema, g);
+
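+ // Each record is re-encoded as pretty-printed JSON through the shared
+ // JsonGenerator, interleaved with the plain-text "Record N" headers that
+ // are written straight to the output stream in the loop below.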
+ long endTime = System.currentTimeMillis() + STOP_TIME;
+ int lineno = 1; // line number starts from 1
+ while (avroDatastream.hasNext() && lineno <= endLine
+ && System.currentTimeMillis() <= endTime) {
+ Object datum = avroDatastream.next();
+ if (lineno >= startLine) {
+ String record = "\n\n Record " + lineno + ":\n";
+ outputStream.write(record.getBytes("UTF-8"));
+ avroWriter.write(datum, encoder);
+ encoder.flush();
+ }
+ lineno++;
+ }
+ } catch (IOException e) {
+ outputStream.write(("Error in display avro file: " + e
+ .getLocalizedMessage()).getBytes("UTF-8"));
+ throw e;
+ } finally {
+ if (g != null) {
+ g.close();
+ }
+ if (avroDatastream != null) {
+ avroDatastream.close();
+ }
+ }
+ }
+
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AzkabanSequenceFileReader.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AzkabanSequenceFileReader.java
new file mode 100644
index 0000000..c1bf7a7
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AzkabanSequenceFileReader.java
@@ -0,0 +1,1075 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableName;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Our forked version of Hadoop's SequenceFile reader. The fork exists so
+ * that reading sequence files does not keep connections open and tie up
+ * ports.
+ */
+@SuppressWarnings("deprecation")
+public class AzkabanSequenceFileReader {
+ private static final Logger LOG = Logger
+ .getLogger(AzkabanSequenceFileReader.class);
+ private static final byte BLOCK_COMPRESS_VERSION = (byte) 4;
+ private static final byte CUSTOM_COMPRESS_VERSION = (byte) 5;
+ private static final byte VERSION_WITH_METADATA = (byte) 6;
+ private static final int SYNC_ESCAPE = -1; // "length" of sync entries
+ private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
+ private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
+ /** The number of bytes between sync points. */
+ public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
+ private static final byte[] VERSION = new byte[]{(byte) 'S', (byte) 'E',
+ (byte) 'Q', VERSION_WITH_METADATA};
+
+ private AzkabanSequenceFileReader() {
+ } // no public ctor
+
+ /**
+ * The compression type used to compress key/value pairs in the
+ * {@link AzkabanSequenceFileReader}.
+ *
+ * @see org.apache.hadoop.io.SequenceFile.Writer
+ */
+ public static enum CompressionType {
+ /** Do not compress records. */
+ NONE,
+ /** Compress values only, each separately. */
+ RECORD,
+ /** Compress sequences of records together in blocks. */
+ BLOCK
+ }
+
+ private static class UncompressedBytes implements ValueBytes {
+ private int dataSize;
+ private byte[] data;
+
+ private UncompressedBytes() {
+ this.data = null;
+ this.dataSize = 0;
+ }
+
+ private void reset(final DataInputStream in, final int length) throws IOException {
+ this.data = new byte[length];
+ this.dataSize = -1;
+
+ in.readFully(this.data);
+ this.dataSize = this.data.length;
+ }
+
+ @Override
+ public int getSize() {
+ return this.dataSize;
+ }
+
+ @Override
+ public void writeUncompressedBytes(final DataOutputStream outStream)
+ throws IOException {
+ outStream.write(this.data, 0, this.dataSize);
+ }
+
+ @Override
+ public void writeCompressedBytes(final DataOutputStream outStream)
+ throws IllegalArgumentException, IOException {
+ throw new IllegalArgumentException(
+ "UncompressedBytes cannot be compressed!");
+ }
+
+ } // UncompressedBytes
+
+ private static class CompressedBytes implements ValueBytes {
+ DataInputBuffer rawData = null;
+ CompressionCodec codec = null;
+ CompressionInputStream decompressedStream = null;
+ private int dataSize;
+ private byte[] data;
+
+ private CompressedBytes(final CompressionCodec codec) {
+ this.data = null;
+ this.dataSize = 0;
+ this.codec = codec;
+ }
+
+ private void reset(final DataInputStream in, final int length) throws IOException {
+ this.data = new byte[length];
+ this.dataSize = -1;
+
+ in.readFully(this.data);
+ this.dataSize = this.data.length;
+ }
+
+ @Override
+ public int getSize() {
+ return this.dataSize;
+ }
+
+ @Override
+ public void writeUncompressedBytes(final DataOutputStream outStream)
+ throws IOException {
+ if (this.decompressedStream == null) {
+ this.rawData = new DataInputBuffer();
+ this.decompressedStream = this.codec.createInputStream(this.rawData);
+ } else {
+ this.decompressedStream.resetState();
+ }
+ this.rawData.reset(this.data, 0, this.dataSize);
+
+ final byte[] buffer = new byte[8192];
+ int bytesRead = 0;
+ while ((bytesRead = this.decompressedStream.read(buffer, 0, 8192)) != -1) {
+ outStream.write(buffer, 0, bytesRead);
+ }
+ }
+
+ @Override
+ public void writeCompressedBytes(final DataOutputStream outStream)
+ throws IllegalArgumentException, IOException {
+ outStream.write(this.data, 0, this.dataSize);
+ }
+
+ } // CompressedBytes
+
+ /** Reads key/value pairs from a sequence-format file. */
+ public static class Reader implements Closeable {
+
+ private final Path file;
+ private final DataOutputBuffer outBuf = new DataOutputBuffer();
+ private final byte[] sync = new byte[SYNC_HASH_SIZE];
+ private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+ private final long end;
+ private final Configuration conf;
+ private final boolean lazyDecompress = true;
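+ // Lazy decompression: for block-compressed files the value block is only
+ // decompressed once a value is actually requested, so key-only scans skip
+ // that work (see readBlock() and seekToCurrentValue()).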
+ private FSDataInputStream in;
+ private byte version;
+ private String keyClassName;
+ private String valClassName;
+ @SuppressWarnings("rawtypes")
+ private Class keyClass;
+ @SuppressWarnings("rawtypes")
+ private Class valClass;
+ private CompressionCodec codec = null;
+ private Metadata metadata = null;
+ private boolean syncSeen;
+ private int keyLength;
+ private int recordLength;
+ private boolean decompress;
+ private boolean blockCompressed;
+ private int noBufferedRecords = 0;
+ private boolean valuesDecompressed = true;
+
+ private int noBufferedKeys = 0;
+ private int noBufferedValues = 0;
+
+ private DataInputBuffer keyLenBuffer = null;
+ private CompressionInputStream keyLenInFilter = null;
+ private DataInputStream keyLenIn = null;
+ private Decompressor keyLenDecompressor = null;
+ private DataInputBuffer keyBuffer = null;
+ private CompressionInputStream keyInFilter = null;
+ private DataInputStream keyIn = null;
+ private Decompressor keyDecompressor = null;
+
+ private DataInputBuffer valLenBuffer = null;
+ private CompressionInputStream valLenInFilter = null;
+ private DataInputStream valLenIn = null;
+ private Decompressor valLenDecompressor = null;
+ private DataInputBuffer valBuffer = null;
+ private CompressionInputStream valInFilter = null;
+ private DataInputStream valIn = null;
+ private Decompressor valDecompressor = null;
+
+ @SuppressWarnings("rawtypes")
+ private Deserializer keyDeserializer;
+ @SuppressWarnings("rawtypes")
+ private Deserializer valDeserializer;
+
+ /** Open the named file. */
+ public Reader(final FileSystem fs, final Path file, final Configuration conf)
+ throws IOException {
+ this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
+ }
+
+ private Reader(final FileSystem fs, final Path file, final int bufferSize,
+ final Configuration conf, final boolean tempReader) throws IOException {
+ this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
+ }
+
+ private Reader(final FileSystem fs, final Path file, final int bufferSize, final long start,
+ final long length, final Configuration conf, final boolean tempReader) throws IOException {
+ this.file = file;
+
+ try {
+ this.in = openFile(fs, file, bufferSize, length);
+ this.conf = conf;
+ seek(start);
+ this.end = this.in.getPos() + length;
+ init(tempReader);
+ } catch (final IOException e) {
+ if (this.in != null) {
+ this.in.close();
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Override this method to specialize the type of {@link FSDataInputStream}
+ * returned.
+ */
+ protected FSDataInputStream openFile(final FileSystem fs, final Path file,
+ final int bufferSize, final long length) throws IOException {
+ return fs.open(file, bufferSize);
+ }
+
+ /**
+ * Initialize the {@link Reader}
+ *
+ * @param tempReader <code>true</code> if we are constructing a temporary
+ * reader (for example, while cloning file attributes) and hence do not
+ * initialize every component; <code>false</code> otherwise.
+ * @throws IOException
+ */
+ private void init(final boolean tempReader) throws IOException {
+ final byte[] versionBlock = new byte[VERSION.length];
+ this.in.readFully(versionBlock);
+
+ if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1])
+ || (versionBlock[2] != VERSION[2])) {
+ throw new IOException(this.file + " not a SequenceFile");
+ }
+
+ // Set 'version'
+ this.version = versionBlock[3];
+ if (this.version > VERSION[3]) {
+ throw new VersionMismatchException(VERSION[3], this.version);
+ }
+
+ if (this.version < BLOCK_COMPRESS_VERSION) {
+ final UTF8 className = new UTF8();
+
+ className.readFields(this.in);
+ this.keyClassName = className.toString(); // key class name
+
+ className.readFields(this.in);
+ this.valClassName = className.toString(); // val class name
+ } else {
+ this.keyClassName = Text.readString(this.in);
+ this.valClassName = Text.readString(this.in);
+ }
+
+ if (this.version > 2) { // if version > 2
+ this.decompress = this.in.readBoolean(); // is compressed?
+ } else {
+ this.decompress = false;
+ }
+
+ if (this.version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
+ this.blockCompressed = this.in.readBoolean(); // is block-compressed?
+ } else {
+ this.blockCompressed = false;
+ }
+
+ // if version >= 5
+ // setup the compression codec
+ if (this.decompress) {
+ if (this.version >= CUSTOM_COMPRESS_VERSION) {
+ final String codecClassname = Text.readString(this.in);
+ try {
+ final Class<? extends CompressionCodec> codecClass =
+ this.conf.getClassByName(codecClassname).asSubclass(
+ CompressionCodec.class);
+ this.codec = ReflectionUtils.newInstance(codecClass, this.conf);
+ } catch (final ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException("Unknown codec: "
+ + codecClassname, cnfe);
+ }
+ } else {
+ this.codec = new DefaultCodec();
+ ((Configurable) this.codec).setConf(this.conf);
+ }
+ }
+
+ this.metadata = new Metadata();
+ if (this.version >= VERSION_WITH_METADATA) { // if version >= 6
+ this.metadata.readFields(this.in);
+ }
+
+ if (this.version > 1) { // if version > 1
+ this.in.readFully(this.sync); // read sync bytes
+ }
+
+ // Initialize... *not* if we are constructing a temporary Reader
+ if (!tempReader) {
+ this.valBuffer = new DataInputBuffer();
+ if (this.decompress) {
+ this.valDecompressor = CodecPool.getDecompressor(this.codec);
+ this.valInFilter = this.codec.createInputStream(this.valBuffer, this.valDecompressor);
+ this.valIn = new DataInputStream(this.valInFilter);
+ } else {
+ this.valIn = this.valBuffer;
+ }
+
+ if (this.blockCompressed) {
+ this.keyLenBuffer = new DataInputBuffer();
+ this.keyBuffer = new DataInputBuffer();
+ this.valLenBuffer = new DataInputBuffer();
+
+ this.keyLenDecompressor = CodecPool.getDecompressor(this.codec);
+ this.keyLenInFilter =
+ this.codec.createInputStream(this.keyLenBuffer, this.keyLenDecompressor);
+ this.keyLenIn = new DataInputStream(this.keyLenInFilter);
+
+ this.keyDecompressor = CodecPool.getDecompressor(this.codec);
+ this.keyInFilter = this.codec.createInputStream(this.keyBuffer, this.keyDecompressor);
+ this.keyIn = new DataInputStream(this.keyInFilter);
+
+ this.valLenDecompressor = CodecPool.getDecompressor(this.codec);
+ this.valLenInFilter =
+ this.codec.createInputStream(this.valLenBuffer, this.valLenDecompressor);
+ this.valLenIn = new DataInputStream(this.valLenInFilter);
+ }
+
+ final SerializationFactory serializationFactory =
+ new SerializationFactory(this.conf);
+ this.keyDeserializer =
+ getDeserializer(serializationFactory, getKeyClass());
+ if (!this.blockCompressed) {
+ this.keyDeserializer.open(this.valBuffer);
+ } else {
+ this.keyDeserializer.open(this.keyIn);
+ }
+ this.valDeserializer =
+ getDeserializer(serializationFactory, getValueClass());
+ this.valDeserializer.open(this.valIn);
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private Deserializer getDeserializer(final SerializationFactory sf, final Class c) {
+ return sf.getDeserializer(c);
+ }
+
+ /** Close the file. */
+ @Override
+ public synchronized void close() throws IOException {
+ // Return the decompressors to the pool
+ CodecPool.returnDecompressor(this.keyLenDecompressor);
+ CodecPool.returnDecompressor(this.keyDecompressor);
+ CodecPool.returnDecompressor(this.valLenDecompressor);
+ CodecPool.returnDecompressor(this.valDecompressor);
+ this.keyLenDecompressor = this.keyDecompressor = null;
+ this.valLenDecompressor = this.valDecompressor = null;
+
+ if (this.keyDeserializer != null) {
+ this.keyDeserializer.close();
+ }
+ if (this.valDeserializer != null) {
+ this.valDeserializer.close();
+ }
+
+ // Close the input-stream
+ if (this.in != null) {
+ this.in.close();
+ }
+ }
+
+ /** Returns the name of the key class. */
+ public String getKeyClassName() {
+ return this.keyClassName;
+ }
+
+ /** Returns the class of keys in this file. */
+ public synchronized Class<?> getKeyClass() {
+ if (null == this.keyClass) {
+ try {
+ this.keyClass = WritableName.getClass(getKeyClassName(), this.conf);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return this.keyClass;
+ }
+
+ /** Returns the name of the value class. */
+ public String getValueClassName() {
+ return this.valClassName;
+ }
+
+ /** Returns the class of values in this file. */
+ public synchronized Class<?> getValueClass() {
+ if (null == this.valClass) {
+ try {
+ this.valClass = WritableName.getClass(getValueClassName(), this.conf);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return this.valClass;
+ }
+
+ /** Returns true if values are compressed. */
+ public boolean isCompressed() {
+ return this.decompress;
+ }
+
+ /** Returns true if records are block-compressed. */
+ public boolean isBlockCompressed() {
+ return this.blockCompressed;
+ }
+
+ /** Returns the compression codec of data in this file. */
+ public CompressionCodec getCompressionCodec() {
+ return this.codec;
+ }
+
+ /** Returns the metadata object of the file */
+ public Metadata getMetadata() {
+ return this.metadata;
+ }
+
+ /** Returns the configuration used for this file. */
+ Configuration getConf() {
+ return this.conf;
+ }
+
+ /** Read a compressed buffer */
+ private synchronized void readBuffer(final DataInputBuffer buffer,
+ final CompressionInputStream filter) throws IOException {
+ // Read data into a temporary buffer
+ final DataOutputBuffer dataBuffer = new DataOutputBuffer();
+
+ try {
+ final int dataBufferLength = WritableUtils.readVInt(this.in);
+ dataBuffer.write(this.in, dataBufferLength);
+
+ // Set up 'buffer' connected to the input-stream
+ buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
+ } finally {
+ dataBuffer.close();
+ }
+
+ // Reset the codec
+ filter.resetState();
+ }
+
+ /** Read the next 'compressed' block */
+ private synchronized void readBlock() throws IOException {
+ // Check if we need to throw away a whole block of
+ // 'values' due to 'lazy decompression'
+ if (this.lazyDecompress && !this.valuesDecompressed) {
+ this.in.seek(WritableUtils.readVInt(this.in) + this.in.getPos());
+ this.in.seek(WritableUtils.readVInt(this.in) + this.in.getPos());
+ }
+
+ // Reset internal states
+ this.noBufferedKeys = 0;
+ this.noBufferedValues = 0;
+ this.noBufferedRecords = 0;
+ this.valuesDecompressed = false;
+
+ // Process sync
+ if (this.sync != null) {
+ this.in.readInt();
+ this.in.readFully(this.syncCheck); // read syncCheck
+ if (!Arrays.equals(this.sync, this.syncCheck)) // check it
+ throw new IOException("File is corrupt!");
+ }
+ this.syncSeen = true;
+
+ // Read number of records in this block
+ this.noBufferedRecords = WritableUtils.readVInt(this.in);
+
+ // Read key lengths and keys
+ readBuffer(this.keyLenBuffer, this.keyLenInFilter);
+ readBuffer(this.keyBuffer, this.keyInFilter);
+ this.noBufferedKeys = this.noBufferedRecords;
+
+ // Read value lengths and values
+ if (!this.lazyDecompress) {
+ readBuffer(this.valLenBuffer, this.valLenInFilter);
+ readBuffer(this.valBuffer, this.valInFilter);
+ this.noBufferedValues = this.noBufferedRecords;
+ this.valuesDecompressed = true;
+ }
+ }
+
+ /**
+ * Position valLenIn/valIn to the 'value' corresponding to the 'current' key
+ */
+ private synchronized void seekToCurrentValue() throws IOException {
+ if (!this.blockCompressed) {
+ if (this.decompress) {
+ this.valInFilter.resetState();
+ }
+ this.valBuffer.reset();
+ } else {
+ // Check if this is the first value in the 'block' to be read
+ if (this.lazyDecompress && !this.valuesDecompressed) {
+ // Read the value lengths and values
+ readBuffer(this.valLenBuffer, this.valLenInFilter);
+ readBuffer(this.valBuffer, this.valInFilter);
+ this.noBufferedValues = this.noBufferedRecords;
+ this.valuesDecompressed = true;
+ }
+
+ // Calculate the no. of bytes to skip
+ // Note: 'current' key has already been read!
+ int skipValBytes = 0;
+ final int currentKey = this.noBufferedKeys + 1;
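+ // The value cursor may lag behind the key cursor, so add up (and later
+ // skip) the lengths of all values whose keys were read without their
+ // values being fetched, until the cursors line up on the current record.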
+ for (int i = this.noBufferedValues; i > currentKey; --i) {
+ skipValBytes += WritableUtils.readVInt(this.valLenIn);
+ --this.noBufferedValues;
+ }
+
+ // Skip to the 'val' corresponding to 'current' key
+ if (skipValBytes > 0) {
+ if (this.valIn.skipBytes(skipValBytes) != skipValBytes) {
+ throw new IOException("Failed to seek to " + currentKey
+ + "(th) value!");
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the 'value' corresponding to the last read 'key'.
+ *
+ * @param val : The 'value' to be read.
+ * @throws IOException
+ */
+ public synchronized void getCurrentValue(final Writable val) throws IOException {
+ if (val instanceof Configurable) {
+ ((Configurable) val).setConf(this.conf);
+ }
+
+ // Position stream to 'current' value
+ seekToCurrentValue();
+
+ if (!this.blockCompressed) {
+ val.readFields(this.valIn);
+
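+ // After readFields the buffered value should be fully consumed; leftover
+ // bytes mean the value class read fewer bytes than were written, and an
+ // IOException with the byte counts is thrown.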
+ if (this.valIn.read() > 0) {
+ LOG.info("available bytes: " + this.valIn.available());
+ throw new IOException(val + " read "
+ + (this.valBuffer.getPosition() - this.keyLength) + " bytes, should read "
+ + (this.valBuffer.getLength() - this.keyLength));
+ }
+ } else {
+ // Get the value
+ final int valLength = WritableUtils.readVInt(this.valLenIn);
+ val.readFields(this.valIn);
+
+ // Read another compressed 'value'
+ --this.noBufferedValues;
+
+ // Sanity check
+ if (valLength < 0) {
+ LOG.debug(val + " is a zero-length value");
+ }
+ }
+
+ }
+
+ /**
+ * Get the 'value' corresponding to the last read 'key'.
+ *
+ * @param val : The 'value' to be read.
+ * @throws IOException
+ */
+ public synchronized Object getCurrentValue(Object val) throws IOException {
+ if (val instanceof Configurable) {
+ ((Configurable) val).setConf(this.conf);
+ }
+
+ // Position stream to 'current' value
+ seekToCurrentValue();
+
+ if (!this.blockCompressed) {
+ val = deserializeValue(val);
+
+ if (this.valIn.read() > 0) {
+ LOG.info("available bytes: " + this.valIn.available());
+ throw new IOException(val + " read "
+ + (this.valBuffer.getPosition() - this.keyLength) + " bytes, should read "
+ + (this.valBuffer.getLength() - this.keyLength));
+ }
+ } else {
+ // Get the value
+ final int valLength = WritableUtils.readVInt(this.valLenIn);
+ val = deserializeValue(val);
+
+ // Read another compressed 'value'
+ --this.noBufferedValues;
+
+ // Sanity check
+ if (valLength < 0) {
+ LOG.debug(val + " is a zero-length value");
+ }
+ }
+ return val;
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object deserializeValue(final Object val) throws IOException {
+ return this.valDeserializer.deserialize(val);
+ }
+
+ /**
+ * Read the next key in the file into <code>key</code>, skipping its value.
+ * Returns true if another entry exists, and false at end of file.
+ */
+ public synchronized boolean next(final Writable key) throws IOException {
+ if (key.getClass() != getKeyClass())
+ throw new IOException("wrong key class: " + key.getClass().getName()
+ + " is not " + this.keyClass);
+
+ if (!this.blockCompressed) {
+ this.outBuf.reset();
+
+ this.keyLength = next(this.outBuf);
+ if (this.keyLength < 0)
+ return false;
+
+ this.valBuffer.reset(this.outBuf.getData(), this.outBuf.getLength());
+
+ key.readFields(this.valBuffer);
+ this.valBuffer.mark(0);
+ if (this.valBuffer.getPosition() != this.keyLength) {
+ throw new IOException(key + " read " + this.valBuffer.getPosition()
+ + " bytes, should read " + this.keyLength);
+ }
+ } else {
+ // Reset syncSeen
+ this.syncSeen = false;
+
+ if (this.noBufferedKeys == 0) {
+ try {
+ readBlock();
+ } catch (final EOFException eof) {
+ return false;
+ }
+ }
+
+ final int keyLength = WritableUtils.readVInt(this.keyLenIn);
+
+ // Sanity check
+ if (keyLength < 0) {
+ return false;
+ }
+
+ // Read another compressed 'key'
+ key.readFields(this.keyIn);
+ --this.noBufferedKeys;
+ }
+
+ return true;
+ }
+
+ /**
+ * Read the next key/value pair in the file into <code>key</code> and
+ * <code>val</code>. Returns true if such a pair exists and false when at
+ * end of file.
+ */
+ public synchronized boolean next(final Writable key, final Writable val)
+ throws IOException {
+ if (val.getClass() != getValueClass())
+ throw new IOException("wrong value class: " + val + " is not "
+ + this.valClass);
+
+ final boolean more = next(key);
+
+ if (more) {
+ getCurrentValue(val);
+ }
+
+ return more;
+ }
+
+ /**
+ * Read and return the next record length, potentially skipping over a sync
+ * block.
+ *
+ * @return the length of the next record or -1 if there is no next record
+ * @throws IOException
+ */
+ private synchronized int readRecordLength() throws IOException {
+ if (this.in.getPos() >= this.end) {
+ return -1;
+ }
+ int length = this.in.readInt();
+ if (this.version > 1 && this.sync != null && length == SYNC_ESCAPE) { // process a
+ // sync entry
+ this.in.readFully(this.syncCheck); // read syncCheck
+ if (!Arrays.equals(this.sync, this.syncCheck)) // check it
+ throw new IOException("File is corrupt!");
+ this.syncSeen = true;
+ if (this.in.getPos() >= this.end) {
+ return -1;
+ }
+ length = this.in.readInt(); // re-read length
+ } else {
+ this.syncSeen = false;
+ }
+
+ return length;
+ }
+
+ /**
+ * Read the next key/value pair in the file into <code>buffer</code>.
+ * Returns the length of the key read, or -1 if at end of file. The length
+ * of the value may be computed by calling buffer.getLength() before and
+ * after calls to this method.
+ *
+ * @deprecated Call {@link #nextRaw(DataOutputBuffer, ValueBytes)}.
+ */
+ public synchronized int next(final DataOutputBuffer buffer) throws IOException {
+ // Unsupported for block-compressed sequence files
+ if (this.blockCompressed) {
+ throw new IOException(
+ "Unsupported call for block-compressed"
+ + " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
+ }
+ try {
+ final int length = readRecordLength();
+ if (length == -1) {
+ return -1;
+ }
+ final int keyLength = this.in.readInt();
+ buffer.write(this.in, length);
+ return keyLength;
+ } catch (final ChecksumException e) { // checksum failure
+ handleChecksumException(e);
+ return next(buffer);
+ }
+ }
+
+ public ValueBytes createValueBytes() {
+ ValueBytes val = null;
+ if (!this.decompress || this.blockCompressed) {
+ val = new UncompressedBytes();
+ } else {
+ val = new CompressedBytes(this.codec);
+ }
+ return val;
+ }
+
+ /**
+ * Read 'raw' records.
+ *
+ * @param key - The buffer into which the key is read
+ * @param val - The 'raw' value
+ * @return Returns the total record length or -1 for end of file
+ * @throws IOException
+ */
+ public synchronized int nextRaw(final DataOutputBuffer key, final ValueBytes val)
+ throws IOException {
+ if (!this.blockCompressed) {
+ final int length = readRecordLength();
+ if (length == -1) {
+ return -1;
+ }
+ final int keyLength = this.in.readInt();
+ final int valLength = length - keyLength;
+ key.write(this.in, keyLength);
+ if (this.decompress) {
+ final CompressedBytes value = (CompressedBytes) val;
+ value.reset(this.in, valLength);
+ } else {
+ final UncompressedBytes value = (UncompressedBytes) val;
+ value.reset(this.in, valLength);
+ }
+
+ return length;
+ } else {
+ // Reset syncSeen
+ this.syncSeen = false;
+
+ // Read 'key'
+ if (this.noBufferedKeys == 0) {
+ if (this.in.getPos() >= this.end)
+ return -1;
+
+ try {
+ readBlock();
+ } catch (final EOFException eof) {
+ return -1;
+ }
+ }
+ final int keyLength = WritableUtils.readVInt(this.keyLenIn);
+ if (keyLength < 0) {
+ throw new IOException("zero length key found!");
+ }
+ key.write(this.keyIn, keyLength);
+ --this.noBufferedKeys;
+
+ // Read raw 'value'
+ seekToCurrentValue();
+ final int valLength = WritableUtils.readVInt(this.valLenIn);
+ final UncompressedBytes rawValue = (UncompressedBytes) val;
+ rawValue.reset(this.valIn, valLength);
+ --this.noBufferedValues;
+
+ return (keyLength + valLength);
+ }
+
+ }
+
+ /**
+ * Read 'raw' keys.
+ *
+ * @param key - The buffer into which the key is read
+ * @return Returns the key length or -1 for end of file
+ * @throws IOException
+ */
+ public int nextRawKey(final DataOutputBuffer key) throws IOException {
+ if (!this.blockCompressed) {
+ this.recordLength = readRecordLength();
+ if (this.recordLength == -1) {
+ return -1;
+ }
+ this.keyLength = this.in.readInt();
+ key.write(this.in, this.keyLength);
+ return this.keyLength;
+ } else {
+ // Reset syncSeen
+ this.syncSeen = false;
+
+ // Read 'key'
+ if (this.noBufferedKeys == 0) {
+ if (this.in.getPos() >= this.end)
+ return -1;
+
+ try {
+ readBlock();
+ } catch (final EOFException eof) {
+ return -1;
+ }
+ }
+ final int keyLength = WritableUtils.readVInt(this.keyLenIn);
+ if (keyLength < 0) {
+ throw new IOException("zero length key found!");
+ }
+ key.write(this.keyIn, keyLength);
+ --this.noBufferedKeys;
+
+ return keyLength;
+ }
+
+ }
+
+ /**
+ * Read the next key in the file, skipping its value. Return null at end of
+ * file.
+ */
+ public synchronized Object next(Object key) throws IOException {
+ if (key != null && key.getClass() != getKeyClass()) {
+ throw new IOException("wrong key class: " + key.getClass().getName()
+ + " is not " + this.keyClass);
+ }
+
+ if (!this.blockCompressed) {
+ this.outBuf.reset();
+
+ this.keyLength = next(this.outBuf);
+ if (this.keyLength < 0)
+ return null;
+
+ this.valBuffer.reset(this.outBuf.getData(), this.outBuf.getLength());
+
+ key = deserializeKey(key);
+ this.valBuffer.mark(0);
+ if (this.valBuffer.getPosition() != this.keyLength) {
+ throw new IOException(key + " read " + this.valBuffer.getPosition()
+ + " bytes, should read " + this.keyLength);
+ }
+ } else {
+ // Reset syncSeen
+ this.syncSeen = false;
+
+ if (this.noBufferedKeys == 0) {
+ try {
+ readBlock();
+ } catch (final EOFException eof) {
+ return null;
+ }
+ }
+
+ final int keyLength = WritableUtils.readVInt(this.keyLenIn);
+
+ // Sanity check
+ if (keyLength < 0) {
+ return null;
+ }
+
+ // Read another compressed 'key'
+ key = deserializeKey(key);
+ --this.noBufferedKeys;
+ }
+
+ return key;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object deserializeKey(final Object key) throws IOException {
+ return this.keyDeserializer.deserialize(key);
+ }
+
+ /**
+ * Read 'raw' values.
+ *
+ * @param val - The 'raw' value
+ * @return Returns the value length
+ * @throws IOException
+ */
+ public synchronized int nextRawValue(final ValueBytes val) throws IOException {
+
+ // Position stream to current value
+ seekToCurrentValue();
+
+ if (!this.blockCompressed) {
+ final int valLength = this.recordLength - this.keyLength;
+ if (this.decompress) {
+ final CompressedBytes value = (CompressedBytes) val;
+ value.reset(this.in, valLength);
+ } else {
+ final UncompressedBytes value = (UncompressedBytes) val;
+ value.reset(this.in, valLength);
+ }
+
+ return valLength;
+ } else {
+ final int valLength = WritableUtils.readVInt(this.valLenIn);
+ final UncompressedBytes rawValue = (UncompressedBytes) val;
+ rawValue.reset(this.valIn, valLength);
+ --this.noBufferedValues;
+ return valLength;
+ }
+
+ }
+
+ private void handleChecksumException(final ChecksumException e)
+ throws IOException {
+ if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
+ LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
+ sync(getPosition() + this.conf.getInt("io.bytes.per.checksum", 512));
+ } else {
+ throw e;
+ }
+ }
+
+ /**
+ * Set the current byte position in the input file.
+ *
+ * <p>
+ * The position passed must be a position returned by
+ * {@link org.apache.hadoop.io.SequenceFile.Writer#getLength()} when writing
+ * this file. To seek to an arbitrary position, use {@link Reader#sync(long)}.
+ */
+ public synchronized void seek(final long position) throws IOException {
+ this.in.seek(position);
+ if (this.blockCompressed) { // trigger block read
+ this.noBufferedKeys = 0;
+ this.valuesDecompressed = true;
+ }
+ }
+
+ /** Seek to the next sync mark past a given position. */
+ public synchronized void sync(final long position) throws IOException {
+ if (position + SYNC_SIZE >= this.end) {
+ seek(this.end);
+ return;
+ }
+
+ try {
+ seek(position + 4); // skip escape
+ this.in.readFully(this.syncCheck);
+ final int syncLen = this.sync.length;
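+ // Treat syncCheck as a circular buffer of the last SYNC_HASH_SIZE bytes
+ // read; slide it one byte at a time until it matches the file's sync
+ // marker, then step back SYNC_SIZE bytes so the reader is positioned just
+ // before the sync entry.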
+ for (int i = 0; this.in.getPos() < this.end; i++) {
+ int j = 0;
+ for (; j < syncLen; j++) {
+ if (this.sync[j] != this.syncCheck[(i + j) % syncLen])
+ break;
+ }
+ if (j == syncLen) {
+ this.in.seek(this.in.getPos() - SYNC_SIZE); // position before sync
+ return;
+ }
+ this.syncCheck[i % syncLen] = this.in.readByte();
+ }
+ } catch (final ChecksumException e) { // checksum failure
+ handleChecksumException(e);
+ }
+ }
+
+ /** Returns true iff the previous call to next passed a sync mark. */
+ public boolean syncSeen() {
+ return this.syncSeen;
+ }
+
+ /** Return the current byte position in the input file. */
+ public synchronized long getPosition() throws IOException {
+ return this.in.getPos();
+ }
+
+ /** Returns the name of the file. */
+ @Override
+ public String toString() {
+ return this.file.toString();
+ }
+
+ }
+
+} // AzkabanSequenceFileReader
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/BsonFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/BsonFileViewer.java
new file mode 100644
index 0000000..e5f2aff
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/BsonFileViewer.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.bson.BSONObject;
+import org.bson.BasicBSONCallback;
+import org.bson.BasicBSONDecoder;
+
+import com.mongodb.util.JSON;
+
+/**
+ * File viewer for Mongo bson files.
+ *
+ * @author adilaijaz
+ */
+public final class BsonFileViewer extends HdfsFileViewer {
+
+ /**
+ * The maximum time spent, in milliseconds, while reading records from the
+ * file.
+ */
+ private static final long STOP_TIME = 2000L;
+
+ private static final String VIEWER_NAME = "BSON";
+
+ @Override
+ public String getName() {
+ return VIEWER_NAME;
+ }
+
+ @Override
+ public Set<Capability> getCapabilities(FileSystem fs, Path path)
+ throws AccessControlException {
+ if (path.getName().endsWith(".bson")) {
+ return EnumSet.of(Capability.READ);
+ }
+ return EnumSet.noneOf(Capability.class);
+ }
+
+ @Override
+ public void displayFile(FileSystem fs, Path path, OutputStream outStream,
+ int startLine, int endLine) throws IOException {
+
+ FSDataInputStream in = null;
+ try {
+ in = fs.open(path, 16 * 1024 * 1024);
+
+ long endTime = System.currentTimeMillis() + STOP_TIME;
+
+ BasicBSONCallback callback = new BasicBSONCallback();
+ BasicBSONDecoder decoder = new BasicBSONDecoder();
+
+ /*
+ * Keep reading and rendering BSON objects until one of these conditions
+ * is met:
+ *
+ * a. we have rendered all the objects requested, or
+ * b. we have run out of time.
+ */
+ for (int lineno = 1; lineno <= endLine
+ && System.currentTimeMillis() <= endTime; lineno++) {
+ if (lineno < startLine) {
+ continue;
+ }
+
+ callback.reset();
+ decoder.decode(in, callback);
+
+ BSONObject value = (BSONObject) callback.get();
+
+ StringBuilder bldr = new StringBuilder();
+ bldr.append("\n\n Record ");
+ bldr.append(lineno);
+ bldr.append('\n');
+ JSON.serialize(value, bldr);
+ outStream.write(bldr.toString().getBytes("UTF-8"));
+ }
+ } catch (IOException e) {
+ outStream.write(("Error displaying BSON file: " + e.getLocalizedMessage())
+ .getBytes("UTF-8"));
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ outStream.flush();
+ }
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/Capability.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/Capability.java
new file mode 100644
index 0000000..aa74c81
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/Capability.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+public enum Capability {
+ READ,
+ SCHEMA
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ContentType.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ContentType.java
new file mode 100644
index 0000000..a02b030
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ContentType.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+public enum ContentType {
+ TEXT,
+ HTML
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsBrowserServlet.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsBrowserServlet.java
new file mode 100644
index 0000000..d85cdb7
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsBrowserServlet.java
@@ -0,0 +1,521 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.log4j.Logger;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.security.commons.HadoopSecurityManagerException;
+import azkaban.user.User;
+import azkaban.utils.Props;
+import azkaban.server.session.Session;
+import azkaban.webapp.servlet.LoginAbstractAzkabanServlet;
+import azkaban.webapp.servlet.Page;
+
+public class HdfsBrowserServlet extends LoginAbstractAzkabanServlet {
+ private static final long serialVersionUID = 1L;
+ private static final String PROXY_USER_SESSION_KEY =
+ "hdfs.browser.proxy.user";
+ private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
+ "hadoop.security.manager.class";
+
+ private static final int DEFAULT_FILE_MAX_LINES = 1000;
+
+ private int fileMaxLines;
+ private int defaultStartLine;
+ private int defaultEndLine;
+
+ private static Logger logger = Logger.getLogger(HdfsBrowserServlet.class);
+
+ private ArrayList<HdfsFileViewer> viewers = new ArrayList<HdfsFileViewer>();
+
+ private HdfsFileViewer defaultViewer;
+
+ private Props props;
+ private boolean shouldProxy;
+ private boolean allowGroupProxy;
+
+ private String viewerName;
+ private String viewerPath;
+
+ private HadoopSecurityManager hadoopSecurityManager;
+
+ public HdfsBrowserServlet(Props props) {
+ this.props = props;
+ viewerName = props.getString("viewer.name");
+ viewerPath = props.getString("viewer.path");
+ fileMaxLines = props.getInt("file.max.lines", DEFAULT_FILE_MAX_LINES);
+ defaultStartLine = 1;
+ defaultEndLine = fileMaxLines;
+ }
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+
+ shouldProxy = props.getBoolean("azkaban.should.proxy", false);
+ allowGroupProxy = props.getBoolean("allow.group.proxy", false);
+ logger.info("Hdfs browser should proxy: " + shouldProxy);
+
+ props.put("fs.hdfs.impl.disable.cache", "true");
+
+ try {
+ hadoopSecurityManager = loadHadoopSecurityManager(props, logger);
+ } catch (RuntimeException e) {
+ e.printStackTrace();
+ throw new RuntimeException("Failed to get hadoop security manager! "
+ + e.getCause());
+ }
+
+ defaultViewer = new TextFileViewer();
+
+ viewers.add(new HtmlFileViewer());
+ viewers.add(new ORCFileViewer());
+ viewers.add(new AvroFileViewer());
+ viewers.add(new ParquetFileViewer());
+// viewers.add(new JsonSequenceFileViewer());
+ viewers.add(new ImageFileViewer());
+ viewers.add(new BsonFileViewer());
+
+ viewers.add(defaultViewer);
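+ // Registration order matters: file display picks the first viewer whose
+ // capabilities include READ, so the plain-text default viewer is added
+ // last as the catch-all.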
+
+ logger.info("HDFS Browser initiated");
+ }
+
+ private HadoopSecurityManager loadHadoopSecurityManager(Props props,
+ Logger logger) throws RuntimeException {
+
+ Class<?> hadoopSecurityManagerClass =
+ props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
+ HdfsBrowserServlet.class.getClassLoader());
+ logger.info("Initializing hadoop security manager "
+ + hadoopSecurityManagerClass.getName());
+ HadoopSecurityManager hadoopSecurityManager = null;
+
+ try {
+ Method getInstanceMethod =
+ hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
+ hadoopSecurityManager =
+ (HadoopSecurityManager) getInstanceMethod.invoke(
+ hadoopSecurityManagerClass, props);
+ } catch (InvocationTargetException e) {
+ logger.error("Could not instantiate Hadoop Security Manager "
+ + hadoopSecurityManagerClass.getName() + ": " + e.getCause());
+ throw new RuntimeException(e.getCause());
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e.getCause());
+ }
+
+ return hadoopSecurityManager;
+ }
+
+ private FileSystem getFileSystem(String username)
+ throws HadoopSecurityManagerException {
+ return hadoopSecurityManager.getFSAsUser(username);
+ }
+
+ private void errorPage(String user, HttpServletRequest req,
+ HttpServletResponse resp, Session session, String error) {
+ Page page =
+ newPage(req, resp, session,
+ "azkaban/viewer/hdfs/velocity/hdfs-browser.vm");
+ page.add("error_message", "Error: " + error);
+ page.add("user", user);
+ page.add("allowproxy", allowGroupProxy);
+ page.add("no_fs", "true");
+ page.add("viewerName", viewerName);
+ page.render();
+ }
+
+ private void errorAjax(HttpServletResponse resp, Map<String, Object> ret,
+ String error) throws IOException {
+ ret.put("error", error);
+ this.writeJSON(resp, ret);
+ }
+
+ private String getUsername(HttpServletRequest req, Session session)
+ throws ServletException {
+ User user = session.getUser();
+ String username = user.getUserId();
+ if (hasParam(req, "action") && getParam(req, "action").equals("goHomeDir")) {
+ username = getParam(req, "proxyname");
+ } else if (allowGroupProxy) {
+ String proxyName =
+ (String) session.getSessionData(PROXY_USER_SESSION_KEY);
+ if (proxyName != null) {
+ username = proxyName;
+ }
+ }
+ return username;
+ }
+
+ @Override
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
+ Session session) throws ServletException, IOException {
+ String username = getUsername(req, session);
+ boolean ajax = hasParam(req, "ajax");
+ try {
+ if (ajax) {
+ handleAjaxAction(username, req, resp, session);
+ } else {
+ handleFsDisplay(username, req, resp, session);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Error processing request: "
+ + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
+ Session session) throws ServletException, IOException {
+ User user = session.getUser();
+ if (!hasParam(req, "action")) {
+ return;
+ }
+
+ HashMap<String, String> results = new HashMap<String, String>();
+ String action = getParam(req, "action");
+ if (action.equals("changeProxyUser")) {
+ if (hasParam(req, "proxyname")) {
+ String newProxyname = getParam(req, "proxyname");
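+ // A user may proxy as their own id, as a name matching one of their
+ // groups, or as anyone at all if they hold the admin role.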
+ if (user.getUserId().equals(newProxyname)
+ || user.isInGroup(newProxyname)
+ || user.getRoles().contains("admin")) {
+ session.setSessionData(PROXY_USER_SESSION_KEY, newProxyname);
+ } else {
+ results.put("error", "User '" + user.getUserId()
+ + "' cannot proxy as '" + newProxyname + "'");
+ }
+ }
+ } else {
+ results.put("error", "action param is not set");
+ }
+
+ this.writeJSON(resp, results);
+ }
+
+ private Path getPath(HttpServletRequest req) {
+ String prefix = req.getContextPath() + req.getServletPath();
+ String fsPath = req.getRequestURI().substring(prefix.length());
+ if (fsPath.length() == 0) {
+ fsPath = "/";
+ }
+ return new Path(fsPath);
+ }
+
+ private void getPathSegments(Path path, List<Path> paths,
+ List<String> segments) {
+ Path curr = path;
+ while (curr.getParent() != null) {
+ paths.add(curr);
+ segments.add(curr.getName());
+ curr = curr.getParent();
+ }
+ Collections.reverse(paths);
+ Collections.reverse(segments);
+ }
+
+ private String getHomeDir(FileSystem fs) {
+ String homeDirString = fs.getHomeDirectory().toString();
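+ // Strip the scheme and authority (e.g. "file:" or "hdfs://host:port") so
+ // that only the absolute path is used when building browser links.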
+ if (homeDirString.startsWith("file:")) {
+ return homeDirString.substring("file:".length());
+ }
+ return homeDirString.substring(fs.getUri().toString().length());
+ }
+
+ private void handleFsDisplay(String user, HttpServletRequest req,
+ HttpServletResponse resp, Session session) throws IOException,
+ ServletException, IllegalArgumentException, IllegalStateException {
+ FileSystem fs = null;
+ try {
+ fs = getFileSystem(user);
+ } catch (HadoopSecurityManagerException e) {
+ errorPage(user, req, resp, session, "Cannot get FileSystem.");
+ return;
+ }
+
+ Path path = getPath(req);
+ if (logger.isDebugEnabled()) {
+ logger.debug("path: '" + path.toString() + "'");
+ }
+
+ try {
+ if (!fs.exists(path)) {
+ errorPage(user, req, resp, session, path.toUri().getPath()
+ + " does not exist.");
+ fs.close();
+ return;
+ }
+ } catch (IOException ioe) {
+ logger.error("Got exception while checking for existence of path '"
+ + path + "'", ioe);
+ errorPage(user, req, resp, session, path.toUri().getPath()
+ + " Encountered error while trying to detect if path '" + path
+ + "' exists. Reason: " + ioe.getMessage());
+ fs.close();
+ return;
+ }
+
+ if (fs.isFile(path)) {
+ displayFilePage(fs, user, req, resp, session, path);
+ } else if (fs.getFileStatus(path).isDir()) {
+ displayDirPage(fs, user, req, resp, session, path);
+ } else {
+ errorPage(user, req, resp, session,
+ "It exists, it is not a file, and it is not a directory, what "
+ + "is it precious?");
+ }
+ fs.close();
+ }
+
+ private void displayDirPage(FileSystem fs, String user,
+ HttpServletRequest req, HttpServletResponse resp, Session session,
+ Path path) throws IOException {
+
+ Page page =
+ newPage(req, resp, session,
+ "azkaban/viewer/hdfs/velocity/hdfs-browser.vm");
+ page.add("allowproxy", allowGroupProxy);
+ page.add("viewerPath", viewerPath);
+ page.add("viewerName", viewerName);
+
+ List<Path> paths = new ArrayList<Path>();
+ List<String> segments = new ArrayList<String>();
+ getPathSegments(path, paths, segments);
+ page.add("paths", paths);
+ page.add("segments", segments);
+ page.add("user", user);
+ page.add("homedir", getHomeDir(fs));
+
+ try {
+ FileStatus[] subdirs = fs.listStatus(path);
+ page.add("subdirs", subdirs);
+ long size = 0;
+ for (int i = 0; i < subdirs.length; ++i) {
+ if (subdirs[i].isDir()) {
+ continue;
+ }
+ size += subdirs[i].getLen();
+ }
+ page.add("dirsize", size);
+ } catch (AccessControlException e) {
+ page.add("error_message", "Permission denied: " + e.getMessage());
+ page.add("no_fs", "true");
+ } catch (IOException e) {
+ page.add("error_message", "Error: " + e.getMessage());
+ }
+ page.render();
+ }
+
+ private void displayFilePage(FileSystem fs, String user,
+ HttpServletRequest req, HttpServletResponse resp, Session session,
+ Path path) {
+
+ Page page =
+ newPage(req, resp, session, "azkaban/viewer/hdfs/velocity/hdfs-file.vm");
+
+ List<Path> paths = new ArrayList<Path>();
+ List<String> segments = new ArrayList<String>();
+ getPathSegments(path, paths, segments);
+
+ page.add("allowproxy", allowGroupProxy);
+ page.add("viewerPath", viewerPath);
+ page.add("viewerName", viewerName);
+
+ page.add("paths", paths);
+ page.add("segments", segments);
+ page.add("user", user);
+ page.add("path", path.toString());
+ page.add("homedir", getHomeDir(fs));
+
+ try {
+ boolean hasSchema = false;
+ int viewerId = -1;
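+ // Probe the registered viewers in order and keep the first one that can
+ // read this file, noting whether it can also expose a schema; if none
+ // can, the lookup below fails and the catch block renders an error page.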
+ for (int i = 0; i < viewers.size(); ++i) {
+ HdfsFileViewer viewer = viewers.get(i);
+ Set<Capability> capabilities = EnumSet.noneOf(Capability.class);
+ capabilities = viewer.getCapabilities(fs, path);
+ if (capabilities.contains(Capability.READ)) {
+ if (capabilities.contains(Capability.SCHEMA)) {
+ hasSchema = true;
+ }
+ viewerId = i;
+ break;
+ }
+ }
+ page.add("contentType", viewers.get(viewerId).getContentType().name());
+ page.add("viewerId", viewerId);
+ page.add("hasSchema", hasSchema);
+
+ FileStatus status = fs.getFileStatus(path);
+ page.add("status", status);
+
+ } catch (Exception ex) {
+ page.add("no_fs", "true");
+ page.add("error_message", "Error: " + ex.getMessage());
+ }
+ page.render();
+ }
+
+ private void handleAjaxAction(String username, HttpServletRequest request,
+ HttpServletResponse response, Session session) throws ServletException,
+ IOException {
+ Map<String, Object> ret = new HashMap<String, Object>();
+ FileSystem fs = null;
+ try {
+ try {
+ fs = getFileSystem(username);
+ } catch (HadoopSecurityManagerException e) {
+ errorAjax(response, ret, "Cannot get FileSystem.");
+ return;
+ }
+
+ String ajaxName = getParam(request, "ajax");
+ Path path = null;
+ if (!hasParam(request, "path")) {
+ errorAjax(response, ret, "Missing parameter 'path'.");
+ return;
+ }
+
+ path = new Path(getParam(request, "path"));
+ if (!fs.exists(path)) {
+ errorAjax(response, ret, path.toUri().getPath() + " does not exist.");
+ return;
+ }
+
+ if (ajaxName.equals("fetchschema")) {
+ handleAjaxFetchSchema(fs, request, ret, session, path);
+ } else if (ajaxName.equals("fetchfile")) {
+ // Note: fetchFile writes directly to the output stream. Thus, we need
+ // to make sure we do not write to the output stream once this call
+ // returns.
+ ret = null;
+ handleAjaxFetchFile(fs, request, response, session, path);
+ } else {
+ ret.put("error", "Unknown AJAX action " + ajaxName);
+ }
+
+ if (ret != null) {
+ this.writeJSON(response, ret);
+ }
+ } finally {
+ // fs may be null if getFileSystem() failed above.
+ if (fs != null) {
+ fs.close();
+ }
+ }
+ }
+
+ private void handleAjaxFetchSchema(FileSystem fs, HttpServletRequest req,
+ Map<String, Object> ret, Session session, Path path) throws IOException,
+ ServletException {
+ HdfsFileViewer fileViewer = null;
+ try {
+ if (hasParam(req, "viewerId")) {
+ fileViewer = viewers.get(getIntParam(req, "viewerId"));
+ if (!fileViewer.getCapabilities(fs, path).contains(Capability.SCHEMA)) {
+ fileViewer = null;
+ }
+ } else {
+ for (HdfsFileViewer viewer : viewers) {
+ if (viewer.getCapabilities(fs, path).contains(Capability.SCHEMA)) {
+ fileViewer = viewer;
+ break;
+ }
+ }
+ }
+ } catch (AccessControlException e) {
+ ret.put("error", "Permission denied.");
+ }
+
+ if (fileViewer == null) {
+ ret.put("error", "No viewers can display schema.");
+ return;
+ }
+ ret.put("schema", fileViewer.getSchema(fs, path));
+ }
+
+ private void handleAjaxFetchFile(FileSystem fs, HttpServletRequest req,
+ HttpServletResponse resp, Session session, Path path) throws IOException,
+ ServletException {
+ int startLine = getIntParam(req, "startLine", defaultStartLine);
+ int endLine = getIntParam(req, "endLine", defaultEndLine);
+ OutputStream output = resp.getOutputStream();
+
+ if (endLine < startLine) {
+ output.write(("Invalid range: endLine < startLine.").getBytes("UTF-8"));
+ return;
+ }
+
+ if (endLine - startLine > fileMaxLines) {
+ output.write(("Invalid range: range exceeds max number of lines.")
+ .getBytes("UTF-8"));
+ return;
+ }
+
+ // Use registered viewers to show the file content
+ HdfsFileViewer fileViewer = null;
+ try {
+ if (hasParam(req, "viewerId")) {
+ fileViewer = viewers.get(getIntParam(req, "viewerId"));
+ if (!fileViewer.getCapabilities(fs, path).contains(Capability.READ)) {
+ fileViewer = null;
+ }
+ } else {
+ for (HdfsFileViewer viewer : viewers) {
+ if (viewer.getCapabilities(fs, path).contains(Capability.READ)) {
+ fileViewer = viewer;
+ break;
+ }
+ }
+ }
+ // use default text viewer
+ if (fileViewer == null) {
+ if (defaultViewer.getCapabilities(fs, path).contains(Capability.READ)) {
+ fileViewer = defaultViewer;
+ } else {
+ output.write(("No viewer available for file.").getBytes("UTF-8"));
+ return;
+ }
+ }
+ } catch (AccessControlException e) {
+ output.write(("Permission denied.").getBytes("UTF-8"));
+ }
+
+ fileViewer.displayFile(fs, path, output, startLine, endLine);
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsFileViewer.java
new file mode 100644
index 0000000..9da1ba9
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsFileViewer.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+
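+/**
+ * Base class for HDFS viewer plugins. Concrete viewers in this package (text,
+ * image, HTML, Avro, ORC, Parquet, sequence files) override
+ * {@link #getCapabilities} to declare what they can do with a given path and
+ * {@link #displayFile} to stream the rendered content to the browser.
+ */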
+public abstract class HdfsFileViewer {
+ public abstract String getName();
+
+ public Set<Capability> getCapabilities(FileSystem fs, Path path)
+ throws AccessControlException {
+ return EnumSet.noneOf(Capability.class);
+ }
+
+ public abstract void displayFile(FileSystem fs, Path path,
+ OutputStream outStream, int startLine, int endLine) throws IOException;
+
+ public String getSchema(FileSystem fs, Path path) {
+ return null;
+ }
+
+ public ContentType getContentType() {
+ return ContentType.TEXT;
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HtmlFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HtmlFileViewer.java
new file mode 100644
index 0000000..802c64b
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HtmlFileViewer.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ *
+ * THIS IS AN EXPERIMENTAL FEATURE, USE WITH CAUTION.
+ *
+ * This viewer supports rendering of very basic HTML files. The content of an
+ * HTML file is rendered inside an iframe on the Azkaban web page to protect
+ * against possibly malicious JavaScript. It does not support rendering local
+ * image files (e.g. images stored on HDFS), but it does support showing
+ * images hosted at remote network locations.
+ *
+ * In fact, not just images: no data stored on HDFS is accessible from the
+ * rendered page, including CSS and JS files. Everything must either be
+ * self-contained or referenced by an internet location (e.g. a jQuery script
+ * hosted on google.com can be fetched, but a jQuery script stored on HDFS
+ * cannot).
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+public class HtmlFileViewer extends HdfsFileViewer {
+
+ // Only display the first 25M characters, to prevent
+ // showing/downloading gigabytes of data.
+ private static final int BUFFER_LIMIT = 25000000;
+ private static final String VIEWER_NAME = "Html";
+ private static final Logger logger = Logger.getLogger(HtmlFileViewer.class);
+ private final Set<String> acceptedSuffix = new HashSet<>();
+
+ public HtmlFileViewer() {
+ this.acceptedSuffix.add(".htm");
+ this.acceptedSuffix.add(".html");
+ }
+
+ @Override
+ public String getName() {
+ return VIEWER_NAME;
+ }
+
+ @Override
+ public Set<Capability> getCapabilities(final FileSystem fs, final Path path)
+ throws AccessControlException {
+ final String fileName = path.getName();
+ final int pos = fileName.lastIndexOf('.');
+ if (pos < 0) {
+ return EnumSet.noneOf(Capability.class);
+ }
+
+ final String suffix = fileName.substring(pos).toLowerCase();
+ if (this.acceptedSuffix.contains(suffix)) {
+ return EnumSet.of(Capability.READ);
+ } else {
+ return EnumSet.noneOf(Capability.class);
+ }
+ }
+
+ @Override
+ public void displayFile(final FileSystem fs, final Path path, final OutputStream outputStream,
+ final int startLine, final int endLine) throws IOException {
+
+ if (logger.isDebugEnabled())
+ logger.debug("read in uncompressed html file");
+
+ // BUFFER_LIMIT is the only limit we care about; a line limit is redundant and
+ // not very useful for HTML files, so Integer.MAX_VALUE is passed to effectively remove the endLine limit.
+ TextFileViewer.displayFileContent(fs, path, outputStream, startLine, Integer.MAX_VALUE, BUFFER_LIMIT);
+ }
+
+ @Override
+ public ContentType getContentType() {
+ return ContentType.HTML;
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ImageFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ImageFileViewer.java
new file mode 100644
index 0000000..1f65d59
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ImageFileViewer.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+/**
+ * Reads an image file if the file size is not larger than
+ * {@value #MAX_IMAGE_FILE_SIZE}.
+ *
+ * @author ximeng
+ */
+public class ImageFileViewer extends HdfsFileViewer {
+
+ private static Logger logger = Logger.getLogger(ImageFileViewer.class);
+ private static final long MAX_IMAGE_FILE_SIZE = 10485760L;
+ private static final String VIEWER_NAME = "Image";
+
+ private HashSet<String> acceptedSuffix;
+
+ public ImageFileViewer() {
+ final String[] imageSuffix = {
+ ".jpg", ".jpeg", ".tif", ".tiff", ".png", ".gif", ".bmp", ".svg"
+ };
+ acceptedSuffix = new HashSet<String>(Arrays.asList(imageSuffix));
+ }
+
+ @Override
+ public String getName() {
+ return VIEWER_NAME;
+ }
+
+ @Override
+ public Set<Capability> getCapabilities(FileSystem fs, Path path)
+ throws AccessControlException {
+ String fileName = path.getName();
+ int pos = fileName.lastIndexOf('.');
+ if (pos < 0) {
+ return EnumSet.noneOf(Capability.class);
+ }
+
+ String suffix = fileName.substring(pos).toLowerCase();
+ if (acceptedSuffix.contains(suffix)) {
+ long len = 0;
+ try {
+ len = fs.getFileStatus(path).getLen();
+ } catch (AccessControlException e) {
+ throw e;
+ } catch (IOException e) {
+ logger.error("Failed to get file status for " + path.toUri().getPath(), e);
+ return EnumSet.noneOf(Capability.class);
+ }
+
+ if (len <= MAX_IMAGE_FILE_SIZE) {
+ return EnumSet.of(Capability.READ);
+ }
+ }
+
+ return EnumSet.noneOf(Capability.class);
+ }
+
+ @Override
+ public void displayFile(FileSystem fs, Path path, OutputStream outputStream,
+ int startLine, int endLine) throws IOException {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("read in image file");
+ }
+
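+ // Stream the image in 16 KB chunks and stop once MAX_IMAGE_FILE_SIZE bytes
+ // have been written, so an unexpectedly large file cannot flood the response.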
+ InputStream inputStream = null;
+ try {
+ inputStream = new BufferedInputStream(fs.open(path));
+ BufferedOutputStream output = new BufferedOutputStream(outputStream);
+ long outputSize = 0L;
+ byte[] buffer = new byte[16384];
+ int len;
+ while ((len = inputStream.read(buffer)) != -1) {
+ output.write(buffer, 0, len);
+ outputSize += len;
+ if (outputSize > MAX_IMAGE_FILE_SIZE) {
+ break;
+ }
+ }
+ output.flush();
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ORCFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ORCFileViewer.java
new file mode 100644
index 0000000..696dd44
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ORCFileViewer.java
@@ -0,0 +1,154 @@
+package azkaban.viewer.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import azkaban.viewer.hdfs.utils.SerDeUtilsWrapper;
+
+/**
+ * This class implements a viewer for ORC files
+ *
+ * @author gaggarwa
+ */
+public class ORCFileViewer extends HdfsFileViewer {
+ private static Logger logger = Logger.getLogger(ORCFileViewer.class);
+ // Will spend 5 seconds trying to pull data and then stop.
+ private static final long STOP_TIME = 5000L;
+ private static final int JSON_INDENT = 2;
+
+ private static final String VIEWER_NAME = "ORC";
+
+ @Override
+ public String getName() {
+ return VIEWER_NAME;
+ }
+
+ /**
+ * Gets the ORCFileViewer capabilities. Currently READ and SCHEMA are
+ * supported. {@inheritDoc}
+ *
+ * @see HdfsFileViewer#getCapabilities(org.apache.hadoop.fs.FileSystem,
+ * org.apache.hadoop.fs.Path)
+ */
+ @Override
+ public Set<Capability> getCapabilities(FileSystem fs, Path path) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("orc file path: " + path.toUri().getPath());
+ }
+
+ Reader orcReader = null;
+ RecordReader recordReader = null;
+ try {
+ // The ORC Reader itself does not need to be closed; only the RecordReader does.
+ orcReader = OrcFile.createReader(fs, path);
+ recordReader = orcReader.rows(null);
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(path.toUri().getPath() + " is not an ORC file.");
+ logger.debug("Error opening ORC file: "
+ + e.getLocalizedMessage());
+ }
+ return EnumSet.noneOf(Capability.class);
+ } finally {
+ if (recordReader != null) {
+ try {
+ recordReader.close();
+ } catch (IOException e) {
+ logger.error(e);
+ }
+ }
+ }
+ return EnumSet.of(Capability.READ, Capability.SCHEMA);
+ }
+
+ /**
+ * Reads an ORC file and writes its records to the output stream in JSON
+ * format. {@inheritDoc}
+ *
+ * @throws IOException if the file cannot be read or written to the stream
+ *
+ * @see HdfsFileViewer#displayFile(org.apache.hadoop.fs.FileSystem,
+ * org.apache.hadoop.fs.Path, OutputStream, int, int)
+ */
+ @Override
+ public void displayFile(FileSystem fs, Path path, OutputStream outStream,
+ int startLine, int endLine) throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("displaying orc file:" + path.toUri().getPath());
+ }
+ StringBuilder ret = new StringBuilder();
+ Reader orcreader = null;
+ RecordReader reader = null;
+ Object row = null;
+ try {
+ int lineNum = 1;
+
+ orcreader = OrcFile.createReader(fs, path);
+ reader = orcreader.rows(null);
+ long endTime = System.currentTimeMillis() + STOP_TIME;
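+ // Stop reading once STOP_TIME has elapsed so a huge or slow file
+ // cannot hang the request indefinitely.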
+ while (reader.hasNext() && lineNum <= endLine
+ && System.currentTimeMillis() <= endTime) {
+ row = reader.next(row);
+ if (lineNum >= startLine) {
+ ret.append(String.format("Record %d:\n", lineNum));
+ String jsonString =
+ SerDeUtilsWrapper.getJSON(row,
+ orcreader.getObjectInspector());
+ try {
+ JSONObject jsonobj = new JSONObject(jsonString);
+ ret.append(jsonobj.toString(JSON_INDENT));
+ } catch (JSONException e) {
+ logger.error("Failed to parse json as JSONObject", e);
+ // default to unformatted json string
+ ret.append(jsonString);
+ }
+ ret.append("\n\n");
+ }
+ lineNum++;
+ }
+ outStream.write(ret.toString().getBytes(StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ outStream.write(("Error in display orc file: " + e
+ .getLocalizedMessage()).getBytes("UTF-8"));
+ throw e;
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ /**
+ * Gets the schema in the same syntax as the ORC file dump utility. {@inheritDoc}
+ *
+ * @see HdfsFileViewer#getSchema(org.apache.hadoop.fs.FileSystem,
+ * org.apache.hadoop.fs.Path)
+ */
+ @Override
+ public String getSchema(FileSystem fs, Path path) {
+ String schema = null;
+ try {
+ Reader orcReader = OrcFile.createReader(fs, path);
+ schema = orcReader.getObjectInspector().getTypeName();
+ } catch (IOException e) {
+ logger.warn("Cannot get schema for file: " + path.toUri().getPath());
+ return null;
+ }
+
+ return schema;
+ }
+
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ParquetFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ParquetFileViewer.java
new file mode 100644
index 0000000..0118842
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ParquetFileViewer.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import parquet.avro.AvroParquetReader;
+import parquet.avro.AvroSchemaConverter;
+
+/**
+ * This class implements a viewer for Parquet files.
+ *
+ * @author David Z. Chen (dchen@linkedin.com)
+ */
+public class ParquetFileViewer extends HdfsFileViewer {
+ private static Logger logger = Logger.getLogger(ParquetFileViewer.class);
+
+ // Will spend at most 2 seconds trying to pull data and then stop.
+ private static final long STOP_TIME = 2000L;
+
+ private static final String VIEWER_NAME = "Parquet";
+
+ @Override
+ public String getName() {
+ return VIEWER_NAME;
+ }
+
+ @Override
+ public Set<Capability> getCapabilities(FileSystem fs, Path path)
+ throws AccessControlException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Parquet file path: " + path.toUri().getPath());
+ }
+
+ AvroParquetReader<GenericRecord> parquetReader = null;
+ try {
+ parquetReader = new AvroParquetReader<GenericRecord>(path);
+ } catch (IOException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(path.toUri().getPath() + " is not a Parquet file.");
+ logger.debug("Error in opening Parquet file: "
+ + e.getLocalizedMessage());
+ }
+ return EnumSet.noneOf(Capability.class);
+ } finally {
+ try {
+ if (parquetReader != null) {
+ parquetReader.close();
+ }
+ } catch (IOException e) {
+ logger.error(e);
+ }
+ }
+ return EnumSet.of(Capability.READ, Capability.SCHEMA);
+ }
+
+ @Override
+ public void displayFile(FileSystem fs, Path path, OutputStream outputStream,
+ int startLine, int endLine) throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Display Parquet file: " + path.toUri().getPath());
+ }
+
+ JsonGenerator json = null;
+ AvroParquetReader<GenericRecord> parquetReader = null;
+ try {
+ parquetReader = new AvroParquetReader<GenericRecord>(path);
+
+ // Initialize JsonGenerator.
+ json =
+ new JsonFactory()
+ .createJsonGenerator(outputStream, JsonEncoding.UTF8);
+ json.useDefaultPrettyPrinter();
+
+ // Declare the avroWriter encoder that will be used to output the records
+ // as JSON but don't construct them yet because we need the first record
+ // in order to get the Schema.
+ DatumWriter<GenericRecord> avroWriter = null;
+ Encoder encoder = null;
+
+ long endTime = System.currentTimeMillis() + STOP_TIME;
+ int line = 1;
+ while (line <= endLine && System.currentTimeMillis() <= endTime) {
+ GenericRecord record = parquetReader.read();
+ if (record == null) {
+ break;
+ }
+
+ if (avroWriter == null) {
+ Schema schema = record.getSchema();
+ avroWriter = new GenericDatumWriter<GenericRecord>(schema);
+ encoder = EncoderFactory.get().jsonEncoder(schema, json);
+ }
+
+ if (line >= startLine) {
+ String recordStr = "\n\nRecord " + line + ":\n";
+ outputStream.write(recordStr.getBytes("UTF-8"));
+ avroWriter.write(record, encoder);
+ encoder.flush();
+ }
+ ++line;
+ }
+ } catch (IOException e) {
+ outputStream.write(("Error in displaying Parquet file: " + e
+ .getLocalizedMessage()).getBytes("UTF-8"));
+ throw e;
+ } catch (Throwable t) {
+ logger.error("Error displaying Parquet file: " + path.toUri().getPath(), t);
+ return;
+ } finally {
+ if (json != null) {
+ json.close();
+ }
+ if (parquetReader != null) {
+ parquetReader.close();
+ }
+ }
+ }
+
+ @Override
+ public String getSchema(FileSystem fs, Path path) {
+ String schema = null;
+ AvroParquetReader<GenericRecord> parquetReader = null;
+ try {
+ parquetReader = new AvroParquetReader<GenericRecord>(path);
+ GenericRecord record = parquetReader.read();
+ if (record == null) {
+ return null;
+ }
+ Schema avroSchema = record.getSchema();
+ AvroSchemaConverter converter = new AvroSchemaConverter();
+ schema = converter.convert(avroSchema).toString();
+ } catch (IOException e) {
+ logger.warn("Cannot get schema for file: " + path.toUri().getPath());
+ return null;
+ } finally {
+ // Close the reader to avoid leaking the underlying file handle.
+ if (parquetReader != null) {
+ try {
+ parquetReader.close();
+ } catch (IOException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ return schema;
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/SequenceFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/SequenceFileViewer.java
new file mode 100644
index 0000000..1ea029a
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/SequenceFileViewer.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.EnumSet;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+
+public abstract class SequenceFileViewer extends HdfsFileViewer {
+
+ protected abstract Set<Capability> getCapabilities(
+ AzkabanSequenceFileReader.Reader reader);
+
+ protected abstract void displaySequenceFile(
+ AzkabanSequenceFileReader.Reader reader, PrintWriter output,
+ int startLine, int endLine) throws IOException;
+
+ @Override
+ public Set<Capability> getCapabilities(final FileSystem fs, final Path path)
+ throws AccessControlException {
+ Set<Capability> result = EnumSet.noneOf(Capability.class);
+ AzkabanSequenceFileReader.Reader reader = null;
+ try {
+ reader =
+ new AzkabanSequenceFileReader.Reader(fs, path, new Configuration());
+ result = getCapabilities(reader);
+ } catch (final AccessControlException e) {
+ throw e;
+ } catch (final IOException e) {
+ return EnumSet.noneOf(Capability.class);
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (final IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ @SuppressWarnings("DefaultCharset")
+ public void displayFile(final FileSystem fs, final Path file, final OutputStream outputStream,
+ final int startLine, final int endLine) throws IOException {
+
+ AzkabanSequenceFileReader.Reader reader = null;
+ final PrintWriter writer = new PrintWriter(outputStream);
+ try {
+ reader =
+ new AzkabanSequenceFileReader.Reader(fs, file, new Configuration());
+ displaySequenceFile(reader, writer, startLine, endLine);
+ } catch (final IOException e) {
+ writer.write("Error opening sequence file " + e);
+ throw e;
+ } finally {
+ // Flush so buffered output (including any error message) reaches the client.
+ writer.flush();
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/TextFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/TextFileViewer.java
new file mode 100644
index 0000000..591d4cd
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/TextFileViewer.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+public class TextFileViewer extends HdfsFileViewer {
+
+ private static Logger logger = Logger.getLogger(TextFileViewer.class);
+ private HashSet<String> acceptedSuffix = new HashSet<String>();
+
+ // Only display the first 1M characters, to prevent
+ // showing/downloading gigabytes of data.
+ private static final int BUFFER_LIMIT = 1000000;
+ private static final String VIEWER_NAME = "Text";
+
+ public TextFileViewer() {
+ acceptedSuffix.add(".txt");
+ acceptedSuffix.add(".csv");
+ acceptedSuffix.add(".props");
+ acceptedSuffix.add(".xml");
+ acceptedSuffix.add(".html");
+ acceptedSuffix.add(".json");
+ acceptedSuffix.add(".log");
+ }
+
+ @Override
+ public String getName() {
+ return VIEWER_NAME;
+ }
+
+ @Override
+ public Set<Capability> getCapabilities(FileSystem fs, Path path)
+ throws AccessControlException {
+ return EnumSet.of(Capability.READ);
+ }
+
+ @Override
+ public void displayFile(FileSystem fs, Path path, OutputStream outputStream,
+ int startLine, int endLine) throws IOException {
+
+ if (logger.isDebugEnabled())
+ logger.debug("read in uncompressed text file");
+
+ displayFileContent(fs, path, outputStream, startLine, endLine, BUFFER_LIMIT);
+ }
+
+ @SuppressWarnings("DefaultCharset")
+ static void displayFileContent(FileSystem fs, Path path, OutputStream outputStream,
+ int startLine, int endLine, int bufferLimit) throws IOException {
+
+ InputStream inputStream = null;
+ BufferedReader reader = null;
+ try {
+ inputStream = fs.open(path);
+ reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+ PrintWriter output = new PrintWriter(outputStream);
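+ // Skip lines before startLine; line numbering is 1-based.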
+ for (int i = 1; i < startLine; i++) {
+ reader.readLine();
+ }
+
+ int bufferSize = 0;
+ for (int i = startLine; i < endLine; i++) {
+ String line = reader.readLine();
+ if (line == null)
+ break;
+
+ // break if reach the buffer limit
+ bufferSize += line.length();
+ if (bufferSize >= bufferLimit)
+ break;
+
+ output.write(line);
+ output.write("\n");
+ }
+ output.flush();
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+ }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/utils/SerDeUtilsWrapper.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/utils/SerDeUtilsWrapper.java
new file mode 100644
index 0000000..5b5edfd
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/utils/SerDeUtilsWrapper.java
@@ -0,0 +1,235 @@
+package azkaban.viewer.hdfs.utils;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Adapted from {@link org.apache.hadoop.hive.serde2.SerDeUtils} to escape
+ * binary strings so that the output is always a valid JSON string.
+ */
+public final class SerDeUtilsWrapper {
+
+ /**
+ * Gets serialized JSON for an ORC record object using its corresponding
+ * object inspector. Adapted from
+ * {@link org.apache.hadoop.hive.serde2.SerDeUtils#getJSONString(Object, ObjectInspector)}.
+ *
+ * @param obj the record object to serialize
+ * @param objIns the ObjectInspector describing the record's structure
+ * @return the record serialized as a JSON string
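+ *
+ * <p>Illustrative usage (variable names are hypothetical), mirroring how
+ * {@link azkaban.viewer.hdfs.ORCFileViewer} invokes this helper:
+ * <pre>
+ * Object row = recordReader.next(null);
+ * String json = SerDeUtilsWrapper.getJSON(row, orcReader.getObjectInspector());
+ * </pre>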
+ */
+ public static String getJSON(Object obj, ObjectInspector objIns) {
+ StringBuilder sb = new StringBuilder();
+ buildJSONString(sb, obj, objIns);
+ return sb.toString();
+ }
+
+ private static void buildJSONString(StringBuilder sb, Object obj,
+ ObjectInspector objIns) {
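+ // Recursively walks the object via its ObjectInspector and appends a JSON
+ // representation to the builder; complex categories (LIST, MAP, STRUCT,
+ // UNION) recurse into their element/field inspectors.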
+ String nullStr = "null";
+ switch (objIns.getCategory()) {
+ case PRIMITIVE: {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) objIns;
+ if (obj == null) {
+ sb.append(nullStr);
+ } else {
+ switch (poi.getPrimitiveCategory()) {
+ case BOOLEAN: {
+ boolean b = ((BooleanObjectInspector) poi).get(obj);
+ sb.append(b ? "true" : "false");
+ break;
+ }
+ case BYTE: {
+ sb.append(((ByteObjectInspector) poi).get(obj));
+ break;
+ }
+ case SHORT: {
+ sb.append(((ShortObjectInspector) poi).get(obj));
+ break;
+ }
+ case INT: {
+ sb.append(((IntObjectInspector) poi).get(obj));
+ break;
+ }
+ case LONG: {
+ sb.append(((LongObjectInspector) poi).get(obj));
+ break;
+ }
+ case FLOAT: {
+ sb.append(((FloatObjectInspector) poi).get(obj));
+ break;
+ }
+ case DOUBLE: {
+ sb.append(((DoubleObjectInspector) poi).get(obj));
+ break;
+ }
+ case STRING: {
+ sb.append('"');
+ sb.append(SerDeUtils
+ .escapeString(((StringObjectInspector) poi)
+ .getPrimitiveJavaObject(obj)));
+ sb.append('"');
+ break;
+ }
+ case VARCHAR: {
+ sb.append('"');
+ sb.append(SerDeUtils
+ .escapeString(((HiveVarcharObjectInspector) poi)
+ .getPrimitiveJavaObject(obj).toString()));
+ sb.append('"');
+ break;
+ }
+ case DATE: {
+ sb.append('"');
+ sb.append(((DateObjectInspector) poi)
+ .getPrimitiveWritableObject(obj));
+ sb.append('"');
+ break;
+ }
+ case TIMESTAMP: {
+ sb.append('"');
+ sb.append(((TimestampObjectInspector) poi)
+ .getPrimitiveWritableObject(obj));
+ sb.append('"');
+ break;
+ }
+ case BINARY: {
+ BytesWritable bw =
+ ((BinaryObjectInspector) objIns)
+ .getPrimitiveWritableObject(obj);
+ Text txt = new Text();
+ txt.set(bw.getBytes(), 0, bw.getLength());
+ // Escape the binary payload so the output remains a valid JSON string.
+ sb.append('"');
+ sb.append(StringEscapeUtils.escapeJava(txt.toString()));
+ sb.append('"');
+ break;
+ }
+ case DECIMAL: {
+ sb.append(((HiveDecimalObjectInspector) objIns)
+ .getPrimitiveJavaObject(obj));
+ break;
+ }
+ default:
+ throw new RuntimeException("Unknown primitive type: "
+ + poi.getPrimitiveCategory());
+ }
+ }
+ break;
+ }
+ case LIST: {
+ ListObjectInspector loi = (ListObjectInspector) objIns;
+ ObjectInspector listElementObjectInspector =
+ loi.getListElementObjectInspector();
+ List<?> olist = loi.getList(obj);
+ if (olist == null) {
+ sb.append(nullStr);
+ } else {
+ sb.append(SerDeUtils.LBRACKET);
+ for (int i = 0; i < olist.size(); i++) {
+ if (i > 0) {
+ sb.append(SerDeUtils.COMMA);
+ }
+ buildJSONString(sb, olist.get(i),
+ listElementObjectInspector);
+ }
+ sb.append(SerDeUtils.RBRACKET);
+ }
+ break;
+ }
+ case MAP: {
+ MapObjectInspector moi = (MapObjectInspector) objIns;
+ ObjectInspector mapKeyObjectInspector =
+ moi.getMapKeyObjectInspector();
+ ObjectInspector mapValueObjectInspector =
+ moi.getMapValueObjectInspector();
+ Map<?, ?> omap = moi.getMap(obj);
+ if (omap == null) {
+ sb.append(nullStr);
+ } else {
+ sb.append(SerDeUtils.LBRACE);
+ boolean first = true;
+ for (Object entry : omap.entrySet()) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(SerDeUtils.COMMA);
+ }
+ Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry;
+ buildJSONString(sb, e.getKey(), mapKeyObjectInspector);
+ sb.append(SerDeUtils.COLON);
+ buildJSONString(sb, e.getValue(), mapValueObjectInspector);
+ }
+ sb.append(SerDeUtils.RBRACE);
+ }
+ break;
+ }
+ case STRUCT: {
+ StructObjectInspector soi = (StructObjectInspector) objIns;
+ List<? extends StructField> structFields =
+ soi.getAllStructFieldRefs();
+ if (obj == null) {
+ sb.append(nullStr);
+ } else {
+ sb.append(SerDeUtils.LBRACE);
+ for (int i = 0; i < structFields.size(); i++) {
+ if (i > 0) {
+ sb.append(SerDeUtils.COMMA);
+ }
+ sb.append(SerDeUtils.QUOTE);
+ sb.append(structFields.get(i).getFieldName());
+ sb.append(SerDeUtils.QUOTE);
+ sb.append(SerDeUtils.COLON);
+ buildJSONString(sb,
+ soi.getStructFieldData(obj, structFields.get(i)),
+ structFields.get(i).getFieldObjectInspector());
+ }
+ sb.append(SerDeUtils.RBRACE);
+ }
+ break;
+ }
+ case UNION: {
+ UnionObjectInspector uoi = (UnionObjectInspector) objIns;
+ if (obj == null) {
+ sb.append(nullStr);
+ } else {
+ sb.append(SerDeUtils.LBRACE);
+ sb.append(uoi.getTag(obj));
+ sb.append(SerDeUtils.COLON);
+ buildJSONString(sb, uoi.getField(obj), uoi
+ .getObjectInspectors().get(uoi.getTag(obj)));
+ sb.append(SerDeUtils.RBRACE);
+ }
+ break;
+ }
+ default:
+ throw new RuntimeException("Unknown type in ObjectInspector!");
+ }
+ }
+
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/change-proxy-modal.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/change-proxy-modal.vm
new file mode 100644
index 0000000..fd67c65
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/change-proxy-modal.vm
@@ -0,0 +1,40 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+ <div class="modal" id="messageDialog">
+ <div class="modal-dialog">
+ <div class="modal-content">
+ <div class="modal-header" id="messageTitle">
+ <button type="button" class="close" data-dismiss="modal" aria-hidden="true">×</button>
+ <h4 class="modal-title">Change Proxy User</h4>
+ </div>
+ <div class="modal-body" id="messageDiv">
+ <fieldset>
+ <div class="form-group">
+ <label for="proxyname" class="col-sm-3 control-label">Proxy User</label>
+ <div class="col-sm-9">
+ <input id="proxyname" type="text" name="proxyname" val="${user}" class="form-control">
+ </div>
+ </div>
+ </fieldset>
+ </div>
+ <div class="modal-footer">
+ <button type="button" class="btn btn-default" data-dismiss="modal">Cancel</button>
+ <button id="submitChangeUserBtn" class="btn btn-primary">Change User</button>
+ </div>
+ </div>
+ </div>
+ </div>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-browser.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-browser.vm
new file mode 100644
index 0000000..5c1869b
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-browser.vm
@@ -0,0 +1,147 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<!DOCTYPE html>
+<html>
+ <head lang="en">
+
+#parse ("azkaban/webapp/servlet/velocity/style.vm")
+#parse ("azkaban/webapp/servlet/velocity/javascript.vm")
+
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var homeDir = "${homedir}";
+ </script>
+#if ($allowproxy)
+ #parse ("azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm")
+#end
+#parse ("azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm")
+ </head>
+ <body>
+
+#set ($current_page="$viewerName")
+#parse ("azkaban/webapp/servlet/velocity/nav.vm")
+
+#if ($errorMsg)
+ #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
+#else
+
+ <div class="az-page-header">
+ <div class="container-full">
+ <div class="row">
+ <div class="header-title">
+ <h1><a href="${context}/hdfs">HDFS Browser</a></h1>
+ </div>
+ <div class="header-control">
+ <div class="pull-right header-form">
+ #if ($allowproxy)
+ <strong>Proxy user</strong> ${user}
+ <button type="button" id="changeUserBtn" class="btn btn-sm btn-primary">Change User</button>
+ #end
+ <a id="goHomeLink" class="btn btn-sm btn-default" href="${context}/hdfs${homedir}">Home Directory</a>
+ </div>
+ <div class="clearfix"></div>
+ </div>
+ </div>
+ </div>
+ </div>
+
+ <div class="container-full">
+
+ #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+ #if (!$no_fs)
+
+ <div class="row">
+ <div class="col-xs-12">
+ <div class="panel panel-default" id="all-hdfsbrowser-content">
+ <div class="panel-heading">
+ #set ($size = $paths.size() - 1)
+ <a class="firstCrumb" href="${context}/hdfs/"> / </a>#if($size >= 0)#foreach($i in [0 ..$size])<a href="$context/hdfs${paths.get($i)}">${segments.get($i)}</a><span> / </span>#end #end
+ <div class="pull-right">
+ <strong>$subdirs.size()</strong> items <strong>$utils.displayBytes($dirsize)</strong> total
+ </div>
+ </div>
+ <table id="hdfs-dir" class="table table-condensed table-striped table-hover table-bordered">
+ <thead>
+ <tr>
+ <th>File</th>
+ <th>Permission</th>
+ <th>Owner/Group</th>
+ <th>Size</th>
+ <th>Block Size</th>
+ <th>Reps</th>
+ <th>Modified Date</th>
+ </tr>
+ </thead>
+ <tbody>
+ #if ($subdirs)
+ #foreach ($status in $subdirs)
+ <tr>
+ <td>
+ <a href="${context}/hdfs${status.getPath().toUri().getPath()}">
+ #if ($status.isDir())
+ <span class="glyphicon glyphicon-folder-close icon-directory"></span>
+ #else
+ <span class="glyphicon glyphicon-file icon-file"></span>
+ #end
+ ${status.path.name}#if($status.isDir())/#end</a>
+ </td>
+ <td>${status.permission}</td>
+ <td>${status.owner}/${status.group}</td>
+ <td>
+ #if ($status.isDir())
+ –
+ #else
+ $utils.displayBytes(${status.len})
+ #end
+ </td>
+ <td>
+ #if ($status.isDir())
+ –
+ #else
+ $utils.displayBytes(${status.getBlockSize()})
+ #end
+ </td>
+ <td>
+ #if ($status.isDir())
+ –
+ #else
+ ${status.getReplication()}
+ #end
+ </td>
+ <td>$utils.formatDateTime(${status.modificationTime})</td>
+ </tr>
+ #end
+ #else
+ <tr><td>No Files In This Directory</td></tr>
+ #end
+ </tbody>
+ </table>
+ #end
+
+ </div><!-- /.panel -->
+ </div><!-- /.col-xs-12 -->
+ </div><!-- /.row -->
+
+#parse ("azkaban/viewer/hdfs/velocity/change-proxy-modal.vm")
+#end
+ </div><!-- /.container-full -->
+ </body>
+</html>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file.vm
new file mode 100644
index 0000000..d57e4b5
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file.vm
@@ -0,0 +1,158 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<!DOCTYPE html>
+<html>
+ <head lang="en">
+
+#parse ("azkaban/webapp/servlet/velocity/style.vm")
+#parse ("azkaban/webapp/servlet/velocity/javascript.vm")
+
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var homeDir = "${homedir}";
+ var path = "${path}";
+ var hasSchema = ${hasSchema};
+ var viewerId = ${viewerId};
+ var contentType = "${contentType}";
+ </script>
+#if ($allowproxy)
+ #parse ("azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm")
+#end
+#parse ("azkaban/viewer/hdfs/velocity/hdfs-file-js.vm")
+#parse ("azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm")
+ </head>
+ <body>
+
+#set ($current_page = "$viewerName")
+#parse ("azkaban/webapp/servlet/velocity/nav.vm")
+
+#if ($errorMsg)
+ #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
+#else
+
+ <div class="az-page-header">
+ <div class="container-full">
+ <div class="row">
+ <div class="header-title">
+ <h1><a href="${context}/hdfs">HDFS Browser</a></h1>
+ </div>
+ <div class="header-control">
+ <div class="pull-right header-form">
+ #if ($allowproxy)
+ <strong>Proxy user</strong> ${user}
+ <button type="button" id="changeUserBtn" class="btn btn-sm btn-primary">Change User</button>
+ #end
+ <a id="goHomeLink" class="btn btn-sm btn-default" href="${context}/hdfs${homedir}">Home Directory</a>
+ </div>
+ <div class="clearfix"></div>
+ </div>
+ </div>
+ </div>
+ </div>
+
+ <div class="container-full">
+
+ #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+ #if (!$no_fs)
+
+ <div class="row">
+ <div class="col-xs-12">
+ <div class="panel panel-default" id="all-hdfsbrowser-content">
+ <div class="panel-heading">
+ #set ($size = $paths.size() - 1)
+ <a class="firstCrumb" href="${context}/hdfs/"> / </a>
+ #if ($size >= 0)
+ #set ($end = $size - 1)
+ #foreach ($i in [0 ..$end])
+ <a href="$context/hdfs${paths.get($i)}">${segments.get($i)}</a><span> / </span>
+ #end
+ ${segments.get($size)}
+ #end
+ </div>
+ <table class="table table-condensed table-striped table-bordered">
+ <tbody>
+ <tr>
+ <td class="property-key">Permission</td>
+ <td class="property-value-half">${status.permission}</td>
+ <td class="property-key">Owner/Group</td>
+ <td class="property-value-half">${status.owner}/${status.group}</td>
+ </tr>
+ <tr>
+ <td class="property-key">Size</td>
+ <td class="property-value-half">$utils.displayBytes(${status.len})</td>
+ <td class="property-key">Block Size</td>
+ <td class="property-value-half">$utils.displayBytes(${status.getBlockSize()})</td>
+ </tr>
+ <tr>
+ <td class="property-key">Reps</td>
+ <td class="property-value-half">${status.getReplication()}</td>
+ <td class="property-key">Modified Date</td>
+ <td class="property-value-half">$utils.formatDateTime(${status.modificationTime})</td>
+ </tr>
+ </tbody>
+ </table>
+ </div><!-- /.panel -->
+ </div><!-- /.col-xs-12 -->
+ </div><!-- /.row -->
+
+ <div class="row">
+ <div class="col-xs-12">
+ <ul class="nav nav-tabs nav-sm" id="file-tabs">
+ <li class="active"><a href="#contents" data-toggle="tab">Contents</a></li>
+ #if ($hasSchema)
+ <li><a href="#schema" data-toggle="tab">Schema</a></li>
+ #end
+ </ul>
+
+ <div class="tab-content">
+ <div class="tab-pane active" id="contents">
+ <div class="progress progress-striped active" id="file-contents-loading">
+ <div class="progress-bar progress-bar-info" role="progressbar" aria-valuenow="100" aria-valuemin="0" aria-valuemax="100" style="width: 100%">
+ <span class="sr-only">Loading...</span>
+ </div>
+ </div>
+
+ #if ($contentType == "HTML")
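+ ## The iframe is sandboxed without allow-scripts, so untrusted HTML pulled
+ ## from HDFS cannot execute JavaScript inside the Azkaban page.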
+ <iframe id="file-contents-iframe" style="display:none;overflow:scroll" sandbox="allow-same-origin"></iframe>
+ #else
+ <pre id="file-contents"></pre>
+ #end
+ </div>
+
+ #if ($hasSchema)
+ <div class="tab-pane" id="schema">
+ <div class="progress progress-striped active" id="file-schema-loading">
+ <div class="progress-bar progress-bar-info" role="progressbar" aria-valuenow="100" aria-valuemin="0" aria-valuemax="100" style="width: 100%">
+ <span class="sr-only">Loading...</span>
+ </div>
+ </div>
+ <pre id="file-schema"></pre>
+ </div>
+ #end
+ </div>
+ </div>
+ </div>
+
+ #end
+#parse ("azkaban/viewer/hdfs/velocity/change-proxy-modal.vm")
+#end
+ </div><!-- /.container-full -->
+ </body>
+</html>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file-js.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file-js.vm
new file mode 100644
index 0000000..197895a
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file-js.vm
@@ -0,0 +1,192 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<script type="text/javascript">
+
+$.namespace('azkaban');
+
+azkaban.HdfsSchemaModel = Backbone.Model.extend({
+ initialize: function() {
+ this.set('schema', '');
+ },
+
+ fetchSchema: function(path, viewerId) {
+ var requestURL = '/hdfs';
+ var requestData = {
+ 'ajax': 'fetchschema',
+ 'path': path,
+ 'viewerId': viewerId
+ };
+ var model = this;
+ var successHandler = function(data) {
+ if (data.error != null) {
+ model.set('error', data.error);
+ }
+ if (data.schema != null) {
+ model.set('schema', data.schema);
+ }
+ };
+ $.get(requestURL, requestData, successHandler, 'json');
+ }
+});
+
+azkaban.HdfsSchemaView = Backbone.View.extend({
+ events: {
+ },
+
+ initialize: function(settings) {
+ this.listenTo(this.model, 'change:schema', this.render);
+ this.rendered = false;
+ },
+
+ show: function() {
+ if (this.rendered == true) {
+ return;
+ }
+ this.model.fetchSchema(path, viewerId);
+ },
+
+ render: function(self) {
+ if (this.rendered == true) {
+ return;
+ }
+
+ var schema = this.model.get('schema');
+ if (schema == null) {
+ return;
+ }
+
+ $('#file-schema-loading').hide();
+ $('#file-schema').show().text(schema);
+ this.rendered = true;
+ }
+});
+
+azkaban.HdfsFileModel = Backbone.Model.extend({
+ initialize: function() {
+ this.set('contents', '');
+ },
+
+ fetchFile: function(path, viewerId) {
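+ // The 'fetchfile' ajax call streams raw file contents (not JSON), so the
+ // response is requested as plain text below.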
+ var requestURL = '/hdfs';
+ var requestData = {
+ 'ajax': 'fetchfile',
+ 'path': path,
+ 'viewerId': viewerId,
+ };
+ var model = this;
+ var successHandler = function(data) {
+ if (data.error != null) {
+ model.set('error', data.error);
+ }
+ if (data === '') {
+ data = 'Oops, this is an empty file!';
+ }
+ model.set('contents', data);
+ };
+ $.get(requestURL, requestData, successHandler, 'text');
+ }
+});
+
+azkaban.HdfsFileView = Backbone.View.extend({
+ events: {
+ },
+
+ initialize: function(settings) {
+ this.listenTo(this.model, 'change:contents', this.render);
+ this.rendered = false;
+ },
+
+ show: function() {
+ if (this.rendered == true) {
+ return;
+ }
+ this.model.fetchFile(path, viewerId);
+ },
+
+ render: function(self) {
+ if (this.rendered == true) {
+ return;
+ }
+
+ var file = this.model.get('contents');
+ if (file == null) {
+ return;
+ }
+
+ $('#file-contents-loading').hide();
+
+ if (contentType == "HTML") {
+ var iframe = document.getElementById('file-contents-iframe');
+ // show iframe (initially it's hidden)
+ iframe.style.display = "inline";
+ // Write the content into the iframe. We can't just set the HTML source as
+ // the inner HTML of the iframe: the iframe hosts a completely separate DOM
+ // from the current page, so setting innerHTML will not work. The only ways
+ // to put HTML inside an iframe are to write the content directly into it or
+ // to set it via the srcdoc attribute. Either works; document.write() is
+ // used here because the result looks nicer when inspecting the HTML
+ // structure.
+ var doc = iframe.contentWindow.document;
+ doc.open();
+ doc.write(file);
+ doc.close();
+ // adjust iframe size
+ iframe.width = iframe.contentWindow.document.body.scrollWidth;
+ iframe.height = iframe.contentWindow.document.body.scrollHeight;
+ } else {
+ $('#file-contents').show().text(file);
+ }
+ this.rendered = true;
+ }
+});
+
+var schemaModel;
+var schemaView;
+
+var fileModel;
+var fileView;
+
+$(function () {
+ $('a[data-toggle="tab"]').on('shown.bs.tab', function (e) {
+ var current = e.target;
+ var previous = e.relatedTarget;
+ var hash = $(current).attr('href');
+ if (hash == '#schema' && schemaModel != null) {
+ schemaView.show();
+ }
+ else if (hash == '#contents') {
+ fileView.show();
+ }
+ });
+
+ if (hasSchema) {
+ schemaModel = new azkaban.HdfsSchemaModel();
+ schemaView = new azkaban.HdfsSchemaView({
+ el: $("#schema"),
+ model: schemaModel
+ });
+ }
+
+ fileModel = new azkaban.HdfsFileModel();
+ fileView = new azkaban.HdfsFileView({
+ el: $("#contents"),
+ model: fileModel
+ });
+
+ fileView.show();
+});
+</script>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm
new file mode 100644
index 0000000..99dfe27
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm
@@ -0,0 +1,47 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<script type="text/javascript">
+$(function() {
+ $('#changeUserBtn').click( function() {
+ $('#messageDialog').modal();
+ $('#proxyname').focus();
+ });
+
+ $('#submitChangeUserBtn').click( function() {
+ var user = $('#proxyname').val();
+ var requestData = {
+ 'action': 'changeProxyUser',
+ 'proxyname': user
+ };
+ var successHandler = function(data) {
+ if (data.error) {
+ $('#messageBox').text(data.error);
+ }
+ else {
+ location.reload(true);
+ }
+ };
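+ // Post back to the current page URL; the HDFS browser servlet is expected
+ // to handle the 'changeProxyUser' action and update the session's proxy user.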
+ $.post("", requestData, successHandler);
+ });
+
+ $('#proxyname').keyup(function(event) {
+ if (event.key == "Enter") {
+ $('#submitChangeUserBtn').click();
+ }
+ });
+});
+</script>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm
new file mode 100644
index 0000000..2b536e0
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm
@@ -0,0 +1,30 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<style type="text/css">
+.icon-directory {
+ color: #428bca;
+}
+
+.icon-file {
+ color: #cccccc;
+}
+
+#file-contents,
+#file-schema {
+ display: none;
+}
+</style>
diff --git a/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/HtmlFileViewerTest.java b/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/HtmlFileViewerTest.java
new file mode 100644
index 0000000..f093973
--- /dev/null
+++ b/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/HtmlFileViewerTest.java
@@ -0,0 +1,87 @@
+package azkaban.viewer.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * <pre>
+ * Test cases for HtmlFileViewer
+ *
+ * Validates the accepted capabilities of the viewer and ensures that it
+ * generates the correct content and content type.
+ * </pre>
+ */
+public class HtmlFileViewerTest {
+ private static final String EMPTY_HTM = "TestHtmEmptyFile.htm";
+ private static final String VALID_HTML = "TestHtmlFile.html";
+ FileSystem fs;
+
+ HtmlFileViewer viewer;
+
+ @Before
+ public void setUp() throws IOException {
+ this.fs = new LocalFileSystem();
+ this.fs.initialize(this.fs.getWorkingDirectory().toUri(), new Configuration());
+ this.viewer = new HtmlFileViewer();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ this.fs.close();
+ }
+
+ @Test
+ public void testCapabilities() throws AccessControlException {
+ Set<Capability> capabilities = this.viewer
+ .getCapabilities(this.fs, getResourcePath(EMPTY_HTM));
+ // READ should be the one and only capability
+ assertTrue(capabilities.contains(Capability.READ));
+ assertEquals(1, capabilities.size());
+
+ capabilities = this.viewer.getCapabilities(this.fs, getResourcePath(VALID_HTML));
+ // READ should be the one and only capability
+ assertTrue(capabilities.contains(Capability.READ));
+ assertEquals(1, capabilities.size());
+ }
+
+ @Test
+ public void testContentType() {
+ assertEquals(ContentType.HTML, this.viewer.getContentType());
+ }
+
+ @Test
+ public void testEmptyFile() throws IOException {
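+ // The last two displayFile arguments bound the range of lines (records)
+ // requested; an empty file should produce no output regardless.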
+ final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ this.viewer.displayFile(this.fs, getResourcePath(EMPTY_HTM), outStream, 1, 2);
+ final String output = outStream.toString();
+ assertTrue(output.isEmpty());
+ }
+
+ @Test
+ @SuppressWarnings("DefaultCharset")
+ public void testValidHtmlFile() throws IOException {
+ final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ this.viewer.displayFile(this.fs, getResourcePath(VALID_HTML), outStream, 1, 2);
+ final String output = new String(outStream.toByteArray());
+ assertEquals("<p>file content</p>\n", output);
+ }
+
+ /* Resolve a Path to a sample file in the shared test/hdfs-viewer-sample-files directory */
+ private Path getResourcePath(final String filename) {
+ final String HDFS_VIEWER_ROOT_PATH = "../test/hdfs-viewer-sample-files/";
+ return new Path(HDFS_VIEWER_ROOT_PATH + filename);
+ }
+}
diff --git a/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/ORCFileViewerTest.java b/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/ORCFileViewerTest.java
new file mode 100644
index 0000000..4f92f25
--- /dev/null
+++ b/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/ORCFileViewerTest.java
@@ -0,0 +1,207 @@
+package azkaban.viewer.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * <pre>
+ * Test cases for ORCFileViewer
+ *
+ * Validates the capabilities reported for ORC files, validates that avro,
+ * text and parquet files report no capability, and verifies both the schema
+ * and the raw records read from ORC files.
+ * </pre>
+ */
+@Ignore
+public class ORCFileViewerTest {
+ ORCFileViewer viewer;
+ Set<Capability> supportedCapabilities;
+ Set<Capability> unSupportedCapabilities;
+ FileSystem fs;
+
+ @Before
+ public void setUp() throws IOException {
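+ // As in HtmlFileViewerTest, a LocalFileSystem lets the ORC viewer read the
+ // checked-in sample files directly from disk.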
+ this.fs = new LocalFileSystem();
+ this.fs.initialize(this.fs.getWorkingDirectory().toUri(), new Configuration());
+ this.viewer = new ORCFileViewer();
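+ // ORC files are expected to expose both READ and SCHEMA; non-ORC files
+ // should expose neither.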
+ this.supportedCapabilities = EnumSet.of(Capability.READ, Capability.SCHEMA);
+ this.unSupportedCapabilities = EnumSet.noneOf(Capability.class);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ this.fs.close();
+ }
+
+ /* Calls ORCFileViewer#displayFile and parses the results */
+ @SuppressWarnings("DefaultCharset")
+ String displayRecordWrapper(final String filename, final int startRecord, final int endRecord)
+ throws IOException {
+ final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ this.viewer.displayFile(this.fs, getResourcePath(filename), outStream,
+ startRecord, endRecord);
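+ // The viewer emits a "Record N:" label per record; strip those labels and
+ // all whitespace so tests can compare the bare JSON payloads.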
+ String records = new String(outStream.toByteArray());
+ records = records.replaceAll("Record [0-9]*:", "");
+ records = StringUtils.deleteWhitespace(records);
+ return records;
+ }
+
+ /* Resolve a Path to a sample file in the shared test/hdfs-viewer-sample-files directory */
+ Path getResourcePath(final String filename) {
+ final String HDFS_VIEWER_ROOT_PATH = "../test/hdfs-viewer-sample-files/";
+ return new Path(HDFS_VIEWER_ROOT_PATH + filename);
+ }
+
+ /* verify capability for empty orc files */
+ @Test
+ public void orcEmptyFileCapability() throws IOException {
+ assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+ getResourcePath("TestOrcFile.emptyFile.orc")));
+ }
+
+ /* verify capability for generic orc files */
+ @Test
+ public void genericORCFileCapability() throws IOException {
+ assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+ getResourcePath("TestOrcFile.testPredicatePushdown.orc")));
+ }
+
+ /* verify capability for orc files with binary type */
+ @Test
+ public void binaryTypeORCFileCapability() throws IOException {
+ assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+ getResourcePath("TestOrcFile.testStringAndBinaryStatistics.orc")));
+ }
+
+ /* verify capability for snappy compressed orc file */
+ @Test
+ public void snappyCompressedORCFileCapability() throws IOException {
+ assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+ getResourcePath("TestOrcFile.testSnappy.orc")));
+ }
+
+ /* verify capability for union type orc file */
+ @Test
+ public void unionTypeORCFileCapability() throws IOException {
+ assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+ getResourcePath("TestOrcFile.testUnionAndTimestamp.orc")));
+ }
+
+ /* verify that avro files report no capability */
+ @Test
+ public void noAvroCapability() throws IOException {
+ assertEquals(this.unSupportedCapabilities,
+ this.viewer.getCapabilities(this.fs, getResourcePath("TestAvro.avro")));
+ }
+
+ /* verify that text files report no capability */
+ @Test
+ public void noTextCapability() throws IOException {
+ assertEquals(this.unSupportedCapabilities,
+ this.viewer.getCapabilities(this.fs, getResourcePath("TestTextFile.txt")));
+ }
+
+ /* verify that parquet files report no capability */
+ @Test
+ public void noParquetCapability() throws IOException {
+ assertEquals(this.unSupportedCapabilities, this.viewer.getCapabilities(this.fs,
+ getResourcePath("TestParquetFile.parquet")));
+ }
+
+ /* verify schema for empty orc files */
+ @Test
+ public void emptyORCFileSchema() throws IOException {
+ final String schema =
+ "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int,long1:bigint,"
+ + "float1:float,double1:double,bytes1:binary,string1:string,"
+ + "middle:struct<list:array<struct<int1:int,string1:string>>>,"
+ + "list:array<struct<int1:int,string1:string>>,map:map<string,struct<int1:int,string1:string>>>";
+ assertEquals(schema,
+ this.viewer.getSchema(this.fs, getResourcePath("TestOrcFile.emptyFile.orc")));
+ }
+
+ /* verify schema for generic orc files */
+ @Test
+ public void genericORCFileSchema() throws IOException {
+ assertEquals("struct<int1:int,string1:string>", this.viewer.getSchema(this.fs,
+ getResourcePath("TestOrcFile.testPredicatePushdown.orc")));
+ }
+
+ /* verify schema for orc files with binary type */
+ @Test
+ public void binaryTypeFileSchema() throws IOException {
+ assertEquals("struct<bytes1:binary,string1:string>", this.viewer.getSchema(
+ this.fs,
+ getResourcePath("TestOrcFile.testStringAndBinaryStatistics.orc")));
+ }
+
+ /* verify schema for snappy compressed orc file */
+ @Test
+ public void snappyCompressedFileSchema() throws IOException {
+ assertEquals("struct<int1:int,string1:string>",
+ this.viewer.getSchema(this.fs, getResourcePath("TestOrcFile.testSnappy.orc")));
+ }
+
+ /* verify schema for union type orc file */
+ @Test
+ public void unionTypeFileSchema() throws IOException {
+ assertEquals(
+ "struct<time:timestamp,union:uniontype<int,string>,decimal:decimal>",
+ this.viewer.getSchema(this.fs,
+ getResourcePath("TestOrcFile.testUnionAndTimestamp.orc")));
+ }
+
+ /* verify record display for empty orc files */
+ @Test
+ public void emptyORCFileDisplay() throws IOException {
+ final String actual = displayRecordWrapper("TestOrcFile.emptyFile.orc", 1, 1);
+ assertEquals("", actual);
+ }
+
+ /* verify record display for generic orc files */
+ @Test
+ public void genericORCFileDisplay() throws IOException {
+ final String actual =
+ displayRecordWrapper("TestOrcFile.testPredicatePushdown.orc", 2, 2);
+ assertEquals("{\"int1\":300,\"string1\":\"a\"}", actual);
+ }
+
+ /* verify record display for orc files with binary type */
+ @Test
+ public void binaryTypeFileDisplay() throws IOException {
+ final String actual =
+ displayRecordWrapper(
+ "TestOrcFile.testStringAndBinaryStatistics.orc", 4, 4);
+ assertEquals("{\"bytes1\":null,\"string1\":\"hi\"}", actual);
+ }
+
+ /* verify record display for snappy compressed orc file */
+ @Test
+ public void snappyCompressedFileDisplay() throws IOException {
+ final String actual =
+ displayRecordWrapper("TestOrcFile.testSnappy.orc", 2, 2);
+ assertEquals("{\"int1\":1181413113,\"string1\":\"382fdaaa\"}", actual);
+ }
+
+ /* verify record display for union type orc file */
+ @Test
+ public void unionTypeFileDisplay() throws IOException {
+ final String actual =
+ displayRecordWrapper("TestOrcFile.testUnionAndTimestamp.orc", 5, 5);
+ assertEquals("{\"decimal\":null,\"time\":null,\"union\":{\"1\":null}}",
+ actual);
+ }
+
+}
diff --git a/az-hdfs-viewer/src/test/resources/TestAvro.avro b/az-hdfs-viewer/src/test/resources/TestAvro.avro
new file mode 100644
index 0000000..8ffdc97
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestAvro.avro differ
diff --git a/az-hdfs-viewer/src/test/resources/TestHtmEmptyFile.htm b/az-hdfs-viewer/src/test/resources/TestHtmEmptyFile.htm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/az-hdfs-viewer/src/test/resources/TestHtmEmptyFile.htm
diff --git a/az-hdfs-viewer/src/test/resources/TestHtmlFile.html b/az-hdfs-viewer/src/test/resources/TestHtmlFile.html
new file mode 100644
index 0000000..2c77cfb
--- /dev/null
+++ b/az-hdfs-viewer/src/test/resources/TestHtmlFile.html
@@ -0,0 +1 @@
+<p>file content</p>
\ No newline at end of file
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.emptyFile.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.emptyFile.orc
new file mode 100644
index 0000000..ecdadcb
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.emptyFile.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.testPredicatePushdown.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.testPredicatePushdown.orc
new file mode 100644
index 0000000..4865dd8
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.testPredicatePushdown.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.testSnappy.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.testSnappy.orc
new file mode 100644
index 0000000..aa6cc9c
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.testSnappy.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.testStringAndBinaryStatistics.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.testStringAndBinaryStatistics.orc
new file mode 100644
index 0000000..4282c2a
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.testStringAndBinaryStatistics.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.testUnionAndTimestamp.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.testUnionAndTimestamp.orc
new file mode 100644
index 0000000..377862d
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.testUnionAndTimestamp.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestParquetFile.parquet b/az-hdfs-viewer/src/test/resources/TestParquetFile.parquet
new file mode 100644
index 0000000..bc61f97
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestParquetFile.parquet differ
diff --git a/az-hdfs-viewer/src/test/resources/TestTextFile.txt b/az-hdfs-viewer/src/test/resources/TestTextFile.txt
new file mode 100644
index 0000000..830e987
--- /dev/null
+++ b/az-hdfs-viewer/src/test/resources/TestTextFile.txt
@@ -0,0 +1 @@
+This is a dummy text file.
build.gradle 3(+3 -0)
diff --git a/build.gradle b/build.gradle
index 524eae0..2d5dafe 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,8 +101,11 @@ ext.deps = [
metricsCore : 'io.dropwizard.metrics:metrics-core:3.1.0',
metricsJvm : 'io.dropwizard.metrics:metrics-jvm:3.1.0',
mockito : 'org.mockito:mockito-core:2.10.0',
+ mongoDriver : 'org.mongodb:mongo-java-driver:2.14.0',
mysqlConnector : 'mysql:mysql-connector-java:5.1.28',
pig : 'org.apache.pig:pig:' + versions.pig,
+ parquetAvro : 'com.twitter:parquet-avro:1.3.2',
+ parquetBundle : 'com.twitter:parquet-hadoop-bundle:1.3.2',
quartz : 'org.quartz-scheduler:quartz:2.2.1',
restliGenerator : 'com.linkedin.pegasus:generator:' + versions.restli,
restliServer : 'com.linkedin.pegasus:restli-server:' + versions.restli,
settings.gradle 1(+1 -0)
diff --git a/settings.gradle b/settings.gradle
index a0d63cf..9a94fe6 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -29,3 +29,4 @@ include 'az-flow-trigger-dependency-plugin'
include 'test'
include 'az-reportal'
include 'az-hadoop-jobtype-plugin'
+include 'az-hdfs-viewer'
diff --git a/test/hdfs-viewer-sample-files/TestAvro.avro b/test/hdfs-viewer-sample-files/TestAvro.avro
new file mode 100644
index 0000000..8ffdc97
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestAvro.avro differ
diff --git a/test/hdfs-viewer-sample-files/TestHtmEmptyFile.htm b/test/hdfs-viewer-sample-files/TestHtmEmptyFile.htm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/hdfs-viewer-sample-files/TestHtmEmptyFile.htm
diff --git a/test/hdfs-viewer-sample-files/TestHtmlFile.html b/test/hdfs-viewer-sample-files/TestHtmlFile.html
new file mode 100644
index 0000000..2c77cfb
--- /dev/null
+++ b/test/hdfs-viewer-sample-files/TestHtmlFile.html
@@ -0,0 +1 @@
+<p>file content</p>
\ No newline at end of file
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.emptyFile.orc b/test/hdfs-viewer-sample-files/TestOrcFile.emptyFile.orc
new file mode 100644
index 0000000..ecdadcb
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.emptyFile.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.testPredicatePushdown.orc b/test/hdfs-viewer-sample-files/TestOrcFile.testPredicatePushdown.orc
new file mode 100644
index 0000000..4865dd8
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.testPredicatePushdown.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.testSnappy.orc b/test/hdfs-viewer-sample-files/TestOrcFile.testSnappy.orc
new file mode 100644
index 0000000..aa6cc9c
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.testSnappy.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.testStringAndBinaryStatistics.orc b/test/hdfs-viewer-sample-files/TestOrcFile.testStringAndBinaryStatistics.orc
new file mode 100644
index 0000000..4282c2a
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.testStringAndBinaryStatistics.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.testUnionAndTimestamp.orc b/test/hdfs-viewer-sample-files/TestOrcFile.testUnionAndTimestamp.orc
new file mode 100644
index 0000000..377862d
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.testUnionAndTimestamp.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestParquetFile.parquet b/test/hdfs-viewer-sample-files/TestParquetFile.parquet
new file mode 100644
index 0000000..bc61f97
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestParquetFile.parquet differ
diff --git a/test/hdfs-viewer-sample-files/TestTextFile.txt b/test/hdfs-viewer-sample-files/TestTextFile.txt
new file mode 100644
index 0000000..830e987
--- /dev/null
+++ b/test/hdfs-viewer-sample-files/TestTextFile.txt
@@ -0,0 +1 @@
+This is a dummy text file.