azkaban-aplcache

Hdfsviewer move from azkaban-plugins (#1781) This moduleThis

6/1/2018 5:42:54 PM

Changes

build.gradle 3(+3 -0)

settings.gradle 1(+1 -0)

Details

diff --git a/az-hdfs-viewer/build.gradle b/az-hdfs-viewer/build.gradle
new file mode 100644
index 0000000..359dbc8
--- /dev/null
+++ b/az-hdfs-viewer/build.gradle
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+apply plugin: 'distribution'
+
+dependencies {
+    compileOnly project(':az-core')
+    compileOnly project(":azkaban-common")
+    compileOnly project(":azkaban-web-server")
+    compileOnly project(":azkaban-hadoop-security-plugin")
+
+    compile deps.mongoDriver
+    compile deps.parquetAvro
+    compile deps.parquetBundle
+    compileOnly deps.hadoopCommon
+    compileOnly deps.hadoopMRClientCommon
+    compileOnly deps.hadoopMRClientCore
+
+    testCompile deps.hadoopCommon
+    compileOnly deps.hiveMetastore
+    compileOnly(deps.hiveExecCore) {
+        exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
+        exclude group: 'eigenbase', module: 'eigenbase-properties'
+    }
+}
+
+distributions {
+    main {
+        contents {
+            from(jar) {
+                into 'lib'
+            }
+            from(configurations.runtime) {
+                into 'lib'
+            }
+        }
+    }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AvroFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AvroFileViewer.java
new file mode 100644
index 0000000..7b63c69
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AvroFileViewer.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+/**
+ * This class implements a viewer of avro files
+ *
+ * @author lguo
+ */
+public class AvroFileViewer extends HdfsFileViewer {
+
+  private static Logger logger = Logger.getLogger(AvroFileViewer.class);
+  // Will spend 5 seconds trying to pull data and then stop.
+  private static long STOP_TIME = 2000l;
+
+  private static final String VIEWER_NAME = "Avro";
+
+  @Override
+  public String getName() {
+    return VIEWER_NAME;
+  }
+
+  @Override
+  public Set<Capability> getCapabilities(FileSystem fs, Path path)
+      throws AccessControlException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("path:" + path.toUri().getPath());
+    }
+
+    DataFileStream<Object> avroDataStream = null;
+    try {
+      avroDataStream = getAvroDataStream(fs, path);
+      Schema schema = avroDataStream.getSchema();
+      return (schema != null) ? EnumSet.of(Capability.READ, Capability.SCHEMA)
+          : EnumSet.noneOf(Capability.class);
+    } catch (AccessControlException e) {
+      throw e;
+    } catch (IOException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(path.toUri().getPath() + " is not an avro file.");
+        logger.debug("Error in getting avro schema: ", e);
+      }
+      return EnumSet.noneOf(Capability.class);
+    } finally {
+      try {
+        if (avroDataStream != null) {
+          avroDataStream.close();
+        }
+      } catch (IOException e) {
+        logger.error(e);
+      }
+    }
+  }
+
+  @Override
+  public String getSchema(FileSystem fs, Path path) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("path:" + path.toUri().getPath());
+    }
+
+    DataFileStream<Object> avroDataStream = null;
+    try {
+      avroDataStream = getAvroDataStream(fs, path);
+      Schema schema = avroDataStream.getSchema();
+      return schema.toString(true);
+    } catch (IOException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(path.toUri().getPath() + " is not an avro file.");
+        logger.debug("Error in getting avro schema: ", e);
+      }
+      return null;
+    } finally {
+      try {
+        if (avroDataStream != null) {
+          avroDataStream.close();
+        }
+      } catch (IOException e) {
+        logger.error(e);
+      }
+    }
+  }
+
+  private DataFileStream<Object> getAvroDataStream(FileSystem fs, Path path)
+      throws IOException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("path:" + path.toUri().getPath());
+    }
+
+    GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
+    InputStream hdfsInputStream = null;
+    try {
+      hdfsInputStream = fs.open(path);
+    } catch (IOException e) {
+      if (hdfsInputStream != null) {
+        hdfsInputStream.close();
+      }
+      throw e;
+    }
+
+    DataFileStream<Object> avroDataFileStream = null;
+    try {
+      avroDataFileStream =
+          new DataFileStream<Object>(hdfsInputStream, avroReader);
+    } catch (IOException e) {
+      if (hdfsInputStream != null) {
+        hdfsInputStream.close();
+      }
+      throw e;
+    }
+
+    return avroDataFileStream;
+  }
+
+  @Override
+  public void displayFile(FileSystem fs, Path path, OutputStream outputStream,
+      int startLine, int endLine) throws IOException {
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("display avro file:" + path.toUri().getPath());
+    }
+
+    DataFileStream<Object> avroDatastream = null;
+    JsonGenerator g = null;
+
+    try {
+      avroDatastream = getAvroDataStream(fs, path);
+      Schema schema = avroDatastream.getSchema();
+      DatumWriter<Object> avroWriter = new GenericDatumWriter<Object>(schema);
+
+      g = new JsonFactory().createJsonGenerator(
+          outputStream, JsonEncoding.UTF8);
+      g.useDefaultPrettyPrinter();
+      Encoder encoder = EncoderFactory.get().jsonEncoder(schema, g);
+
+      long endTime = System.currentTimeMillis() + STOP_TIME;
+      int lineno = 1; // line number starts from 1
+      while (avroDatastream.hasNext() && lineno <= endLine
+          && System.currentTimeMillis() <= endTime) {
+        Object datum = avroDatastream.next();
+        if (lineno >= startLine) {
+          String record = "\n\n Record " + lineno + ":\n";
+          outputStream.write(record.getBytes("UTF-8"));
+          avroWriter.write(datum, encoder);
+          encoder.flush();
+        }
+        lineno++;
+      }
+    } catch (IOException e) {
+      outputStream.write(("Error in display avro file: " + e
+          .getLocalizedMessage()).getBytes("UTF-8"));
+      throw e;
+    } finally {
+      if (g != null) {
+        g.close();
+      }
+      avroDatastream.close();
+    }
+  }
+
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AzkabanSequenceFileReader.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AzkabanSequenceFileReader.java
new file mode 100644
index 0000000..c1bf7a7
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/AzkabanSequenceFileReader.java
@@ -0,0 +1,1075 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableName;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Our forked version of hadoop sequence file.
+ * it is to deal with sequence file possibly keep open connections and snap up
+ * ports
+ */
+@SuppressWarnings("deprecation")
+public class AzkabanSequenceFileReader {
+  private final static Logger LOG = Logger
+      .getLogger(HadoopSecurityManager.class);
+  private static final byte BLOCK_COMPRESS_VERSION = (byte) 4;
+  private static final byte CUSTOM_COMPRESS_VERSION = (byte) 5;
+  private static final byte VERSION_WITH_METADATA = (byte) 6;
+  private static final int SYNC_ESCAPE = -1; // "length" of sync entries
+  private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
+  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
+  /** The number of bytes between sync points. */
+  public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
+  private static final byte[] VERSION = new byte[]{(byte) 'S', (byte) 'E',
+      (byte) 'Q', VERSION_WITH_METADATA};
+
+  private AzkabanSequenceFileReader() {
+  } // no public ctor
+
+  /**
+   * The compression type used to compress key/value pairs in the
+   * {@link AzkabanSequenceFileReader}.
+   *
+   * @see AzkabanSequenceFileReader.Writer
+   */
+  public static enum CompressionType {
+    /** Do not compress records. */
+    NONE,
+    /** Compress values only, each separately. */
+    RECORD,
+    /** Compress sequences of records together in blocks. */
+    BLOCK
+  }
+
+  private static class UncompressedBytes implements ValueBytes {
+    private int dataSize;
+    private byte[] data;
+
+    private UncompressedBytes() {
+      this.data = null;
+      this.dataSize = 0;
+    }
+
+    private void reset(final DataInputStream in, final int length) throws IOException {
+      this.data = new byte[length];
+      this.dataSize = -1;
+
+      in.readFully(this.data);
+      this.dataSize = this.data.length;
+    }
+
+    @Override
+    public int getSize() {
+      return this.dataSize;
+    }
+
+    @Override
+    public void writeUncompressedBytes(final DataOutputStream outStream)
+        throws IOException {
+      outStream.write(this.data, 0, this.dataSize);
+    }
+
+    @Override
+    public void writeCompressedBytes(final DataOutputStream outStream)
+        throws IllegalArgumentException, IOException {
+      throw new IllegalArgumentException(
+          "UncompressedBytes cannot be compressed!");
+    }
+
+  } // UncompressedBytes
+
+  private static class CompressedBytes implements ValueBytes {
+    DataInputBuffer rawData = null;
+    CompressionCodec codec = null;
+    CompressionInputStream decompressedStream = null;
+    private int dataSize;
+    private byte[] data;
+
+    private CompressedBytes(final CompressionCodec codec) {
+      this.data = null;
+      this.dataSize = 0;
+      this.codec = codec;
+    }
+
+    private void reset(final DataInputStream in, final int length) throws IOException {
+      this.data = new byte[length];
+      this.dataSize = -1;
+
+      in.readFully(this.data);
+      this.dataSize = this.data.length;
+    }
+
+    @Override
+    public int getSize() {
+      return this.dataSize;
+    }
+
+    @Override
+    public void writeUncompressedBytes(final DataOutputStream outStream)
+        throws IOException {
+      if (this.decompressedStream == null) {
+        this.rawData = new DataInputBuffer();
+        this.decompressedStream = this.codec.createInputStream(this.rawData);
+      } else {
+        this.decompressedStream.resetState();
+      }
+      this.rawData.reset(this.data, 0, this.dataSize);
+
+      final byte[] buffer = new byte[8192];
+      int bytesRead = 0;
+      while ((bytesRead = this.decompressedStream.read(buffer, 0, 8192)) != -1) {
+        outStream.write(buffer, 0, bytesRead);
+      }
+    }
+
+    @Override
+    public void writeCompressedBytes(final DataOutputStream outStream)
+        throws IllegalArgumentException, IOException {
+      outStream.write(this.data, 0, this.dataSize);
+    }
+
+  } // CompressedBytes
+
+  /** Reads key/value pairs from a sequence-format file. */
+  public static class Reader implements Closeable {
+
+    private final Path file;
+    private final DataOutputBuffer outBuf = new DataOutputBuffer();
+    private final byte[] sync = new byte[SYNC_HASH_SIZE];
+    private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+    private final long end;
+    private final Configuration conf;
+    private final boolean lazyDecompress = true;
+    private FSDataInputStream in;
+    private byte version;
+    private String keyClassName;
+    private String valClassName;
+    @SuppressWarnings("rawtypes")
+    private Class keyClass;
+    @SuppressWarnings("rawtypes")
+    private Class valClass;
+    private CompressionCodec codec = null;
+    private Metadata metadata = null;
+    private boolean syncSeen;
+    private int keyLength;
+    private int recordLength;
+    private boolean decompress;
+    private boolean blockCompressed;
+    private int noBufferedRecords = 0;
+    private boolean valuesDecompressed = true;
+
+    private int noBufferedKeys = 0;
+    private int noBufferedValues = 0;
+
+    private DataInputBuffer keyLenBuffer = null;
+    private CompressionInputStream keyLenInFilter = null;
+    private DataInputStream keyLenIn = null;
+    private Decompressor keyLenDecompressor = null;
+    private DataInputBuffer keyBuffer = null;
+    private CompressionInputStream keyInFilter = null;
+    private DataInputStream keyIn = null;
+    private Decompressor keyDecompressor = null;
+
+    private DataInputBuffer valLenBuffer = null;
+    private CompressionInputStream valLenInFilter = null;
+    private DataInputStream valLenIn = null;
+    private Decompressor valLenDecompressor = null;
+    private DataInputBuffer valBuffer = null;
+    private CompressionInputStream valInFilter = null;
+    private DataInputStream valIn = null;
+    private Decompressor valDecompressor = null;
+
+    @SuppressWarnings("rawtypes")
+    private Deserializer keyDeserializer;
+    @SuppressWarnings("rawtypes")
+    private Deserializer valDeserializer;
+
+    /** Open the named file. */
+    public Reader(final FileSystem fs, final Path file, final Configuration conf)
+        throws IOException {
+      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
+    }
+
+    private Reader(final FileSystem fs, final Path file, final int bufferSize,
+        final Configuration conf, final boolean tempReader) throws IOException {
+      this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
+    }
+
+    private Reader(final FileSystem fs, final Path file, final int bufferSize, final long start,
+        final long length, final Configuration conf, final boolean tempReader) throws IOException {
+      this.file = file;
+
+      try {
+        this.in = openFile(fs, file, bufferSize, length);
+        this.conf = conf;
+        seek(start);
+        this.end = this.in.getPos() + length;
+        init(tempReader);
+      } catch (final IOException e) {
+        if (this.in != null) {
+          this.in.close();
+        }
+        throw e;
+      }
+    }
+
+    /**
+     * Override this method to specialize the type of {@link FSDataInputStream}
+     * returned.
+     */
+    protected FSDataInputStream openFile(final FileSystem fs, final Path file,
+        final int bufferSize, final long length) throws IOException {
+      return fs.open(file, bufferSize);
+    }
+
+    /**
+     * Initialize the {@link Reader}
+     *
+     * @param tmpReader <code>true</code> if we are constructing a temporary
+     *          reader
+     *          {@link AzkabanSequenceFileReader.Sorter.cloneFileAttributes},
+     *          and hence do not initialize every component; <code>false</code>
+     *          otherwise.
+     * @throws IOException
+     */
+    private void init(final boolean tempReader) throws IOException {
+      final byte[] versionBlock = new byte[VERSION.length];
+      this.in.readFully(versionBlock);
+
+      if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1])
+          || (versionBlock[2] != VERSION[2])) {
+        throw new IOException(this.file + " not a SequenceFile");
+      }
+
+      // Set 'version'
+      this.version = versionBlock[3];
+      if (this.version > VERSION[3]) {
+        throw new VersionMismatchException(VERSION[3], this.version);
+      }
+
+      if (this.version < BLOCK_COMPRESS_VERSION) {
+        final UTF8 className = new UTF8();
+
+        className.readFields(this.in);
+        this.keyClassName = className.toString(); // key class name
+
+        className.readFields(this.in);
+        this.valClassName = className.toString(); // val class name
+      } else {
+        this.keyClassName = Text.readString(this.in);
+        this.valClassName = Text.readString(this.in);
+      }
+
+      if (this.version > 2) { // if version > 2
+        this.decompress = this.in.readBoolean(); // is compressed?
+      } else {
+        this.decompress = false;
+      }
+
+      if (this.version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
+        this.blockCompressed = this.in.readBoolean(); // is block-compressed?
+      } else {
+        this.blockCompressed = false;
+      }
+
+      // if version >= 5
+      // setup the compression codec
+      if (this.decompress) {
+        if (this.version >= CUSTOM_COMPRESS_VERSION) {
+          final String codecClassname = Text.readString(this.in);
+          try {
+            final Class<? extends CompressionCodec> codecClass =
+                this.conf.getClassByName(codecClassname).asSubclass(
+                    CompressionCodec.class);
+            this.codec = ReflectionUtils.newInstance(codecClass, this.conf);
+          } catch (final ClassNotFoundException cnfe) {
+            throw new IllegalArgumentException("Unknown codec: "
+                + codecClassname, cnfe);
+          }
+        } else {
+          this.codec = new DefaultCodec();
+          ((Configurable) this.codec).setConf(this.conf);
+        }
+      }
+
+      this.metadata = new Metadata();
+      if (this.version >= VERSION_WITH_METADATA) { // if version >= 6
+        this.metadata.readFields(this.in);
+      }
+
+      if (this.version > 1) { // if version > 1
+        this.in.readFully(this.sync); // read sync bytes
+      }
+
+      // Initialize... *not* if this we are constructing a temporary Reader
+      if (!tempReader) {
+        this.valBuffer = new DataInputBuffer();
+        if (this.decompress) {
+          this.valDecompressor = CodecPool.getDecompressor(this.codec);
+          this.valInFilter = this.codec.createInputStream(this.valBuffer, this.valDecompressor);
+          this.valIn = new DataInputStream(this.valInFilter);
+        } else {
+          this.valIn = this.valBuffer;
+        }
+
+        if (this.blockCompressed) {
+          this.keyLenBuffer = new DataInputBuffer();
+          this.keyBuffer = new DataInputBuffer();
+          this.valLenBuffer = new DataInputBuffer();
+
+          this.keyLenDecompressor = CodecPool.getDecompressor(this.codec);
+          this.keyLenInFilter =
+              this.codec.createInputStream(this.keyLenBuffer, this.keyLenDecompressor);
+          this.keyLenIn = new DataInputStream(this.keyLenInFilter);
+
+          this.keyDecompressor = CodecPool.getDecompressor(this.codec);
+          this.keyInFilter = this.codec.createInputStream(this.keyBuffer, this.keyDecompressor);
+          this.keyIn = new DataInputStream(this.keyInFilter);
+
+          this.valLenDecompressor = CodecPool.getDecompressor(this.codec);
+          this.valLenInFilter =
+              this.codec.createInputStream(this.valLenBuffer, this.valLenDecompressor);
+          this.valLenIn = new DataInputStream(this.valLenInFilter);
+        }
+
+        final SerializationFactory serializationFactory =
+            new SerializationFactory(this.conf);
+        this.keyDeserializer =
+            getDeserializer(serializationFactory, getKeyClass());
+        if (!this.blockCompressed) {
+          this.keyDeserializer.open(this.valBuffer);
+        } else {
+          this.keyDeserializer.open(this.keyIn);
+        }
+        this.valDeserializer =
+            getDeserializer(serializationFactory, getValueClass());
+        this.valDeserializer.open(this.valIn);
+      }
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private Deserializer getDeserializer(final SerializationFactory sf, final Class c) {
+      return sf.getDeserializer(c);
+    }
+
+    /** Close the file. */
+    @Override
+    public synchronized void close() throws IOException {
+      // Return the decompressors to the pool
+      CodecPool.returnDecompressor(this.keyLenDecompressor);
+      CodecPool.returnDecompressor(this.keyDecompressor);
+      CodecPool.returnDecompressor(this.valLenDecompressor);
+      CodecPool.returnDecompressor(this.valDecompressor);
+      this.keyLenDecompressor = this.keyDecompressor = null;
+      this.valLenDecompressor = this.valDecompressor = null;
+
+      if (this.keyDeserializer != null) {
+        this.keyDeserializer.close();
+      }
+      if (this.valDeserializer != null) {
+        this.valDeserializer.close();
+      }
+
+      // Close the input-stream
+      if (this.in != null) {
+        this.in.close();
+      }
+    }
+
+    /** Returns the name of the key class. */
+    public String getKeyClassName() {
+      return this.keyClassName;
+    }
+
+    /** Returns the class of keys in this file. */
+    public synchronized Class<?> getKeyClass() {
+      if (null == this.keyClass) {
+        try {
+          this.keyClass = WritableName.getClass(getKeyClassName(), this.conf);
+        } catch (final IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return this.keyClass;
+    }
+
+    /** Returns the name of the value class. */
+    public String getValueClassName() {
+      return this.valClassName;
+    }
+
+    /** Returns the class of values in this file. */
+    public synchronized Class<?> getValueClass() {
+      if (null == this.valClass) {
+        try {
+          this.valClass = WritableName.getClass(getValueClassName(), this.conf);
+        } catch (final IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return this.valClass;
+    }
+
+    /** Returns true if values are compressed. */
+    public boolean isCompressed() {
+      return this.decompress;
+    }
+
+    /** Returns true if records are block-compressed. */
+    public boolean isBlockCompressed() {
+      return this.blockCompressed;
+    }
+
+    /** Returns the compression codec of data in this file. */
+    public CompressionCodec getCompressionCodec() {
+      return this.codec;
+    }
+
+    /** Returns the metadata object of the file */
+    public Metadata getMetadata() {
+      return this.metadata;
+    }
+
+    /** Returns the configuration used for this file. */
+    Configuration getConf() {
+      return this.conf;
+    }
+
+    /** Read a compressed buffer */
+    private synchronized void readBuffer(final DataInputBuffer buffer,
+        final CompressionInputStream filter) throws IOException {
+      // Read data into a temporary buffer
+      final DataOutputBuffer dataBuffer = new DataOutputBuffer();
+
+      try {
+        final int dataBufferLength = WritableUtils.readVInt(this.in);
+        dataBuffer.write(this.in, dataBufferLength);
+
+        // Set up 'buffer' connected to the input-stream
+        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
+      } finally {
+        dataBuffer.close();
+      }
+
+      // Reset the codec
+      filter.resetState();
+    }
+
+    /** Read the next 'compressed' block */
+    private synchronized void readBlock() throws IOException {
+      // Check if we need to throw away a whole block of
+      // 'values' due to 'lazy decompression'
+      if (this.lazyDecompress && !this.valuesDecompressed) {
+        this.in.seek(WritableUtils.readVInt(this.in) + this.in.getPos());
+        this.in.seek(WritableUtils.readVInt(this.in) + this.in.getPos());
+      }
+
+      // Reset internal states
+      this.noBufferedKeys = 0;
+      this.noBufferedValues = 0;
+      this.noBufferedRecords = 0;
+      this.valuesDecompressed = false;
+
+      // Process sync
+      if (this.sync != null) {
+        this.in.readInt();
+        this.in.readFully(this.syncCheck); // read syncCheck
+        if (!Arrays.equals(this.sync, this.syncCheck)) // check it
+          throw new IOException("File is corrupt!");
+      }
+      this.syncSeen = true;
+
+      // Read number of records in this block
+      this.noBufferedRecords = WritableUtils.readVInt(this.in);
+
+      // Read key lengths and keys
+      readBuffer(this.keyLenBuffer, this.keyLenInFilter);
+      readBuffer(this.keyBuffer, this.keyInFilter);
+      this.noBufferedKeys = this.noBufferedRecords;
+
+      // Read value lengths and values
+      if (!this.lazyDecompress) {
+        readBuffer(this.valLenBuffer, this.valLenInFilter);
+        readBuffer(this.valBuffer, this.valInFilter);
+        this.noBufferedValues = this.noBufferedRecords;
+        this.valuesDecompressed = true;
+      }
+    }
+
+    /**
+     * Position valLenIn/valIn to the 'value' corresponding to the 'current' key
+     */
+    private synchronized void seekToCurrentValue() throws IOException {
+      if (!this.blockCompressed) {
+        if (this.decompress) {
+          this.valInFilter.resetState();
+        }
+        this.valBuffer.reset();
+      } else {
+        // Check if this is the first value in the 'block' to be read
+        if (this.lazyDecompress && !this.valuesDecompressed) {
+          // Read the value lengths and values
+          readBuffer(this.valLenBuffer, this.valLenInFilter);
+          readBuffer(this.valBuffer, this.valInFilter);
+          this.noBufferedValues = this.noBufferedRecords;
+          this.valuesDecompressed = true;
+        }
+
+        // Calculate the no. of bytes to skip
+        // Note: 'current' key has already been read!
+        int skipValBytes = 0;
+        final int currentKey = this.noBufferedKeys + 1;
+        for (int i = this.noBufferedValues; i > currentKey; --i) {
+          skipValBytes += WritableUtils.readVInt(this.valLenIn);
+          --this.noBufferedValues;
+        }
+
+        // Skip to the 'val' corresponding to 'current' key
+        if (skipValBytes > 0) {
+          if (this.valIn.skipBytes(skipValBytes) != skipValBytes) {
+            throw new IOException("Failed to seek to " + currentKey
+                + "(th) value!");
+          }
+        }
+      }
+    }
+
+    /**
+     * Get the 'value' corresponding to the last read 'key'.
+     *
+     * @param val : The 'value' to be read.
+     * @throws IOException
+     */
+    public synchronized void getCurrentValue(final Writable val) throws IOException {
+      if (val instanceof Configurable) {
+        ((Configurable) val).setConf(this.conf);
+      }
+
+      // Position stream to 'current' value
+      seekToCurrentValue();
+
+      if (!this.blockCompressed) {
+        val.readFields(this.valIn);
+
+        if (this.valIn.read() > 0) {
+          LOG.info("available bytes: " + this.valIn.available());
+          throw new IOException(val + " read "
+              + (this.valBuffer.getPosition() - this.keyLength) + " bytes, should read "
+              + (this.valBuffer.getLength() - this.keyLength));
+        }
+      } else {
+        // Get the value
+        final int valLength = WritableUtils.readVInt(this.valLenIn);
+        val.readFields(this.valIn);
+
+        // Read another compressed 'value'
+        --this.noBufferedValues;
+
+        // Sanity check
+        if (valLength < 0) {
+          LOG.debug(val + " is a zero-length value");
+        }
+      }
+
+    }
+
+    /**
+     * Get the 'value' corresponding to the last read 'key'.
+     *
+     * @param val : The 'value' to be read.
+     * @throws IOException
+     */
+    public synchronized Object getCurrentValue(Object val) throws IOException {
+      if (val instanceof Configurable) {
+        ((Configurable) val).setConf(this.conf);
+      }
+
+      // Position stream to 'current' value
+      seekToCurrentValue();
+
+      if (!this.blockCompressed) {
+        val = deserializeValue(val);
+
+        if (this.valIn.read() > 0) {
+          LOG.info("available bytes: " + this.valIn.available());
+          throw new IOException(val + " read "
+              + (this.valBuffer.getPosition() - this.keyLength) + " bytes, should read "
+              + (this.valBuffer.getLength() - this.keyLength));
+        }
+      } else {
+        // Get the value
+        final int valLength = WritableUtils.readVInt(this.valLenIn);
+        val = deserializeValue(val);
+
+        // Read another compressed 'value'
+        --this.noBufferedValues;
+
+        // Sanity check
+        if (valLength < 0) {
+          LOG.debug(val + " is a zero-length value");
+        }
+      }
+      return val;
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private Object deserializeValue(final Object val) throws IOException {
+      return this.valDeserializer.deserialize(val);
+    }
+
+    /**
+     * Read the next key in the file into <code>key</code>, skipping its value.
+     * True if another entry exists, and false at end of file.
+     */
+    public synchronized boolean next(final Writable key) throws IOException {
+      if (key.getClass() != getKeyClass())
+        throw new IOException("wrong key class: " + key.getClass().getName()
+            + " is not " + this.keyClass);
+
+      if (!this.blockCompressed) {
+        this.outBuf.reset();
+
+        this.keyLength = next(this.outBuf);
+        if (this.keyLength < 0)
+          return false;
+
+        this.valBuffer.reset(this.outBuf.getData(), this.outBuf.getLength());
+
+        key.readFields(this.valBuffer);
+        this.valBuffer.mark(0);
+        if (this.valBuffer.getPosition() != this.keyLength) {
+          throw new IOException(key + " read " + this.valBuffer.getPosition()
+              + " bytes, should read " + this.keyLength);
+        }
+      } else {
+        // Reset syncSeen
+        this.syncSeen = false;
+
+        if (this.noBufferedKeys == 0) {
+          try {
+            readBlock();
+          } catch (final EOFException eof) {
+            return false;
+          }
+        }
+
+        final int keyLength = WritableUtils.readVInt(this.keyLenIn);
+
+        // Sanity check
+        if (keyLength < 0) {
+          return false;
+        }
+
+        // Read another compressed 'key'
+        key.readFields(this.keyIn);
+        --this.noBufferedKeys;
+      }
+
+      return true;
+    }
+
+    /**
+     * Read the next key/value pair in the file into <code>key</code> and
+     * <code>val</code>. Returns true if such a pair exists and false when at
+     * end of file
+     */
+    public synchronized boolean next(final Writable key, final Writable val)
+        throws IOException {
+      if (val.getClass() != getValueClass())
+        throw new IOException("wrong value class: " + val + " is not "
+            + this.valClass);
+
+      final boolean more = next(key);
+
+      if (more) {
+        getCurrentValue(val);
+      }
+
+      return more;
+    }
+
+    /**
+     * Read and return the next record length, potentially skipping over a sync
+     * block.
+     *
+     * @return the length of the next record or -1 if there is no next record
+     * @throws IOException
+     */
+    private synchronized int readRecordLength() throws IOException {
+      if (this.in.getPos() >= this.end) {
+        return -1;
+      }
+      int length = this.in.readInt();
+      if (this.version > 1 && this.sync != null && length == SYNC_ESCAPE) { // process a
+                                                                  // sync entry
+        this.in.readFully(this.syncCheck); // read syncCheck
+        if (!Arrays.equals(this.sync, this.syncCheck)) // check it
+          throw new IOException("File is corrupt!");
+        this.syncSeen = true;
+        if (this.in.getPos() >= this.end) {
+          return -1;
+        }
+        length = this.in.readInt(); // re-read length
+      } else {
+        this.syncSeen = false;
+      }
+
+      return length;
+    }
+
+    /**
+     * Read the next key/value pair in the file into <code>buffer</code>.
+     * Returns the length of the key read, or -1 if at end of file. The length
+     * of the value may be computed by calling buffer.getLength() before and
+     * after calls to this method.
+     */
+    /**
+     * @deprecated Call
+     *             {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
+     */
+    public synchronized int next(final DataOutputBuffer buffer) throws IOException {
+      // Unsupported for block-compressed sequence files
+      if (this.blockCompressed) {
+        throw new IOException(
+            "Unsupported call for block-compressed"
+                + " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
+      }
+      try {
+        final int length = readRecordLength();
+        if (length == -1) {
+          return -1;
+        }
+        final int keyLength = this.in.readInt();
+        buffer.write(this.in, length);
+        return keyLength;
+      } catch (final ChecksumException e) { // checksum failure
+        handleChecksumException(e);
+        return next(buffer);
+      }
+    }
+
+    public ValueBytes createValueBytes() {
+      ValueBytes val = null;
+      if (!this.decompress || this.blockCompressed) {
+        val = new UncompressedBytes();
+      } else {
+        val = new CompressedBytes(this.codec);
+      }
+      return val;
+    }
+
+    /**
+     * Read 'raw' records.
+     *
+     * @param key - The buffer into which the key is read
+     * @param val - The 'raw' value
+     * @return Returns the total record length or -1 for end of file
+     * @throws IOException
+     */
+    public synchronized int nextRaw(final DataOutputBuffer key, final ValueBytes val)
+        throws IOException {
+      if (!this.blockCompressed) {
+        final int length = readRecordLength();
+        if (length == -1) {
+          return -1;
+        }
+        final int keyLength = this.in.readInt();
+        final int valLength = length - keyLength;
+        key.write(this.in, keyLength);
+        if (this.decompress) {
+          final CompressedBytes value = (CompressedBytes) val;
+          value.reset(this.in, valLength);
+        } else {
+          final UncompressedBytes value = (UncompressedBytes) val;
+          value.reset(this.in, valLength);
+        }
+
+        return length;
+      } else {
+        // Reset syncSeen
+        this.syncSeen = false;
+
+        // Read 'key'
+        if (this.noBufferedKeys == 0) {
+          if (this.in.getPos() >= this.end)
+            return -1;
+
+          try {
+            readBlock();
+          } catch (final EOFException eof) {
+            return -1;
+          }
+        }
+        final int keyLength = WritableUtils.readVInt(this.keyLenIn);
+        if (keyLength < 0) {
+          throw new IOException("zero length key found!");
+        }
+        key.write(this.keyIn, keyLength);
+        --this.noBufferedKeys;
+
+        // Read raw 'value'
+        seekToCurrentValue();
+        final int valLength = WritableUtils.readVInt(this.valLenIn);
+        final UncompressedBytes rawValue = (UncompressedBytes) val;
+        rawValue.reset(this.valIn, valLength);
+        --this.noBufferedValues;
+
+        return (keyLength + valLength);
+      }
+
+    }
+
+    /**
+     * Read 'raw' keys.
+     *
+     * @param key - The buffer into which the key is read
+     * @return Returns the key length or -1 for end of file
+     * @throws IOException
+     */
+    public int nextRawKey(final DataOutputBuffer key) throws IOException {
+      if (!this.blockCompressed) {
+        this.recordLength = readRecordLength();
+        if (this.recordLength == -1) {
+          return -1;
+        }
+        this.keyLength = this.in.readInt();
+        key.write(this.in, this.keyLength);
+        return this.keyLength;
+      } else {
+        // Reset syncSeen
+        this.syncSeen = false;
+
+        // Read 'key'
+        if (this.noBufferedKeys == 0) {
+          if (this.in.getPos() >= this.end)
+            return -1;
+
+          try {
+            readBlock();
+          } catch (final EOFException eof) {
+            return -1;
+          }
+        }
+        final int keyLength = WritableUtils.readVInt(this.keyLenIn);
+        if (keyLength < 0) {
+          throw new IOException("zero length key found!");
+        }
+        key.write(this.keyIn, keyLength);
+        --this.noBufferedKeys;
+
+        return keyLength;
+      }
+
+    }
+
+    /**
+     * Read the next key in the file, skipping its value. Return null at end of
+     * file.
+     */
+    public synchronized Object next(Object key) throws IOException {
+      if (key != null && key.getClass() != getKeyClass()) {
+        throw new IOException("wrong key class: " + key.getClass().getName()
+            + " is not " + this.keyClass);
+      }
+
+      if (!this.blockCompressed) {
+        this.outBuf.reset();
+
+        this.keyLength = next(this.outBuf);
+        if (this.keyLength < 0)
+          return null;
+
+        this.valBuffer.reset(this.outBuf.getData(), this.outBuf.getLength());
+
+        key = deserializeKey(key);
+        this.valBuffer.mark(0);
+        if (this.valBuffer.getPosition() != this.keyLength) {
+          throw new IOException(key + " read " + this.valBuffer.getPosition()
+              + " bytes, should read " + this.keyLength);
+        }
+      } else {
+        // Reset syncSeen
+        this.syncSeen = false;
+
+        if (this.noBufferedKeys == 0) {
+          try {
+            readBlock();
+          } catch (final EOFException eof) {
+            return null;
+          }
+        }
+
+        final int keyLength = WritableUtils.readVInt(this.keyLenIn);
+
+        // Sanity check
+        if (keyLength < 0) {
+          return null;
+        }
+
+        // Read another compressed 'key'
+        key = deserializeKey(key);
+        --this.noBufferedKeys;
+      }
+
+      return key;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Object deserializeKey(final Object key) throws IOException {
+      return this.keyDeserializer.deserialize(key);
+    }
+
+    /**
+     * Read 'raw' values.
+     *
+     * @param val - The 'raw' value
+     * @return Returns the value length
+     * @throws IOException
+     */
+    public synchronized int nextRawValue(final ValueBytes val) throws IOException {
+
+      // Position stream to current value
+      seekToCurrentValue();
+
+      if (!this.blockCompressed) {
+        final int valLength = this.recordLength - this.keyLength;
+        if (this.decompress) {
+          final CompressedBytes value = (CompressedBytes) val;
+          value.reset(this.in, valLength);
+        } else {
+          final UncompressedBytes value = (UncompressedBytes) val;
+          value.reset(this.in, valLength);
+        }
+
+        return valLength;
+      } else {
+        final int valLength = WritableUtils.readVInt(this.valLenIn);
+        final UncompressedBytes rawValue = (UncompressedBytes) val;
+        rawValue.reset(this.valIn, valLength);
+        --this.noBufferedValues;
+        return valLength;
+      }
+
+    }
+
+    private void handleChecksumException(final ChecksumException e)
+        throws IOException {
+      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
+        LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
+        sync(getPosition() + this.conf.getInt("io.bytes.per.checksum", 512));
+      } else {
+        throw e;
+      }
+    }
+
+    /**
+     * Set the current byte position in the input file.
+     *
+     * <p>
+     * The position passed must be a position returned by
+     * {@link AzkabanSequenceFileReader.Writer#getLength()} when writing this
+     * file. To seek to an arbitrary position, use
+     * {@link Reader#sync(long)}.
+     */
+    public synchronized void seek(final long position) throws IOException {
+      this.in.seek(position);
+      if (this.blockCompressed) { // trigger block read
+        this.noBufferedKeys = 0;
+        this.valuesDecompressed = true;
+      }
+    }
+
+    /** Seek to the next sync mark past a given position. */
+    public synchronized void sync(final long position) throws IOException {
+      if (position + SYNC_SIZE >= this.end) {
+        seek(this.end);
+        return;
+      }
+
+      try {
+        seek(position + 4); // skip escape
+        this.in.readFully(this.syncCheck);
+        final int syncLen = this.sync.length;
+        for (int i = 0; this.in.getPos() < this.end; i++) {
+          int j = 0;
+          for (; j < syncLen; j++) {
+            if (this.sync[j] != this.syncCheck[(i + j) % syncLen])
+              break;
+          }
+          if (j == syncLen) {
+            this.in.seek(this.in.getPos() - SYNC_SIZE); // position before sync
+            return;
+          }
+          this.syncCheck[i % syncLen] = this.in.readByte();
+        }
+      } catch (final ChecksumException e) { // checksum failure
+        handleChecksumException(e);
+      }
+    }
+
+    /** Returns true iff the previous call to next passed a sync mark. */
+    public boolean syncSeen() {
+      return this.syncSeen;
+    }
+
+    /** Return the current byte position in the input file. */
+    public synchronized long getPosition() throws IOException {
+      return this.in.getPos();
+    }
+
+    @Override
+    /** Returns the name of the file. */
+    public String toString() {
+      return this.file.toString();
+    }
+
+  }
+
+} // SequenceFile
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/BsonFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/BsonFileViewer.java
new file mode 100644
index 0000000..e5f2aff
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/BsonFileViewer.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.bson.BSONObject;
+import org.bson.BasicBSONCallback;
+import org.bson.BasicBSONDecoder;
+
+import com.mongodb.util.JSON;
+
+/**
+ * File viewer for Mongo bson files.
+ *
+ * @author adilaijaz
+ */
+public final class BsonFileViewer extends HdfsFileViewer {
+
+  /**
+   * The maximum time spent, in milliseconds, while reading records from the
+   * file.
+   */
+  private static long STOP_TIME = 2000l;
+
+  private static final String VIEWER_NAME = "BSON";
+
+  @Override
+  public String getName() {
+    return VIEWER_NAME;
+  }
+
+  @Override
+  public Set<Capability> getCapabilities(FileSystem fs, Path path)
+      throws AccessControlException {
+    if (path.getName().endsWith(".bson")) {
+      return EnumSet.of(Capability.READ);
+    }
+    return EnumSet.noneOf(Capability.class);
+  }
+
+  @Override
+  public void displayFile(FileSystem fs, Path path, OutputStream outStream,
+      int startLine, int endLine) throws IOException {
+
+    FSDataInputStream in = null;
+    try {
+      in = fs.open(path, 16 * 1024 * 1024);
+
+      long endTime = System.currentTimeMillis() + STOP_TIME;
+
+      BasicBSONCallback callback = new BasicBSONCallback();
+      BasicBSONDecoder decoder = new BasicBSONDecoder();
+
+      /*
+       * keep reading and rendering bsonObjects until one of these conditions is
+       * met:
+       *
+       * a. we have rendered all bsonObjects desired. b. we have run out of
+       * time.
+       */
+      for (int lineno = 1; lineno <= endLine
+          && System.currentTimeMillis() <= endTime; lineno++) {
+        if (lineno < startLine) {
+          continue;
+        }
+
+        callback.reset();
+        decoder.decode(in, callback);
+
+        BSONObject value = (BSONObject) callback.get();
+
+        StringBuilder bldr = new StringBuilder();
+        bldr.append("\n\n Record ");
+        bldr.append(lineno);
+        bldr.append('\n');
+        JSON.serialize(value, bldr);
+        outStream.write(bldr.toString().getBytes("UTF-8"));
+      }
+    } catch (IOException e) {
+      outStream
+          .write(("Error in display avro file: " + e.getLocalizedMessage())
+              .getBytes("UTF-8"));
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+      outStream.flush();
+    }
+  }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/Capability.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/Capability.java
new file mode 100644
index 0000000..aa74c81
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/Capability.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+public enum Capability {
+  READ,
+  SCHEMA
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ContentType.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ContentType.java
new file mode 100644
index 0000000..a02b030
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ContentType.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+public enum ContentType {
+  TEXT,
+  HTML
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsBrowserServlet.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsBrowserServlet.java
new file mode 100644
index 0000000..d85cdb7
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsBrowserServlet.java
@@ -0,0 +1,521 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.log4j.Logger;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.security.commons.HadoopSecurityManagerException;
+import azkaban.user.User;
+import azkaban.utils.Props;
+import azkaban.server.session.Session;
+import azkaban.webapp.servlet.LoginAbstractAzkabanServlet;
+import azkaban.webapp.servlet.Page;
+
+public class HdfsBrowserServlet extends LoginAbstractAzkabanServlet {
+  private static final long serialVersionUID = 1L;
+  private static final String PROXY_USER_SESSION_KEY =
+      "hdfs.browser.proxy.user";
+  private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
+      "hadoop.security.manager.class";
+
+  private static final int DEFAULT_FILE_MAX_LINES = 1000;
+
+  private int fileMaxLines;
+  private int defaultStartLine;
+  private int defaultEndLine;
+
+  private static Logger logger = Logger.getLogger(HdfsBrowserServlet.class);
+
+  private ArrayList<HdfsFileViewer> viewers = new ArrayList<HdfsFileViewer>();
+
+  private HdfsFileViewer defaultViewer;
+
+  private Props props;
+  private boolean shouldProxy;
+  private boolean allowGroupProxy;
+
+  private String viewerName;
+  private String viewerPath;
+
+  private HadoopSecurityManager hadoopSecurityManager;
+
+  public HdfsBrowserServlet(Props props) {
+    this.props = props;
+    viewerName = props.getString("viewer.name");
+    viewerPath = props.getString("viewer.path");
+    fileMaxLines = props.getInt("file.max.lines", DEFAULT_FILE_MAX_LINES);
+    defaultStartLine = 1;
+    defaultEndLine = fileMaxLines;
+  }
+
+  @Override
+  public void init(ServletConfig config) throws ServletException {
+    super.init(config);
+
+    shouldProxy = props.getBoolean("azkaban.should.proxy", false);
+    allowGroupProxy = props.getBoolean("allow.group.proxy", false);
+    logger.info("Hdfs browser should proxy: " + shouldProxy);
+
+    props.put("fs.hdfs.impl.disable.cache", "true");
+
+    try {
+      hadoopSecurityManager = loadHadoopSecurityManager(props, logger);
+    } catch (RuntimeException e) {
+      e.printStackTrace();
+      throw new RuntimeException("Failed to get hadoop security manager!"
+          + e.getCause());
+    }
+
+    defaultViewer = new TextFileViewer();
+
+    viewers.add(new HtmlFileViewer());
+    viewers.add(new ORCFileViewer());
+    viewers.add(new AvroFileViewer());
+    viewers.add(new ParquetFileViewer());
+//    viewers.add(new JsonSequenceFileViewer());
+    viewers.add(new ImageFileViewer());
+    viewers.add(new BsonFileViewer());
+
+    viewers.add(defaultViewer);
+
+    logger.info("HDFS Browser initiated");
+  }
+
+  private HadoopSecurityManager loadHadoopSecurityManager(Props props,
+      Logger logger) throws RuntimeException {
+
+    Class<?> hadoopSecurityManagerClass =
+        props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
+            HdfsBrowserServlet.class.getClassLoader());
+    logger.info("Initializing hadoop security manager "
+        + hadoopSecurityManagerClass.getName());
+    HadoopSecurityManager hadoopSecurityManager = null;
+
+    try {
+      Method getInstanceMethod =
+          hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
+      hadoopSecurityManager =
+          (HadoopSecurityManager) getInstanceMethod.invoke(
+              hadoopSecurityManagerClass, props);
+    } catch (InvocationTargetException e) {
+      logger.error("Could not instantiate Hadoop Security Manager "
+          + hadoopSecurityManagerClass.getName() + e.getCause());
+      throw new RuntimeException(e.getCause());
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e.getCause());
+    }
+
+    return hadoopSecurityManager;
+  }
+
+  private FileSystem getFileSystem(String username)
+      throws HadoopSecurityManagerException {
+    return hadoopSecurityManager.getFSAsUser(username);
+  }
+
+  private void errorPage(String user, HttpServletRequest req,
+      HttpServletResponse resp, Session session, String error) {
+    Page page =
+        newPage(req, resp, session,
+            "azkaban/viewer/hdfs/velocity/hdfs-browser.vm");
+    page.add("error_message", "Error: " + error);
+    page.add("user", user);
+    page.add("allowproxy", allowGroupProxy);
+    page.add("no_fs", "true");
+    page.add("viewerName", viewerName);
+    page.render();
+  }
+
+  private void errorAjax(HttpServletResponse resp, Map<String, Object> ret,
+      String error) throws IOException {
+    ret.put("error", error);
+    this.writeJSON(resp, ret);
+  }
+
+  private String getUsername(HttpServletRequest req, Session session)
+      throws ServletException {
+    User user = session.getUser();
+    String username = user.getUserId();
+    if (hasParam(req, "action") && getParam(req, "action").equals("goHomeDir")) {
+      username = getParam(req, "proxyname");
+    } else if (allowGroupProxy) {
+      String proxyName =
+          (String) session.getSessionData(PROXY_USER_SESSION_KEY);
+      if (proxyName != null) {
+        username = proxyName;
+      }
+    }
+    return username;
+  }
+
+  @Override
+  protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
+      Session session) throws ServletException, IOException {
+    String username = getUsername(req, session);
+    boolean ajax = hasParam(req, "ajax");
+    try {
+      if (ajax) {
+        handleAjaxAction(username, req, resp, session);
+      } else {
+        handleFsDisplay(username, req, resp, session);
+      }
+    } catch (Exception e) {
+      throw new IllegalStateException("Error processing request: "
+          + e.getMessage(), e);
+    }
+  }
+
+  @Override
+  protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
+      Session session) throws ServletException, IOException {
+    User user = session.getUser();
+    if (!hasParam(req, "action")) {
+      return;
+    }
+
+    HashMap<String, String> results = new HashMap<String, String>();
+    String action = getParam(req, "action");
+    if (action.equals("changeProxyUser")) {
+      if (hasParam(req, "proxyname")) {
+        String newProxyname = getParam(req, "proxyname");
+        if (user.getUserId().equals(newProxyname)
+            || user.isInGroup(newProxyname)
+            || user.getRoles().contains("admin")) {
+          session.setSessionData(PROXY_USER_SESSION_KEY, newProxyname);
+        } else {
+          results.put("error", "User '" + user.getUserId()
+              + "' cannot proxy as '" + newProxyname + "'");
+        }
+      }
+    } else {
+      results.put("error", "action param is not set");
+    }
+
+    this.writeJSON(resp, results);
+  }
+
+  private Path getPath(HttpServletRequest req) {
+    String prefix = req.getContextPath() + req.getServletPath();
+    String fsPath = req.getRequestURI().substring(prefix.length());
+    if (fsPath.length() == 0) {
+      fsPath = "/";
+    }
+    return new Path(fsPath);
+  }
+
+  private void getPathSegments(Path path, List<Path> paths,
+      List<String> segments) {
+    Path curr = path;
+    while (curr.getParent() != null) {
+      paths.add(curr);
+      segments.add(curr.getName());
+      curr = curr.getParent();
+    }
+    Collections.reverse(paths);
+    Collections.reverse(segments);
+  }
+
+  private String getHomeDir(FileSystem fs) {
+    String homeDirString = fs.getHomeDirectory().toString();
+    if (homeDirString.startsWith("file:")) {
+      return homeDirString.substring("file:".length());
+    }
+    return homeDirString.substring(fs.getUri().toString().length());
+  }
+
+  private void handleFsDisplay(String user, HttpServletRequest req,
+      HttpServletResponse resp, Session session) throws IOException,
+      ServletException, IllegalArgumentException, IllegalStateException {
+    FileSystem fs = null;
+    try {
+      fs = getFileSystem(user);
+    } catch (HadoopSecurityManagerException e) {
+      errorPage(user, req, resp, session, "Cannot get FileSystem.");
+      return;
+    }
+
+    Path path = getPath(req);
+    if (logger.isDebugEnabled()) {
+      logger.debug("path: '" + path.toString() + "'");
+    }
+
+    try {
+      if (!fs.exists(path)) {
+        errorPage(user, req, resp, session, path.toUri().getPath()
+            + " does not exist.");
+        fs.close();
+        return;
+      }
+    } catch (IOException ioe) {
+      logger.error("Got exception while checking for existence of path '"
+          + path + "'", ioe);
+      errorPage(user, req, resp, session, path.toUri().getPath()
+          + " Encountered error while trying to detect if path '" + path
+          + "' exists. Reason: " + ioe.getMessage());
+      fs.close();
+      return;
+    }
+
+    if (fs.isFile(path)) {
+      displayFilePage(fs, user, req, resp, session, path);
+    } else if (fs.getFileStatus(path).isDir()) {
+      displayDirPage(fs, user, req, resp, session, path);
+    } else {
+      errorPage(user, req, resp, session,
+          "It exists, it is not a file, and it is not a directory, what "
+              + "is it precious?");
+    }
+    fs.close();
+  }
+
+  private void displayDirPage(FileSystem fs, String user,
+      HttpServletRequest req, HttpServletResponse resp, Session session,
+      Path path) throws IOException {
+
+    Page page =
+        newPage(req, resp, session,
+            "azkaban/viewer/hdfs/velocity/hdfs-browser.vm");
+    page.add("allowproxy", allowGroupProxy);
+    page.add("viewerPath", viewerPath);
+    page.add("viewerName", viewerName);
+
+    List<Path> paths = new ArrayList<Path>();
+    List<String> segments = new ArrayList<String>();
+    getPathSegments(path, paths, segments);
+    page.add("paths", paths);
+    page.add("segments", segments);
+    page.add("user", user);
+    page.add("homedir", getHomeDir(fs));
+
+    try {
+      FileStatus[] subdirs = fs.listStatus(path);
+      page.add("subdirs", subdirs);
+      long size = 0;
+      for (int i = 0; i < subdirs.length; ++i) {
+        if (subdirs[i].isDir()) {
+          continue;
+        }
+        size += subdirs[i].getLen();
+      }
+      page.add("dirsize", size);
+    } catch (AccessControlException e) {
+      page.add("error_message", "Permission denied: " + e.getMessage());
+      page.add("no_fs", "true");
+    } catch (IOException e) {
+      page.add("error_message", "Error: " + e.getMessage());
+    }
+    page.render();
+  }
+
+  private void displayFilePage(FileSystem fs, String user,
+      HttpServletRequest req, HttpServletResponse resp, Session session,
+      Path path) {
+
+    Page page =
+        newPage(req, resp, session, "azkaban/viewer/hdfs/velocity/hdfs-file.vm");
+
+    List<Path> paths = new ArrayList<Path>();
+    List<String> segments = new ArrayList<String>();
+    getPathSegments(path, paths, segments);
+
+    page.add("allowproxy", allowGroupProxy);
+    page.add("viewerPath", viewerPath);
+    page.add("viewerName", viewerName);
+
+    page.add("paths", paths);
+    page.add("segments", segments);
+    page.add("user", user);
+    page.add("path", path.toString());
+    page.add("homedir", getHomeDir(fs));
+
+    try {
+      boolean hasSchema = false;
+      int viewerId = -1;
+      for (int i = 0; i < viewers.size(); ++i) {
+        HdfsFileViewer viewer = viewers.get(i);
+        Set<Capability> capabilities = EnumSet.noneOf(Capability.class);
+        capabilities = viewer.getCapabilities(fs, path);
+        if (capabilities.contains(Capability.READ)) {
+          if (capabilities.contains(Capability.SCHEMA)) {
+            hasSchema = true;
+          }
+          viewerId = i;
+          break;
+        }
+      }
+      page.add("contentType", viewers.get(viewerId).getContentType().name());
+      page.add("viewerId", viewerId);
+      page.add("hasSchema", hasSchema);
+
+      FileStatus status = fs.getFileStatus(path);
+      page.add("status", status);
+
+    } catch (Exception ex) {
+      page.add("no_fs", "true");
+      page.add("error_message", "Error: " + ex.getMessage());
+    }
+    page.render();
+  }
+
+  private void handleAjaxAction(String username, HttpServletRequest request,
+      HttpServletResponse response, Session session) throws ServletException,
+      IOException {
+    Map<String, Object> ret = new HashMap<String, Object>();
+    FileSystem fs = null;
+    try {
+      try {
+        fs = getFileSystem(username);
+      } catch (HadoopSecurityManagerException e) {
+        errorAjax(response, ret, "Cannot get FileSystem.");
+        return;
+      }
+
+      String ajaxName = getParam(request, "ajax");
+      Path path = null;
+      if (!hasParam(request, "path")) {
+        errorAjax(response, ret, "Missing parameter 'path'.");
+        return;
+      }
+
+      path = new Path(getParam(request, "path"));
+      if (!fs.exists(path)) {
+        errorAjax(response, ret, path.toUri().getPath() + " does not exist.");
+        return;
+      }
+
+      if (ajaxName.equals("fetchschema")) {
+        handleAjaxFetchSchema(fs, request, ret, session, path);
+      } else if (ajaxName.equals("fetchfile")) {
+        // Note: fetchFile writes directly to the output stream. Thus, we need
+        // to make sure we do not write to the output stream once this call
+        // returns.
+        ret = null;
+        handleAjaxFetchFile(fs, request, response, session, path);
+      } else {
+        ret.put("error", "Unknown AJAX action " + ajaxName);
+      }
+
+      if (ret != null) {
+        this.writeJSON(response, ret);
+      }
+    } finally {
+      fs.close();
+    }
+  }
+
+  private void handleAjaxFetchSchema(FileSystem fs, HttpServletRequest req,
+      Map<String, Object> ret, Session session, Path path) throws IOException,
+      ServletException {
+    HdfsFileViewer fileViewer = null;
+    try {
+      if (hasParam(req, "viewerId")) {
+        fileViewer = viewers.get(getIntParam(req, "viewerId"));
+        if (!fileViewer.getCapabilities(fs, path).contains(Capability.SCHEMA)) {
+          fileViewer = null;
+        }
+      } else {
+        for (HdfsFileViewer viewer : viewers) {
+          if (viewer.getCapabilities(fs, path).contains(Capability.SCHEMA)) {
+            fileViewer = viewer;
+          }
+        }
+      }
+    } catch (AccessControlException e) {
+      ret.put("error", "Permission denied.");
+    }
+
+    if (fileViewer == null) {
+      ret.put("error", "No viewers can display schema.");
+      return;
+    }
+    ret.put("schema", fileViewer.getSchema(fs, path));
+  }
+
+  private void handleAjaxFetchFile(FileSystem fs, HttpServletRequest req,
+      HttpServletResponse resp, Session session, Path path) throws IOException,
+      ServletException {
+    int startLine = getIntParam(req, "startLine", defaultStartLine);
+    int endLine = getIntParam(req, "endLine", defaultEndLine);
+    OutputStream output = resp.getOutputStream();
+
+    if (endLine < startLine) {
+      output.write(("Invalid range: endLine < startLine.").getBytes("UTF-8"));
+      return;
+    }
+
+    if (endLine - startLine > fileMaxLines) {
+      output.write(("Invalid range: range exceeds max number of lines.")
+          .getBytes("UTF-8"));
+      return;
+    }
+
+    // Use registered viewers to show the file content
+    HdfsFileViewer fileViewer = null;
+    try {
+      if (hasParam(req, "viewerId")) {
+        fileViewer = viewers.get(getIntParam(req, "viewerId"));
+        if (!fileViewer.getCapabilities(fs, path).contains(Capability.READ)) {
+          fileViewer = null;
+        }
+      } else {
+        for (HdfsFileViewer viewer : viewers) {
+          if (viewer.getCapabilities(fs, path).contains(Capability.READ)) {
+            fileViewer = viewer;
+            break;
+          }
+        }
+      }
+      // use default text viewer
+      if (fileViewer == null) {
+        if (defaultViewer.getCapabilities(fs, path).contains(Capability.READ)) {
+          fileViewer = defaultViewer;
+        } else {
+          output.write(("No viewer available for file.").getBytes("UTF-8"));
+          return;
+        }
+      }
+    } catch (AccessControlException e) {
+      output.write(("Permission denied.").getBytes("UTF-8"));
+    }
+
+    fileViewer.displayFile(fs, path, output, startLine, endLine);
+  }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsFileViewer.java
new file mode 100644
index 0000000..9da1ba9
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HdfsFileViewer.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+
+public abstract class HdfsFileViewer {
+  public abstract String getName();
+
+  public Set<Capability> getCapabilities(FileSystem fs, Path path)
+      throws AccessControlException {
+    return EnumSet.noneOf(Capability.class);
+  }
+
+  public abstract void displayFile(FileSystem fs, Path path,
+      OutputStream outStream, int startLine, int endLine) throws IOException;
+
+  public String getSchema(FileSystem fs, Path path) {
+    return null;
+  }
+
+  public ContentType getContentType() {
+    return ContentType.TEXT;
+  }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HtmlFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HtmlFileViewer.java
new file mode 100644
index 0000000..802c64b
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/HtmlFileViewer.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ *
+ * THIS IS AN EXPERIMENTAL FEATURE, USE WITH CAUTION.
+ *
+ * This viewer is aimed to support the rendering of very basic html files.
+ * The content of a html file will be rendered inside an iframe on azkaban
+ * web page to protect from possible malicious javascript code. It does not
+ * support rendering local image files (e.g. image stored on hdfs), but it
+ * does support showing images stored on remote network locations.
+ *
+ * In fact, not just images, but any data that is stored on HDFS are not
+ * accessible from the html page, for example, css and js files. Everything
+ * must either be self contained or referenced with internet location.
+ * (e.g. jquery script hosted on google.com can be fetched, but jquery script
+ * stored on local hdfs cannot)
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+public class HtmlFileViewer extends HdfsFileViewer {
+
+  // only display the first 25M chars. it is used to prevent
+  // showing/downloading gb of data
+  private static final int BUFFER_LIMIT = 25000000;
+  private static final String VIEWER_NAME = "Html";
+  private static final Logger logger = Logger.getLogger(HtmlFileViewer.class);
+  private final Set<String> acceptedSuffix = new HashSet<>();
+
+  public HtmlFileViewer() {
+    this.acceptedSuffix.add(".htm");
+    this.acceptedSuffix.add(".html");
+  }
+
+  @Override
+  public String getName() {
+    return VIEWER_NAME;
+  }
+
+
+  @Override
+  public Set<Capability> getCapabilities(final FileSystem fs, final Path path)
+      throws AccessControlException {
+    final String fileName = path.getName();
+    final int pos = fileName.lastIndexOf('.');
+    if (pos < 0) {
+      return EnumSet.noneOf(Capability.class);
+    }
+
+    final String suffix = fileName.substring(pos).toLowerCase();
+    if (this.acceptedSuffix.contains(suffix)) {
+      return EnumSet.of(Capability.READ);
+    } else {
+      return EnumSet.noneOf(Capability.class);
+    }
+  }
+
+  @Override
+  public void displayFile(final FileSystem fs, final Path path, final OutputStream outputStream,
+      final int startLine, final int endLine) throws IOException {
+
+    if (logger.isDebugEnabled())
+      logger.debug("read in uncompressed html file");
+
+    // BUFFER_LIMIT is the only thing we care about, line limit is redundant and actually not
+    // very useful for html files. Thus using Integer.MAX_VALUE to effectively remove the endLine limit.
+    TextFileViewer.displayFileContent(fs, path, outputStream, startLine, Integer.MAX_VALUE, BUFFER_LIMIT);
+  }
+
+  @Override
+  public ContentType getContentType() {
+    return ContentType.HTML;
+  }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ImageFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ImageFileViewer.java
new file mode 100644
index 0000000..1f65d59
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ImageFileViewer.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+/**
+ * Reads a image file if the file size is not larger than
+ * {@value #MAX_IMAGE_FILE_SIZE}.
+ *
+ * @author ximeng
+ */
+public class ImageFileViewer extends HdfsFileViewer {
+
+  private static Logger logger = Logger.getLogger(ImageFileViewer.class);
+  private static final long MAX_IMAGE_FILE_SIZE = 10485760L;
+  private static final String VIEWER_NAME = "Image";
+
+  private HashSet<String> acceptedSuffix;
+
+  public ImageFileViewer() {
+    final String[] imageSuffix = {
+      ".jpg", ".jpeg", ".tif", ".tiff", ".png", ".gif", ".bmp", ".svg"
+    };
+    acceptedSuffix = new HashSet<String>(Arrays.asList(imageSuffix));
+  }
+
+  @Override
+  public String getName() {
+    return VIEWER_NAME;
+  }
+
+  @Override
+  public Set<Capability> getCapabilities(FileSystem fs, Path path)
+      throws AccessControlException {
+    String fileName = path.getName();
+    int pos = fileName.lastIndexOf('.');
+    if (pos < 0) {
+      return EnumSet.noneOf(Capability.class);
+    }
+
+    String suffix = fileName.substring(pos).toLowerCase();
+    if (acceptedSuffix.contains(suffix)) {
+      long len = 0;
+      try {
+        len = fs.getFileStatus(path).getLen();
+      } catch (AccessControlException e) {
+        throw e;
+      } catch (IOException e) {
+        e.printStackTrace();
+        return EnumSet.noneOf(Capability.class);
+      }
+
+      if (len <= MAX_IMAGE_FILE_SIZE) {
+        return EnumSet.of(Capability.READ);
+      }
+    }
+
+    return EnumSet.noneOf(Capability.class);
+  }
+
+  @Override
+  public void displayFile(FileSystem fs, Path path, OutputStream outputStream,
+      int startLine, int endLine) throws IOException {
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("read in image file");
+    }
+
+    InputStream inputStream = null;
+    try {
+      inputStream = new BufferedInputStream(fs.open(path));
+      BufferedOutputStream output = new BufferedOutputStream(outputStream);
+      long outputSize = 0L;
+      byte[] buffer = new byte[16384];
+      int len;
+      while ((len = inputStream.read(buffer)) != -1) {
+        output.write(buffer, 0, len);
+        outputSize += len;
+        if (outputSize > MAX_IMAGE_FILE_SIZE) {
+          break;
+        }
+      }
+      output.flush();
+    } finally {
+      if (inputStream != null) {
+        inputStream.close();
+      }
+    }
+  }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ORCFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ORCFileViewer.java
new file mode 100644
index 0000000..696dd44
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ORCFileViewer.java
@@ -0,0 +1,154 @@
+package azkaban.viewer.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import azkaban.viewer.hdfs.utils.SerDeUtilsWrapper;
+
+/**
+ * This class implements a viewer for ORC files
+ *
+ * @author gaggarwa
+ */
+public class ORCFileViewer extends HdfsFileViewer {
+    private static Logger logger = Logger.getLogger(ORCFileViewer.class);
+    // Will spend 5 seconds trying to pull data and then stop.
+    private final static long STOP_TIME = 5000l;
+    private final static int JSON_INDENT = 2;
+
+    private static final String VIEWER_NAME = "ORC";
+
+    @Override
+    public String getName() {
+        return VIEWER_NAME;
+    }
+
+    /**
+     * Get ORCFileViewer functionalities. Currently schema and read are
+     * supported. {@inheritDoc}
+     *
+     * @see HdfsFileViewer#getCapabilities(org.apache.hadoop.fs.FileSystem,
+     *      org.apache.hadoop.fs.Path)
+     */
+    @Override
+    public Set<Capability> getCapabilities(FileSystem fs, Path path) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("orc file path: " + path.toUri().getPath());
+        }
+
+        Reader orcReader = null;
+        RecordReader recordReader = null;
+        try {
+            // no need to close orcreader
+            orcReader = OrcFile.createReader(fs, path);
+            recordReader = orcReader.rows(null);
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug(path.toUri().getPath() + " is not a ORC file.");
+                logger.debug("Error in opening ORC file: "
+                    + e.getLocalizedMessage());
+            }
+            return EnumSet.noneOf(Capability.class);
+        } finally {
+            if (recordReader != null) {
+                try {
+                    recordReader.close();
+                } catch (IOException e) {
+                    logger.error(e);
+                }
+            }
+        }
+        return EnumSet.of(Capability.READ, Capability.SCHEMA);
+    }
+
+    /**
+     * Reads orc file and write to outputstream in json format {@inheritDoc}
+     *
+     * @throws IOException
+     * @throws
+     *
+     * @see HdfsFileViewer#displayFile(org.apache.hadoop.fs.FileSystem,
+     *      org.apache.hadoop.fs.Path, OutputStream, int, int)
+     */
+    @Override
+    public void displayFile(FileSystem fs, Path path, OutputStream outStream,
+        int startLine, int endLine) throws IOException {
+        if (logger.isDebugEnabled()) {
+            logger.debug("displaying orc file:" + path.toUri().getPath());
+        }
+        StringBuilder ret = new StringBuilder();
+        Reader orcreader = null;
+        RecordReader reader = null;
+        Object row = null;
+        try {
+            int lineNum = 1;
+
+            orcreader = OrcFile.createReader(fs, path);
+            reader = orcreader.rows(null);
+            long endTime = System.currentTimeMillis() + STOP_TIME;
+            while (reader.hasNext() && lineNum <= endLine
+                && System.currentTimeMillis() <= endTime) {
+                row = reader.next(row);
+                if (lineNum >= startLine) {
+                    ret.append(String.format("Record %d:\n", lineNum));
+                    String jsonString =
+                        SerDeUtilsWrapper.getJSON(row,
+                            orcreader.getObjectInspector());
+                    try {
+                        JSONObject jsonobj = new JSONObject(jsonString);
+                        ret.append(jsonobj.toString(JSON_INDENT));
+                    } catch (JSONException e) {
+                        logger.error("Failed to parse json as JSONObject", e);
+                        // default to unformatted json string
+                        ret.append(jsonString);
+                    }
+                    ret.append("\n\n");
+                }
+                lineNum++;
+            }
+            outStream.write(ret.toString().getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            outStream.write(("Error in display orc file: " + e
+                .getLocalizedMessage()).getBytes("UTF-8"));
+            throw e;
+        } finally {
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+
+    /**
+     * Get schema in same syntax as in hadoop --orcdump {@inheritDoc}
+     *
+     * @see HdfsFileViewer#getSchema(org.apache.hadoop.fs.FileSystem,
+     *      org.apache.hadoop.fs.Path)
+     */
+    @Override
+    public String getSchema(FileSystem fs, Path path) {
+        String schema = null;
+        try {
+            Reader orcReader = OrcFile.createReader(fs, path);
+            schema = orcReader.getObjectInspector().getTypeName();
+        } catch (IOException e) {
+            logger
+                .warn("Cannot get schema for file: " + path.toUri().getPath());
+            return null;
+        }
+
+        return schema;
+    }
+
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ParquetFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ParquetFileViewer.java
new file mode 100644
index 0000000..0118842
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/ParquetFileViewer.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import parquet.avro.AvroParquetReader;
+import parquet.avro.AvroSchemaConverter;
+
+/**
+ * This class implements a viewer for Parquet files.
+ *
+ * @author David Z. Chen (dchen@linkedin.com)
+ */
+public class ParquetFileViewer extends HdfsFileViewer {
+  private static Logger logger = Logger.getLogger(ParquetFileViewer.class);
+
+  // Will spend 5 seconds trying to pull data and then stop.
+  private final static long STOP_TIME = 2000l;
+
+  private static final String VIEWER_NAME = "Parquet";
+
+  @Override
+  public String getName() {
+    return VIEWER_NAME;
+  }
+
+  @Override
+  public Set<Capability> getCapabilities(FileSystem fs, Path path)
+      throws AccessControlException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Parquet file path: " + path.toUri().getPath());
+    }
+
+    AvroParquetReader<GenericRecord> parquetReader = null;
+    try {
+      parquetReader = new AvroParquetReader<GenericRecord>(path);
+    } catch (IOException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(path.toUri().getPath() + " is not a Parquet file.");
+        logger.debug("Error in opening Parquet file: "
+            + e.getLocalizedMessage());
+      }
+      return EnumSet.noneOf(Capability.class);
+    } finally {
+      try {
+        if (parquetReader != null) {
+          parquetReader.close();
+        }
+      } catch (IOException e) {
+        logger.error(e);
+      }
+    }
+    return EnumSet.of(Capability.READ, Capability.SCHEMA);
+  }
+
+  @Override
+  public void displayFile(FileSystem fs, Path path, OutputStream outputStream,
+      int startLine, int endLine) throws IOException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Display Parquet file: " + path.toUri().getPath());
+    }
+
+    JsonGenerator json = null;
+    AvroParquetReader<GenericRecord> parquetReader = null;
+    try {
+      parquetReader = new AvroParquetReader<GenericRecord>(path);
+
+      // Initialize JsonGenerator.
+      json =
+          new JsonFactory()
+              .createJsonGenerator(outputStream, JsonEncoding.UTF8);
+      json.useDefaultPrettyPrinter();
+
+      // Declare the avroWriter encoder that will be used to output the records
+      // as JSON but don't construct them yet because we need the first record
+      // in order to get the Schema.
+      DatumWriter<GenericRecord> avroWriter = null;
+      Encoder encoder = null;
+
+      long endTime = System.currentTimeMillis() + STOP_TIME;
+      int line = 1;
+      while (line <= endLine && System.currentTimeMillis() <= endTime) {
+        GenericRecord record = parquetReader.read();
+        if (record == null) {
+          break;
+        }
+
+        if (avroWriter == null) {
+          Schema schema = record.getSchema();
+          avroWriter = new GenericDatumWriter<GenericRecord>(schema);
+          encoder = EncoderFactory.get().jsonEncoder(schema, json);
+        }
+
+        if (line >= startLine) {
+          String recordStr = "\n\nRecord " + line + ":\n";
+          outputStream.write(recordStr.getBytes("UTF-8"));
+          avroWriter.write(record, encoder);
+          encoder.flush();
+        }
+        ++line;
+      }
+    } catch (IOException e) {
+      outputStream.write(("Error in displaying Parquet file: " + e
+          .getLocalizedMessage()).getBytes("UTF-8"));
+      throw e;
+    } catch (Throwable t) {
+      logger.error(t.getMessage());
+      return;
+    } finally {
+      if (json != null) {
+        json.close();
+      }
+      parquetReader.close();
+    }
+  }
+
+  @Override
+  public String getSchema(FileSystem fs, Path path) {
+    String schema = null;
+    try {
+      AvroParquetReader<GenericRecord> parquetReader =
+          new AvroParquetReader<GenericRecord>(path);
+      GenericRecord record = parquetReader.read();
+      if (record == null) {
+        return null;
+      }
+      Schema avroSchema = record.getSchema();
+      AvroSchemaConverter converter = new AvroSchemaConverter();
+      schema = converter.convert(avroSchema).toString();
+    } catch (IOException e) {
+      logger.warn("Cannot get schema for file: " + path.toUri().getPath());
+      return null;
+    }
+
+    return schema;
+  }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/SequenceFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/SequenceFileViewer.java
new file mode 100644
index 0000000..1ea029a
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/SequenceFileViewer.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.EnumSet;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+
+public abstract class SequenceFileViewer extends HdfsFileViewer {
+
+  protected abstract Set<Capability> getCapabilities(
+      AzkabanSequenceFileReader.Reader reader);
+
+  protected abstract void displaySequenceFile(
+      AzkabanSequenceFileReader.Reader reader, PrintWriter output,
+      int startLine, int endLine) throws IOException;
+
+  @Override
+  public Set<Capability> getCapabilities(final FileSystem fs, final Path path)
+      throws AccessControlException {
+    Set<Capability> result = EnumSet.noneOf(Capability.class);
+    AzkabanSequenceFileReader.Reader reader = null;
+    try {
+      reader =
+          new AzkabanSequenceFileReader.Reader(fs, path, new Configuration());
+      result = getCapabilities(reader);
+    } catch (final AccessControlException e) {
+      throw e;
+    } catch (final IOException e) {
+      return EnumSet.noneOf(Capability.class);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (final IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  @SuppressWarnings("DefaultCharset")
+  public void displayFile(final FileSystem fs, final Path file, final OutputStream outputStream,
+      final int startLine, final int endLine) throws IOException {
+
+    AzkabanSequenceFileReader.Reader reader = null;
+    final PrintWriter writer = new PrintWriter(outputStream);
+    try {
+      reader =
+          new AzkabanSequenceFileReader.Reader(fs, file, new Configuration());
+      displaySequenceFile(reader, writer, startLine, endLine);
+    } catch (final IOException e) {
+      writer.write("Error opening sequence file " + e);
+      throw e;
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/TextFileViewer.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/TextFileViewer.java
new file mode 100644
index 0000000..591d4cd
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/TextFileViewer.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.viewer.hdfs;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.log4j.Logger;
+
+public class TextFileViewer extends HdfsFileViewer {
+
+  private static Logger logger = Logger.getLogger(TextFileViewer.class);
+  private HashSet<String> acceptedSuffix = new HashSet<String>();
+
+  // only display the first 1M chars. it is used to prevent
+  // showing/downloading gb of data
+  private static final int BUFFER_LIMIT = 1000000;
+  private static final String VIEWER_NAME = "Text";
+
+  public TextFileViewer() {
+    acceptedSuffix.add(".txt");
+    acceptedSuffix.add(".csv");
+    acceptedSuffix.add(".props");
+    acceptedSuffix.add(".xml");
+    acceptedSuffix.add(".html");
+    acceptedSuffix.add(".json");
+    acceptedSuffix.add(".log");
+  }
+
+  @Override
+  public String getName() {
+    return VIEWER_NAME;
+  }
+
+  @Override
+  public Set<Capability> getCapabilities(FileSystem fs, Path path)
+      throws AccessControlException {
+    return EnumSet.of(Capability.READ);
+  }
+
+  @Override
+  public void displayFile(FileSystem fs, Path path, OutputStream outputStream,
+      int startLine, int endLine) throws IOException {
+
+    if (logger.isDebugEnabled())
+      logger.debug("read in uncompressed text file");
+
+    displayFileContent(fs, path, outputStream, startLine, endLine, BUFFER_LIMIT);
+  }
+
+  @SuppressWarnings("DefaultCharset")
+  static void displayFileContent(FileSystem fs, Path path, OutputStream outputStream,
+      int startLine, int endLine, int bufferLimit) throws IOException {
+
+    InputStream inputStream = null;
+    BufferedReader reader = null;
+    try {
+      inputStream = fs.open(path);
+      reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+      PrintWriter output = new PrintWriter(outputStream);
+      for (int i = 1; i < startLine; i++) {
+        reader.readLine();
+      }
+
+      int bufferSize = 0;
+      for (int i = startLine; i < endLine; i++) {
+        String line = reader.readLine();
+        if (line == null)
+          break;
+
+        // break if reach the buffer limit
+        bufferSize += line.length();
+        if (bufferSize >= bufferLimit)
+          break;
+
+        output.write(line);
+        output.write("\n");
+      }
+      output.flush();
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+      if (inputStream != null) {
+        inputStream.close();
+      }
+    }
+  }
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/utils/SerDeUtilsWrapper.java b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/utils/SerDeUtilsWrapper.java
new file mode 100644
index 0000000..5b5edfd
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/utils/SerDeUtilsWrapper.java
@@ -0,0 +1,235 @@
+package azkaban.viewer.hdfs.utils;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Adapted from @see org.apache.hadoop.hive.serde2.SerDeUtils to escape binary
+ * string and have a valid JSON String
+ */
+public final class SerDeUtilsWrapper {
+
+    /**
+     * Get serialized json using an orc object and corresponding object
+     * inspector Adapted from
+     * {@link org.apache.hadoop.hive.serde2.SerDeUtils#getJSONString(Object, ObjectInspector)}
+     *
+     * @param obj
+     * @param objIns
+     * @return
+     */
+    public static String getJSON(Object obj, ObjectInspector objIns) {
+        StringBuilder sb = new StringBuilder();
+        buildJSONString(sb, obj, objIns);
+        return sb.toString();
+    }
+
+    private static void buildJSONString(StringBuilder sb, Object obj,
+        ObjectInspector objIns) {
+        String nullStr = "null";
+        switch (objIns.getCategory()) {
+        case PRIMITIVE: {
+            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) objIns;
+            if (obj == null) {
+                sb.append(nullStr);
+            } else {
+                switch (poi.getPrimitiveCategory()) {
+                case BOOLEAN: {
+                    boolean b = ((BooleanObjectInspector) poi).get(obj);
+                    sb.append(b ? "true" : "false");
+                    break;
+                }
+                case BYTE: {
+                    sb.append(((ByteObjectInspector) poi).get(obj));
+                    break;
+                }
+                case SHORT: {
+                    sb.append(((ShortObjectInspector) poi).get(obj));
+                    break;
+                }
+                case INT: {
+                    sb.append(((IntObjectInspector) poi).get(obj));
+                    break;
+                }
+                case LONG: {
+                    sb.append(((LongObjectInspector) poi).get(obj));
+                    break;
+                }
+                case FLOAT: {
+                    sb.append(((FloatObjectInspector) poi).get(obj));
+                    break;
+                }
+                case DOUBLE: {
+                    sb.append(((DoubleObjectInspector) poi).get(obj));
+                    break;
+                }
+                case STRING: {
+                    sb.append('"');
+                    sb.append(SerDeUtils
+                        .escapeString(((StringObjectInspector) poi)
+                            .getPrimitiveJavaObject(obj)));
+                    sb.append('"');
+                    break;
+                }
+                case VARCHAR: {
+                    sb.append('"');
+                    sb.append(SerDeUtils
+                        .escapeString(((HiveVarcharObjectInspector) poi)
+                            .getPrimitiveJavaObject(obj).toString()));
+                    sb.append('"');
+                    break;
+                }
+                case DATE: {
+                    sb.append('"');
+                    sb.append(((DateObjectInspector) poi)
+                        .getPrimitiveWritableObject(obj));
+                    sb.append('"');
+                    break;
+                }
+                case TIMESTAMP: {
+                    sb.append('"');
+                    sb.append(((TimestampObjectInspector) poi)
+                        .getPrimitiveWritableObject(obj));
+                    sb.append('"');
+                    break;
+                }
+                case BINARY: {
+                    BytesWritable bw =
+                        ((BinaryObjectInspector) objIns)
+                            .getPrimitiveWritableObject(obj);
+                    Text txt = new Text();
+                    txt.set(bw.getBytes(), 0, bw.getLength());
+                    // Fix to serialize binary type
+                    sb.append('"');
+                    sb.append(StringEscapeUtils.escapeJava(txt.toString()));
+                    sb.append('"');
+                    break;
+                }
+                case DECIMAL: {
+                    sb.append(((HiveDecimalObjectInspector) objIns)
+                        .getPrimitiveJavaObject(obj));
+                    break;
+                }
+                default:
+                    throw new RuntimeException("Unknown primitive type: "
+                        + poi.getPrimitiveCategory());
+                }
+            }
+            break;
+        }
+        case LIST: {
+            ListObjectInspector loi = (ListObjectInspector) objIns;
+            ObjectInspector listElementObjectInspector =
+                loi.getListElementObjectInspector();
+            List<?> olist = loi.getList(obj);
+            if (olist == null) {
+                sb.append(nullStr);
+            } else {
+                sb.append(SerDeUtils.LBRACKET);
+                for (int i = 0; i < olist.size(); i++) {
+                    if (i > 0) {
+                        sb.append(SerDeUtils.COMMA);
+                    }
+                    buildJSONString(sb, olist.get(i),
+                        listElementObjectInspector);
+                }
+                sb.append(SerDeUtils.RBRACKET);
+            }
+            break;
+        }
+        case MAP: {
+            MapObjectInspector moi = (MapObjectInspector) objIns;
+            ObjectInspector mapKeyObjectInspector =
+                moi.getMapKeyObjectInspector();
+            ObjectInspector mapValueObjectInspector =
+                moi.getMapValueObjectInspector();
+            Map<?, ?> omap = moi.getMap(obj);
+            if (omap == null) {
+                sb.append(nullStr);
+            } else {
+                sb.append(SerDeUtils.LBRACE);
+                boolean first = true;
+                for (Object entry : omap.entrySet()) {
+                    if (first) {
+                        first = false;
+                    } else {
+                        sb.append(SerDeUtils.COMMA);
+                    }
+                    Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry;
+                    buildJSONString(sb, e.getKey(), mapKeyObjectInspector);
+                    sb.append(SerDeUtils.COLON);
+                    buildJSONString(sb, e.getValue(), mapValueObjectInspector);
+                }
+                sb.append(SerDeUtils.RBRACE);
+            }
+            break;
+        }
+        case STRUCT: {
+            StructObjectInspector soi = (StructObjectInspector) objIns;
+            List<? extends StructField> structFields =
+                soi.getAllStructFieldRefs();
+            if (obj == null) {
+                sb.append(nullStr);
+            } else {
+                sb.append(SerDeUtils.LBRACE);
+                for (int i = 0; i < structFields.size(); i++) {
+                    if (i > 0) {
+                        sb.append(SerDeUtils.COMMA);
+                    }
+                    sb.append(SerDeUtils.QUOTE);
+                    sb.append(structFields.get(i).getFieldName());
+                    sb.append(SerDeUtils.QUOTE);
+                    sb.append(SerDeUtils.COLON);
+                    buildJSONString(sb,
+                        soi.getStructFieldData(obj, structFields.get(i)),
+                        structFields.get(i).getFieldObjectInspector());
+                }
+                sb.append(SerDeUtils.RBRACE);
+            }
+            break;
+        }
+        case UNION: {
+            UnionObjectInspector uoi = (UnionObjectInspector) objIns;
+            if (obj == null) {
+                sb.append(nullStr);
+            } else {
+                sb.append(SerDeUtils.LBRACE);
+                sb.append(uoi.getTag(obj));
+                sb.append(SerDeUtils.COLON);
+                buildJSONString(sb, uoi.getField(obj), uoi
+                    .getObjectInspectors().get(uoi.getTag(obj)));
+                sb.append(SerDeUtils.RBRACE);
+            }
+            break;
+        }
+        default:
+            throw new RuntimeException("Unknown type in ObjectInspector!");
+        }
+    }
+
+}
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/change-proxy-modal.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/change-proxy-modal.vm
new file mode 100644
index 0000000..fd67c65
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/change-proxy-modal.vm
@@ -0,0 +1,40 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+      <div class="modal" id="messageDialog">
+        <div class="modal-dialog">
+          <div class="modal-content">
+            <div class="modal-header" id="messageTitle">
+              <button type="button" class="close" data-dismiss="modal" aria-hidden="true">&times;</button>
+              <h4 class="modal-title">Change Proxy User</h4>
+            </div>
+            <div class="modal-body" id="messageDiv">
+              <fieldset>
+                <div class="form-group">
+                  <label for="proxyname" class="col-sm-3 control-label">Proxy User</label>
+                  <div class="col-sm-9">
+                    <input id="proxyname" type="text" name="proxyname" val="${user}" class="form-control">
+                  </div>
+                </div>
+              </fieldset>
+            </div>
+            <div class="modal-footer">
+              <button type="button" class="btn btn-default" data-dismiss="modal">Cancel</button>
+              <button id="submitChangeUserBtn" class="btn btn-primary">Change User</button>
+            </div>
+          </div>
+        </div>
+      </div>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-browser.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-browser.vm
new file mode 100644
index 0000000..5c1869b
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-browser.vm
@@ -0,0 +1,147 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<!DOCTYPE html>
+<html>
+  <head lang="en">
+
+#parse ("azkaban/webapp/servlet/velocity/style.vm")
+#parse ("azkaban/webapp/servlet/velocity/javascript.vm")
+
+    <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+    <script type="text/javascript">
+      var contextURL = "${context}";
+      var currentTime = ${currentTime};
+      var timezone = "${timezone}";
+      var homeDir = "${homedir}";
+    </script>
+#if ($allowproxy)
+  #parse ("azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm")
+#end
+#parse ("azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm")
+  </head>
+  <body>
+
+#set ($current_page="$viewerName")
+#parse ("azkaban/webapp/servlet/velocity/nav.vm")
+
+#if ($errorMsg)
+  #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
+#else
+
+    <div class="az-page-header">
+      <div class="container-full">
+        <div class="row">
+          <div class="header-title">
+            <h1><a href="${context}/hdfs">HDFS Browser</a></h1>
+          </div>
+          <div class="header-control">
+            <div class="pull-right header-form">
+  #if ($allowproxy)
+              <strong>Proxy user</strong> ${user}
+              <button type="button" id="changeUserBtn" class="btn btn-sm btn-primary">Change User</button>
+  #end
+              <a id="goHomeLink" class="btn btn-sm btn-default" href="${context}/hdfs${homedir}">Home Directory</a>
+            </div>
+            <div class="clearfix"></div>
+          </div>
+        </div>
+      </div>
+    </div>
+
+    <div class="container-full">
+
+  #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+  #if (!$no_fs)
+
+      <div class="row">
+        <div class="col-xs-12">
+          <div class="panel panel-default" id="all-hdfsbrowser-content">
+            <div class="panel-heading">
+    #set ($size = $paths.size() - 1)
+              <a class="firstCrumb" href="${context}/hdfs/"> / </a>#if($size >= 0)#foreach($i in [0 ..$size])<a href="$context/hdfs${paths.get($i)}">${segments.get($i)}</a><span> / </span>#end #end
+              <div class="pull-right">
+                <strong>$subdirs.size()</strong> items <strong>$utils.displayBytes($dirsize)</strong> total
+              </div>
+            </div>
+            <table id="hdfs-dir" class="table table-condensed table-striped table-hover table-bordered">
+              <thead>
+                <tr>
+                  <th>File</th>
+                  <th>Permission</th>
+                  <th>Owner/Group</th>
+                  <th>Size</th>
+                  <th>Block Size</th>
+                  <th>Reps</th>
+                  <th>Modified Date</th>
+                </tr>
+              </thead>
+              <tbody>
+    #if ($subdirs)
+      #foreach ($status in $subdirs)
+                <tr>
+                  <td>
+                    <a href="${context}/hdfs${status.getPath().toUri().getPath()}">
+        #if ($status.isDir())
+                      <span class="glyphicon glyphicon-folder-close icon-directory"></span>
+        #else
+                      <span class="glyphicon glyphicon-file icon-file"></span>
+        #end
+                      ${status.path.name}#if($status.isDir())/#end</a>
+                    </a>
+                  </td>
+                  <td>${status.permission}</td>
+                  <td>${status.owner}/${status.group}</td>
+                  <td>
+        #if ($status.isDir())
+                    &ndash;
+        #else
+                    $utils.displayBytes(${status.len})
+        #end
+                  </td>
+                  <td>
+        #if ($status.isDir())
+                    &ndash;
+        #else
+                    $utils.displayBytes(${status.getBlockSize()})
+        #end
+                  </td>
+                  <td>
+        #if ($status.isDir())
+                    &ndash;
+        #else
+                    ${status.getReplication()}
+        #end
+                  </td>
+                  <td>$utils.formatDateTime(${status.modificationTime})</td>
+                </tr>
+      #end
+    #else
+                <tr><td>No Files In This Directory</td></tr>
+    #end
+              </tbody>
+            </table>
+  #end
+
+          </div><!-- /.panel -->
+        </div><!-- /.col-xs-12 -->
+      </div><!-- /.row -->
+
+#parse ("azkaban/viewer/hdfs/velocity/change-proxy-modal.vm")
+#end
+    </div><!-- /.container-full -->
+  </body>
+</html>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file.vm
new file mode 100644
index 0000000..d57e4b5
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file.vm
@@ -0,0 +1,158 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<!DOCTYPE html>
+<html>
+  <head lang="en">
+
+#parse ("azkaban/webapp/servlet/velocity/style.vm")
+#parse ("azkaban/webapp/servlet/velocity/javascript.vm")
+
+    <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+    <script type="text/javascript">
+      var contextURL = "${context}";
+      var currentTime = ${currentTime};
+      var timezone = "${timezone}";
+      var homeDir = "${homedir}";
+      var path = "${path}";
+      var hasSchema = ${hasSchema};
+      var viewerId = ${viewerId};
+      var contentType = "${contentType}";
+    </script>
+#if ($allowproxy)
+  #parse ("azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm")
+#end
+#parse ("azkaban/viewer/hdfs/velocity/hdfs-file-js.vm")
+#parse ("azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm")
+  </head>
+  <body>
+
+#set ($current_page = "$viewerName")
+#parse ("azkaban/webapp/servlet/velocity/nav.vm")
+
+#if ($errorMsg)
+  #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
+#else
+
+    <div class="az-page-header">
+      <div class="container-full">
+        <div class="row">
+          <div class="header-title">
+            <h1><a href="${context}/hdfs">HDFS Browser</a></h1>
+          </div>
+          <div class="header-control">
+            <div class="pull-right header-form">
+  #if ($allowproxy)
+              <strong>Proxy user</strong> ${user}
+              <button type="button" id="changeUserBtn" class="btn btn-sm btn-primary">Change User</button>
+  #end
+              <a id="goHomeLink" class="btn btn-sm btn-default" href="${context}/hdfs${homedir}">Home Directory</a>
+            </div>
+            <div class="clearfix"></div>
+          </div>
+        </div>
+      </div>
+    </div>
+
+    <div class="container-full">
+
+  #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+  #if (!$no_fs)
+
+      <div class="row">
+        <div class="col-xs-12">
+          <div class="panel panel-default" id="all-hdfsbrowser-content">
+            <div class="panel-heading">
+    #set ($size = $paths.size() - 1)
+              <a class="firstCrumb" href="${context}/hdfs/"> / </a>
+    #if ($size >= 0)
+      #set ($end = $size - 1)
+      #foreach ($i in [0 ..$end])
+              <a href="$context/hdfs${paths.get($i)}">${segments.get($i)}</a><span> / </span>
+      #end
+                ${segments.get($size)}
+    #end
+            </div>
+            <table class="table table-condensed table-striped table-bordered">
+              <tbody>
+                <tr>
+                  <td class="property-key">Permission</td>
+                  <td class="property-value-half">${status.permission}</td>
+                  <td class="property-key">Owner/Group</td>
+                  <td class="property-value-half">${status.owner}/${status.group}</td>
+                </tr>
+                <tr>
+                  <td class="property-key">Size</td>
+                  <td class="property-value-half">$utils.displayBytes(${status.len})</td>
+                  <td class="property-key">Block Size</td>
+                  <td class="property-value-half">$utils.displayBytes(${status.getBlockSize()})</td>
+                </tr>
+                <tr>
+                  <td class="property-key">Reps</td>
+                  <td class="property-value-half">${status.getReplication()}</td>
+                  <td class="property-key">Modified Date</td>
+                  <td class="property-value-half">$utils.formatDateTime(${status.modificationTime})</td>
+                </tr>
+              </tbody>
+            </table>
+          </div><!-- /.panel -->
+        </div><!-- /.col-xs-12 -->
+      </div><!-- /.row -->
+
+      <div class="row">
+        <div class="col-xs-12">
+          <ul class="nav nav-tabs nav-sm" id="file-tabs">
+            <li class="active"><a href="#contents" data-toggle="tab">Contents</a></li>
+    #if ($hasSchema)
+            <li><a href="#schema" data-toggle="tab">Schema</a></li>
+    #end
+          </ul>
+
+          <div class="tab-content">
+            <div class="tab-pane active" id="contents">
+              <div class="progress progress-striped active" id="file-contents-loading">
+                <div class="progress-bar progress-bar-info" role="progressbar" aria-valuenow="100" aria-valuemin="0" aria-valuemax="100" style="width: 100%">
+                  <span class="sr-only">Loading...</span>
+                </div>
+              </div>
+
+    #if ($contentType == "HTML")
+              <iframe id="file-contents-iframe" style="display:none;overflow:scroll" sandbox="allow-same-origin"></iframe>
+    #else
+              <pre id="file-contents"></pre>
+    #end
+            </div>
+
+    #if ($hasSchema)
+            <div class="tab-pane" id="schema">
+              <div class="progress progress-striped active" id="file-schema-loading">
+                <div class="progress-bar progress-bar-info" role="progressbar" aria-valuenow="100" aria-valuemin="0" aria-valuemax="100" style="width: 100%">
+                  <span class="sr-only">Loading...</span>
+                </div>
+              </div>
+              <pre id="file-schema"></pre>
+            </div>
+    #end
+          </div>
+        </div>
+      </div>
+
+  #end
+#parse ("azkaban/viewer/hdfs/velocity/change-proxy-modal.vm")
+#end
+    </div><!-- /.container-full -->
+  </body>
+</html>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file-js.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file-js.vm
new file mode 100644
index 0000000..197895a
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-file-js.vm
@@ -0,0 +1,192 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<script type="text/javascript">
+
+$.namespace('azkaban');
+
+azkaban.HdfsSchemaModel = Backbone.Model.extend({
+  initialize: function() {
+    this.set('schema', '');
+  },
+
+  fetchSchema: function(path, viewerId) {
+    var requestURL = '/hdfs';
+    var requestData = {
+      'ajax': 'fetchschema',
+      'path': path,
+      'viewerId': viewerId
+    };
+    var model = this;
+    var successHandler = function(data) {
+      if (data.error != null) {
+        model.set('error', data.error);
+      }
+      if (data.schema != null) {
+        model.set('schema', data.schema);
+      }
+    };
+    $.get(requestURL, requestData, successHandler, 'json');
+  }
+});
+
+azkaban.HdfsSchemaView = Backbone.View.extend({
+  events: {
+  },
+
+  initialize: function(settings) {
+    this.listenTo(this.model, 'change:schema', this.render);
+    this.rendered = false;
+  },
+
+  show: function() {
+    if (this.rendered == true) {
+      return;
+    }
+    this.model.fetchSchema(path, viewerId);
+  },
+
+  render: function(self) {
+    if (this.rendered == true) {
+      return;
+    }
+
+    var schema = this.model.get('schema');
+    if (schema == null) {
+      return;
+    }
+
+    $('#file-schema-loading').hide();
+    $('#file-schema').show().text(schema);
+    this.rendered = true;
+  }
+});
+
+azkaban.HdfsFileModel = Backbone.Model.extend({
+  initialize: function() {
+    this.set('contents', '');
+  },
+
+  fetchFile: function(path, viewerId) {
+    var requestURL = '/hdfs';
+    var requestData = {
+      'ajax': 'fetchfile',
+      'path': path,
+      'viewerId': viewerId,
+    };
+    var model = this;
+    var successHandler = function(data) {
+      if (data.error != null) {
+        model.set('error', data.error);
+      }
+      if (data === '') {
+        data = 'Oops this is an empty file !!';
+      }
+      model.set('contents', data);
+    };
+    $.get(requestURL, requestData, successHandler, 'text');
+  }
+});
+
+azkaban.HdfsFileView = Backbone.View.extend({
+  events: {
+  },
+
+  initialize: function(settings) {
+    this.listenTo(this.model, 'change:contents', this.render);
+    this.rendered = false;
+  },
+
+  show: function() {
+    if (this.rendered == true) {
+      return;
+    }
+    this.model.fetchFile(path, viewerId);
+  },
+
+  render: function(self) {
+    if (this.rendered == true) {
+      return;
+    }
+
+    var file = this.model.get('contents');
+    if (file == null) {
+      return;
+    }
+
+    $('#file-contents-loading').hide();
+
+    if (contentType == "HTML") {
+      var iframe = document.getElementById('file-contents-iframe');
+      // show iframe (initially it's hidden)
+      iframe.style.display = "inline";
+      // Write content to iframe. We can't just set the html source content as
+      // inner html of the iframe. The iframe hosts a completely separate dom
+      // structure than the current html, thus setting the inner HTML will not
+      // work. The only way to store html inside iframe is by either writing
+      // the content directly into it or set the content in srcdoc attribute.
+      // Since either way is fine, will go with write method since it looks
+      // nicer when you inspect the html structure.
+      var doc = iframe.contentWindow.document;
+      doc.open();
+      doc.write(file);
+      doc.close();
+      // adjust iframe size
+      iframe.width  = iframe.contentWindow.document.body.scrollWidth;
+      iframe.height = iframe.contentWindow.document.body.scrollHeight;
+    } else {
+      $('#file-contents').show().text(file);
+    }
+    this.rendered = true;
+  }
+});
+
+var schemaModel;
+var schemaView;
+
+var fileModel;
+var fileView;
+
+$(function () {
+  $('a[data-toggle="tab"]').on('shown.bs.tab', function (e) {
+    var current = e.target;
+    var previous = e.relatedTarget;
+    var hash = $(current).attr('href');
+    if (hash == '#schema' && schemaModel != null) {
+      schemaView.show();
+    }
+    else if (hash == '#contents') {
+      fileView.show();
+    }
+  });
+
+  if (hasSchema) {
+    schemaModel = new azkaban.HdfsSchemaModel();
+    schemaView = new azkaban.HdfsSchemaView({
+      el: $("#schema"),
+      model: schemaModel
+    });
+  }
+
+  fileModel = new azkaban.HdfsFileModel();
+  fileView = new azkaban.HdfsFileView({
+    el: $("#contents"),
+    model: fileModel
+  });
+
+  fileView.show();
+});
+</script>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm
new file mode 100644
index 0000000..99dfe27
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-proxy-js.vm
@@ -0,0 +1,47 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<script type="text/javascript">
+$(function() {
+  $('#changeUserBtn').click( function() {
+    $('#messageDialog').modal();
+    $('#proxyname').focus();
+  });
+
+  $('#submitChangeUserBtn').click( function() {
+    var user = $('#proxyname').val();
+    var requestData = {
+      'action': 'changeProxyUser',
+      'proxyname': user
+    };
+    var successHandler = function(data) {
+      if (data.error) {
+        $('#messageBox').text(data.error);
+      }
+      else {
+        location.reload(true);
+      }
+    };
+    $.post("", requestData, successHandler);
+  });
+
+  $('#proxyname').keyup(function(event) {
+    if (event.key == "Enter") {
+      $('#submitChangeUserBtn').click();
+    }
+  });
+});
+</script>
diff --git a/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm
new file mode 100644
index 0000000..2b536e0
--- /dev/null
+++ b/az-hdfs-viewer/src/main/java/azkaban/viewer/hdfs/velocity/hdfs-viewer-css.vm
@@ -0,0 +1,30 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *#
+
+<style type="text/css">
+.icon-directory {
+  color: #428bca;
+}
+
+.icon-file {
+  color: #cccccc;
+}
+
+#file-contents,
+#file-schema {
+  display: none;
+}
+</style>
diff --git a/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/HtmlFileViewerTest.java b/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/HtmlFileViewerTest.java
new file mode 100644
index 0000000..f093973
--- /dev/null
+++ b/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/HtmlFileViewerTest.java
@@ -0,0 +1,87 @@
+package azkaban.viewer.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AccessControlException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * <pre>
+ * Test cases for HtmlFileViewer
+ *
+ * Validate accepted capabilities for the viewer and ensures that it
+ * generates the correct content and content type.
+ * </pre>
+ */
+public class HtmlFileViewerTest {
+    private static final String EMPTY_HTM = "TestHtmEmptyFile.htm";
+    private static final String VALID_HTML = "TestHtmlFile.html";
+    FileSystem fs;
+
+    HtmlFileViewer viewer;
+
+    @Before
+    public void setUp() throws IOException {
+        this.fs = new LocalFileSystem();
+        this.fs.initialize(this.fs.getWorkingDirectory().toUri(), new Configuration());
+        this.viewer = new HtmlFileViewer();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        this.fs.close();
+    }
+
+    @Test
+    public void testCapabilities() throws AccessControlException {
+        Set<Capability> capabilities = this.viewer
+            .getCapabilities(this.fs, getResourcePath(EMPTY_HTM));
+        // READ should be the the one and only capability
+        assertTrue(capabilities.contains(Capability.READ));
+        assertEquals(capabilities.size(), 1);
+
+        capabilities = this.viewer.getCapabilities(this.fs, getResourcePath(VALID_HTML));
+        // READ should be the the one and only capability
+        assertTrue(capabilities.contains(Capability.READ));
+        assertEquals(capabilities.size(), 1);
+    }
+
+    @Test
+    public void testContentType() {
+        assertEquals(ContentType.HTML, this.viewer.getContentType());
+    }
+
+    @Test
+    public void testEmptyFile() throws IOException {
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        this.viewer.displayFile(this.fs, getResourcePath(EMPTY_HTM), outStream, 1, 2);
+        final String output = outStream.toString();
+        assertTrue(output.isEmpty());
+    }
+
+    @Test
+    @SuppressWarnings("DefaultCharset")
+    public void testValidHtmlFile() throws IOException {
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        this.viewer.displayFile(this.fs, getResourcePath(VALID_HTML), outStream, 1, 2);
+        final String output = new String(outStream.toByteArray());
+        assertEquals(output, "<p>file content</p>\n");
+    }
+
+    /* Get Path to a file from resource dir */
+    private Path getResourcePath(final String filename) {
+        final String HDFS_VIEWER_ROOT_PATH = "../test/hdfs-viewer-sample-files/";
+        return new Path(HDFS_VIEWER_ROOT_PATH + filename);
+    }
+}
diff --git a/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/ORCFileViewerTest.java b/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/ORCFileViewerTest.java
new file mode 100644
index 0000000..4f92f25
--- /dev/null
+++ b/az-hdfs-viewer/src/test/java/azkaban/viewer/hdfs/ORCFileViewerTest.java
@@ -0,0 +1,207 @@
+package azkaban.viewer.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * <pre>
+ * Test cases for ORCFileViewer
+ *
+ * Validate capability for ORC files Validate false capability for avro, text
+ * and parquet schema for ORC files verify raw records from orc files
+ * </pre>
+ */
+@Ignore
+public class ORCFileViewerTest {
+    ORCFileViewer viewer;
+    Set<Capability> supportedCapabilities;
+    Set<Capability> unSupportedCapabilities;
+    FileSystem fs;
+
+    @Before
+    public void setUp() throws IOException {
+        this.fs = new LocalFileSystem();
+        this.fs.initialize(this.fs.getWorkingDirectory().toUri(), new Configuration());
+        this.viewer = new ORCFileViewer();
+        this.supportedCapabilities = EnumSet.of(Capability.READ, Capability.SCHEMA);
+        this.unSupportedCapabilities = EnumSet.noneOf(Capability.class);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        this.fs.close();
+    }
+
+    @SuppressWarnings("DefaultCharset")
+        /* Calls ORCFileViewer#displayFile and parse results */
+    String displayRecordWrapper(final String filename, final int startRecord, final int endRecord)
+        throws IOException {
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        this.viewer.displayFile(this.fs, getResourcePath(filename), outStream,
+            startRecord, endRecord);
+        String records = new String(outStream.toByteArray());
+        records = records.replaceAll("Record [0-9]*:", "");
+        records = StringUtils.deleteWhitespace(records);
+        return records;
+    }
+
+    /* Get Path to a file from resource dir */
+    Path getResourcePath(final String filename) {
+        final String HDFS_VIEWER_ROOT_PATH = "../test/hdfs-viewer-sample-files/";
+        return new Path(HDFS_VIEWER_ROOT_PATH + filename);
+    }
+
+    /* verify capability for empty orc files */
+    @Test
+    public void orcEmptyFileCapability() throws IOException {
+        assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+            getResourcePath("TestOrcFile.emptyFile.orc")));
+    }
+
+    /* verify capability for generic orc files */
+    @Test
+    public void genericORCFileCapability() throws IOException {
+        assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+            getResourcePath("TestOrcFile.testPredicatePushdown.orc")));
+    }
+
+    /* verify capability for orc files with binary type */
+    @Test
+    public void binaryTypeORCFileCapability() throws IOException {
+        assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+            getResourcePath("TestOrcFile.testStringAndBinaryStatistics.orc")));
+    }
+
+    /* verify capability for snappy compressed orc file */
+    @Test
+    public void snappyCompressedORCFileCapability() throws IOException {
+        assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+            getResourcePath("TestOrcFile.testSnappy.orc")));
+    }
+
+    /* verify capability for union type orc file */
+    @Test
+    public void unionTypeORCFileCapability() throws IOException {
+        assertEquals(this.supportedCapabilities, this.viewer.getCapabilities(this.fs,
+            getResourcePath("TestOrcFile.testUnionAndTimestamp.orc")));
+    }
+
+    /* verify capability for avro files */
+    @Test
+    public void noAvroCapability() throws IOException {
+        assertEquals(this.unSupportedCapabilities,
+            this.viewer.getCapabilities(this.fs, getResourcePath("TestAvro.avro")));
+    }
+
+    /* verify capability for text files */
+    @Test
+    public void noTextCapability() throws IOException {
+        assertEquals(this.unSupportedCapabilities,
+            this.viewer.getCapabilities(this.fs, getResourcePath("TestTextFile.txt")));
+    }
+
+    /* verify capability for parquet files */
+    @Test
+    public void noParquetCapability() throws IOException {
+        assertEquals(this.unSupportedCapabilities, this.viewer.getCapabilities(this.fs,
+            getResourcePath("TestParquetFile.parquet")));
+    }
+
+    /* verify schema for empty orc files */
+    @Test
+    public void emptyORCFileSchema() throws IOException {
+        final String schema =
+            "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int,long1:bigint,"
+                + "float1:float,double1:double,bytes1:binary,string1:string,"
+                + "middle:struct<list:array<struct<int1:int,string1:string>>>,"
+                + "list:array<struct<int1:int,string1:string>>,map:map<string,struct<int1:int,string1:string>>>";
+        assertEquals(schema,
+            this.viewer.getSchema(this.fs, getResourcePath("TestOrcFile.emptyFile.orc")));
+    }
+
+    /* verify schema for generic orc files */
+    @Test
+    public void genericORCFileSchema() throws IOException {
+        assertEquals("struct<int1:int,string1:string>", this.viewer.getSchema(this.fs,
+            getResourcePath("TestOrcFile.testPredicatePushdown.orc")));
+    }
+
+    /* verify schema for orc files with binary type */
+    @Test
+    public void binaryTypeFileSchema() throws IOException {
+        assertEquals("struct<bytes1:binary,string1:string>", this.viewer.getSchema(
+            this.fs,
+            getResourcePath("TestOrcFile.testStringAndBinaryStatistics.orc")));
+    }
+
+    /* verify schema for snappy compressed orc file */
+    @Test
+    public void snappyCompressedFileSchema() throws IOException {
+        assertEquals("struct<int1:int,string1:string>",
+            this.viewer.getSchema(this.fs, getResourcePath("TestOrcFile.testSnappy.orc")));
+    }
+
+    /* verify schema for union type orc file */
+    @Test
+    public void unionTypeFileSchema() throws IOException {
+        assertEquals(
+            "struct<time:timestamp,union:uniontype<int,string>,decimal:decimal>",
+            this.viewer.getSchema(this.fs,
+                getResourcePath("TestOrcFile.testUnionAndTimestamp.orc")));
+    }
+
+    /* verify record display for empty orc files */
+    @Test
+    public void emptyORCFileDisplay() throws IOException {
+        final String actual = displayRecordWrapper("TestOrcFile.emptyFile.orc", 1, 1);
+        assertEquals("", actual);
+    }
+
+    /* verify record display for generic orc files */
+    @Test
+    public void genericORCFileDisplay() throws IOException {
+        final String actual =
+            displayRecordWrapper("TestOrcFile.testPredicatePushdown.orc", 2, 2);
+        assertEquals("{\"int1\":300,\"string1\":\"a\"}", actual);
+    }
+
+    /* verify record display for orc files with binary type */
+    @Test
+    public void binaryTypeFileDisplay() throws IOException {
+        final String actual =
+            displayRecordWrapper(
+                "TestOrcFile.testStringAndBinaryStatistics.orc", 4, 4);
+        assertEquals("{\"bytes1\":null,\"string1\":\"hi\"}", actual);
+    }
+
+    /* verify record display for snappy compressed orc file */
+    @Test
+    public void snappyCompressedFileDisplay() throws IOException {
+        final String actual =
+            displayRecordWrapper("TestOrcFile.testSnappy.orc", 2, 2);
+        assertEquals("{\"int1\":1181413113,\"string1\":\"382fdaaa\"}", actual);
+    }
+
+    /* verify record display for union type orc file */
+    @Test
+    public void unionTypeFileDisplay() throws IOException {
+        final String actual =
+            displayRecordWrapper("TestOrcFile.testUnionAndTimestamp.orc", 5, 5);
+        assertEquals("{\"decimal\":null,\"time\":null,\"union\":{\"1\":null}}",
+            actual);
+    }
+
+}
diff --git a/az-hdfs-viewer/src/test/resources/TestAvro.avro b/az-hdfs-viewer/src/test/resources/TestAvro.avro
new file mode 100644
index 0000000..8ffdc97
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestAvro.avro differ
diff --git a/az-hdfs-viewer/src/test/resources/TestHtmEmptyFile.htm b/az-hdfs-viewer/src/test/resources/TestHtmEmptyFile.htm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/az-hdfs-viewer/src/test/resources/TestHtmEmptyFile.htm
diff --git a/az-hdfs-viewer/src/test/resources/TestHtmlFile.html b/az-hdfs-viewer/src/test/resources/TestHtmlFile.html
new file mode 100644
index 0000000..2c77cfb
--- /dev/null
+++ b/az-hdfs-viewer/src/test/resources/TestHtmlFile.html
@@ -0,0 +1 @@
+<p>file content</p>
\ No newline at end of file
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.emptyFile.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.emptyFile.orc
new file mode 100644
index 0000000..ecdadcb
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.emptyFile.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.testPredicatePushdown.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.testPredicatePushdown.orc
new file mode 100644
index 0000000..4865dd8
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.testPredicatePushdown.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.testSnappy.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.testSnappy.orc
new file mode 100644
index 0000000..aa6cc9c
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.testSnappy.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.testStringAndBinaryStatistics.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.testStringAndBinaryStatistics.orc
new file mode 100644
index 0000000..4282c2a
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.testStringAndBinaryStatistics.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestOrcFile.testUnionAndTimestamp.orc b/az-hdfs-viewer/src/test/resources/TestOrcFile.testUnionAndTimestamp.orc
new file mode 100644
index 0000000..377862d
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestOrcFile.testUnionAndTimestamp.orc differ
diff --git a/az-hdfs-viewer/src/test/resources/TestParquetFile.parquet b/az-hdfs-viewer/src/test/resources/TestParquetFile.parquet
new file mode 100644
index 0000000..bc61f97
Binary files /dev/null and b/az-hdfs-viewer/src/test/resources/TestParquetFile.parquet differ
diff --git a/az-hdfs-viewer/src/test/resources/TestTextFile.txt b/az-hdfs-viewer/src/test/resources/TestTextFile.txt
new file mode 100644
index 0000000..830e987
--- /dev/null
+++ b/az-hdfs-viewer/src/test/resources/TestTextFile.txt
@@ -0,0 +1 @@
+This is a dummy text file.

build.gradle 3(+3 -0)

diff --git a/build.gradle b/build.gradle
index 524eae0..2d5dafe 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,8 +101,11 @@ ext.deps = [
         metricsCore         : 'io.dropwizard.metrics:metrics-core:3.1.0',
         metricsJvm          : 'io.dropwizard.metrics:metrics-jvm:3.1.0',
         mockito             : 'org.mockito:mockito-core:2.10.0',
+        mongoDriver         : 'org.mongodb:mongo-java-driver:2.14.0',
         mysqlConnector      : 'mysql:mysql-connector-java:5.1.28',
         pig                 : 'org.apache.pig:pig:' + versions.pig,
+        parquetAvro         : 'com.twitter:parquet-avro:1.3.2',
+        parquetBundle       : 'com.twitter:parquet-hadoop-bundle:1.3.2',
         quartz              : 'org.quartz-scheduler:quartz:2.2.1',
         restliGenerator     : 'com.linkedin.pegasus:generator:' + versions.restli,
         restliServer        : 'com.linkedin.pegasus:restli-server:' + versions.restli,

settings.gradle 1(+1 -0)

diff --git a/settings.gradle b/settings.gradle
index a0d63cf..9a94fe6 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -29,3 +29,4 @@ include 'az-flow-trigger-dependency-plugin'
 include 'test'
 include 'az-reportal'
 include 'az-hadoop-jobtype-plugin'
+include 'az-hdfs-viewer'
diff --git a/test/hdfs-viewer-sample-files/TestAvro.avro b/test/hdfs-viewer-sample-files/TestAvro.avro
new file mode 100644
index 0000000..8ffdc97
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestAvro.avro differ
diff --git a/test/hdfs-viewer-sample-files/TestHtmEmptyFile.htm b/test/hdfs-viewer-sample-files/TestHtmEmptyFile.htm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/hdfs-viewer-sample-files/TestHtmEmptyFile.htm
diff --git a/test/hdfs-viewer-sample-files/TestHtmlFile.html b/test/hdfs-viewer-sample-files/TestHtmlFile.html
new file mode 100644
index 0000000..2c77cfb
--- /dev/null
+++ b/test/hdfs-viewer-sample-files/TestHtmlFile.html
@@ -0,0 +1 @@
+<p>file content</p>
\ No newline at end of file
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.emptyFile.orc b/test/hdfs-viewer-sample-files/TestOrcFile.emptyFile.orc
new file mode 100644
index 0000000..ecdadcb
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.emptyFile.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.testPredicatePushdown.orc b/test/hdfs-viewer-sample-files/TestOrcFile.testPredicatePushdown.orc
new file mode 100644
index 0000000..4865dd8
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.testPredicatePushdown.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.testSnappy.orc b/test/hdfs-viewer-sample-files/TestOrcFile.testSnappy.orc
new file mode 100644
index 0000000..aa6cc9c
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.testSnappy.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.testStringAndBinaryStatistics.orc b/test/hdfs-viewer-sample-files/TestOrcFile.testStringAndBinaryStatistics.orc
new file mode 100644
index 0000000..4282c2a
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.testStringAndBinaryStatistics.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestOrcFile.testUnionAndTimestamp.orc b/test/hdfs-viewer-sample-files/TestOrcFile.testUnionAndTimestamp.orc
new file mode 100644
index 0000000..377862d
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestOrcFile.testUnionAndTimestamp.orc differ
diff --git a/test/hdfs-viewer-sample-files/TestParquetFile.parquet b/test/hdfs-viewer-sample-files/TestParquetFile.parquet
new file mode 100644
index 0000000..bc61f97
Binary files /dev/null and b/test/hdfs-viewer-sample-files/TestParquetFile.parquet differ
diff --git a/test/hdfs-viewer-sample-files/TestTextFile.txt b/test/hdfs-viewer-sample-files/TestTextFile.txt
new file mode 100644
index 0000000..830e987
--- /dev/null
+++ b/test/hdfs-viewer-sample-files/TestTextFile.txt
@@ -0,0 +1 @@
+This is a dummy text file.