ReportalUtil.java

166 lines | 5.201 kB Blame History Raw Download
/*
 * Copyright 2018 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.reportal.util;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.reportal.util.Reportal.Variable;
import java.io.OutputStream;
import java.io.PrintStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReportalUtil {

  public static IStreamProvider getStreamProvider(final String fileSystem) {
    if (fileSystem.equalsIgnoreCase("hdfs")) {
      return new StreamProviderHDFS();
    }
    return new StreamProviderLocal();
  }

  /**
   * Returns a list of the executable nodes in the specified flow in execution
   * order. Assumes that the flow is linear.
   */
  public static List<ExecutableNode> sortExecutableNodes(final ExecutableFlow flow) {
    final List<ExecutableNode> sortedNodes = new ArrayList<>();

    if (flow != null) {
      final List<String> startNodeIds = flow.getStartNodes();

      String nextNodeId = startNodeIds.isEmpty() ? null : startNodeIds.get(0);

      while (nextNodeId != null) {
        final ExecutableNode node = flow.getExecutableNode(nextNodeId);
        sortedNodes.add(node);

        final Set<String> outNodes = node.getOutNodes();
        nextNodeId = outNodes.isEmpty() ? null : outNodes.iterator().next();
      }
    }

    return sortedNodes;
  }

  /**
   * Get runtime variables to be set in unscheduled mode of execution.
   * Returns empty list, if no runtime variable is found
   */
  public static List<Variable> getRunTimeVariables(
      final Collection<Variable> variables) {
    final List<Variable> runtimeVariables =
        ReportalUtil.getVariablesByRegex(variables,
            Reportal.REPORTAL_CONFIG_PREFIX_NEGATION_REGEX);

    return runtimeVariables;
  }

  /**
   * Shortlist variables which match a given regex. Returns empty empty list, if no
   * eligible variable is found
   */
  public static List<Variable> getVariablesByRegex(
      final Collection<Variable> variables, final String regex) {
    final List<Variable> shortlistedVariables = new ArrayList<>();
    if (variables != null && regex != null) {
      for (final Variable var : variables) {
        if (var.getTitle().matches(regex)) {
          shortlistedVariables.add(var);
        }
      }
    }
    return shortlistedVariables;
  }

  /**
   * Shortlist variables which match a given prefix. Returns empty map, if no
   * eligible variable is found.
   *
   * @param variables variables to be processed
   * @param prefix prefix to be matched
   * @return a map with shortlisted variables and prefix removed
   */
  public static Map<String, String> getVariableMapByPrefix(
      final Collection<Variable> variables, final String prefix) {
    final Map<String, String> shortlistMap = new HashMap<>();
    if (variables != null && prefix != null) {
      for (final Variable var : getVariablesByRegex(variables,
          Reportal.REPORTAL_CONFIG_PREFIX_REGEX)) {
        shortlistMap
            .put(var.getTitle().replaceFirst(prefix, ""), var.getName());
      }
    }
    return shortlistMap;
  }

  private static String formatValue(final String value) {
    return "\"" + value.replace("\"", "") + "\"";
  }

  public static void outputQueryResult(final ResultSet result, final OutputStream outputStream)
      throws SQLException {
    final PrintStream outFile = new PrintStream(outputStream);
    final String delim = ",";
    boolean isHeaderPending = true;
    if (result != null) {
      while (result.next()) {
        final int numColumns = result.getMetaData().getColumnCount();
        final StringBuilder dataString = new StringBuilder();

        if (isHeaderPending) {
          final StringBuilder headerString = new StringBuilder();
          for (int j = 1; j <= numColumns; j++) {
            final String colName = formatValue(result.getMetaData().getColumnName(j));
            if (j > 1) {
              headerString.append(delim).append(colName);
            } else {
              headerString.append(colName);
            }
          }
          isHeaderPending = false;
          outFile.println(headerString.toString());
        }

        for (int j = 1; j <= numColumns; j++) {
          String colVal = result.getString(j);
          if (colVal == null) {
            colVal = "\"null\"";
          } else {
            colVal = formatValue(colVal);
          }

          if (j > 1) {
            dataString.append(delim).append(colVal);
          } else {
            dataString.append(colVal);
          }
        }

        outFile.println(dataString.toString());
      }
    }
    outFile.close();
  }
}