BsonFileViewer.java

114 lines | 3.056 kB Blame History Raw Download
/*
 * Copyright 2012 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.viewer.hdfs;

import java.util.EnumSet;
import java.util.Set;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.permission.AccessControlException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.bson.BSONObject;
import org.bson.BasicBSONCallback;
import org.bson.BasicBSONDecoder;

import com.mongodb.util.JSON;

/**
 * File viewer for Mongo bson files.
 *
 * @author adilaijaz
 */
public final class BsonFileViewer extends HdfsFileViewer {

  /**
   * The maximum time spent, in milliseconds, while reading records from the
   * file.
   */
  private static long STOP_TIME = 2000l;

  private static final String VIEWER_NAME = "BSON";

  @Override
  public String getName() {
    return VIEWER_NAME;
  }

  @Override
  public Set<Capability> getCapabilities(FileSystem fs, Path path)
      throws AccessControlException {
    if (path.getName().endsWith(".bson")) {
      return EnumSet.of(Capability.READ);
    }
    return EnumSet.noneOf(Capability.class);
  }

  @Override
  public void displayFile(FileSystem fs, Path path, OutputStream outStream,
      int startLine, int endLine) throws IOException {

    FSDataInputStream in = null;
    try {
      in = fs.open(path, 16 * 1024 * 1024);

      long endTime = System.currentTimeMillis() + STOP_TIME;

      BasicBSONCallback callback = new BasicBSONCallback();
      BasicBSONDecoder decoder = new BasicBSONDecoder();

      /*
       * keep reading and rendering bsonObjects until one of these conditions is
       * met:
       *
       * a. we have rendered all bsonObjects desired. b. we have run out of
       * time.
       */
      for (int lineno = 1; lineno <= endLine
          && System.currentTimeMillis() <= endTime; lineno++) {
        if (lineno < startLine) {
          continue;
        }

        callback.reset();
        decoder.decode(in, callback);

        BSONObject value = (BSONObject) callback.get();

        StringBuilder bldr = new StringBuilder();
        bldr.append("\n\n Record ");
        bldr.append(lineno);
        bldr.append('\n');
        JSON.serialize(value, bldr);
        outStream.write(bldr.toString().getBytes("UTF-8"));
      }
    } catch (IOException e) {
      outStream
          .write(("Error in display avro file: " + e.getLocalizedMessage())
              .getBytes("UTF-8"));
    } finally {
      if (in != null) {
        in.close();
      }
      outStream.flush();
    }
  }
}