JdbcProjectHandlerSet.java

365 lines | 12.627 kB Blame History Raw Download
/*
 * Copyright 2017 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package azkaban.project;

import azkaban.db.EncodingType;
import azkaban.flow.Flow;
import azkaban.user.Permission;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Triple;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.dbutils.ResultSetHandler;


/**
 * This is a JDBC Handler collection place for all project handler classes.
 */
class JdbcProjectHandlerSet {

  public static class ProjectResultHandler implements ResultSetHandler<List<Project>> {

    public static String SELECT_PROJECT_BY_NAME =
        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=?";

    public static String SELECT_PROJECT_BY_ID =
        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE id=?";

    public static String SELECT_ALL_ACTIVE_PROJECTS =
        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE active=true";

    public static String SELECT_ACTIVE_PROJECT_BY_NAME =
        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true";

    @Override
    public List<Project> handle(final ResultSet rs) throws SQLException {
      if (!rs.next()) {
        return Collections.emptyList();
      }

      final ArrayList<Project> projects = new ArrayList<>();
      do {
        final int id = rs.getInt(1);
        final String name = rs.getString(2);
        final boolean active = rs.getBoolean(3);
        final long modifiedTime = rs.getLong(4);
        final long createTime = rs.getLong(5);
        final int version = rs.getInt(6);
        final String lastModifiedBy = rs.getString(7);
        final String description = rs.getString(8);
        final int encodingType = rs.getInt(9);
        final byte[] data = rs.getBytes(10);

        final Project project;
        if (data != null) {
          final EncodingType encType = EncodingType.fromInteger(encodingType);
          final Object blobObj;
          try {
            // Convoluted way to inflate strings. Should find common package or
            // helper function.
            if (encType == EncodingType.GZIP) {
              // Decompress the sucker.
              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
              blobObj = JSONUtils.parseJSONFromString(jsonString);
            } else {
              final String jsonString = new String(data, "UTF-8");
              blobObj = JSONUtils.parseJSONFromString(jsonString);
            }
            project = Project.projectFromObject(blobObj);
          } catch (final IOException e) {
            throw new SQLException("Failed to get project.", e);
          }
        } else {
          project = new Project(id, name);
        }

        // update the fields as they may have changed

        project.setActive(active);
        project.setLastModifiedTimestamp(modifiedTime);
        project.setCreateTimestamp(createTime);
        project.setVersion(version);
        project.setLastModifiedUser(lastModifiedBy);
        project.setDescription(description);

        projects.add(project);
      } while (rs.next());

      return projects;
    }
  }

  public static class ProjectPermissionsResultHandler implements
      ResultSetHandler<List<Triple<String, Boolean, Permission>>> {

    public static String SELECT_PROJECT_PERMISSION =
        "SELECT project_id, modified_time, name, permissions, isGroup FROM project_permissions WHERE project_id=?";

    @Override
    public List<Triple<String, Boolean, Permission>> handle(final ResultSet rs)
        throws SQLException {
      if (!rs.next()) {
        return Collections.emptyList();
      }

      final List<Triple<String, Boolean, Permission>> permissions = new ArrayList<>();
      do {
        final String username = rs.getString(3);
        final int permissionFlag = rs.getInt(4);
        final boolean val = rs.getBoolean(5);

        final Permission perm = new Permission(permissionFlag);
        permissions.add(new Triple<>(username, val, perm));
      } while (rs.next());

      return permissions;
    }
  }

  public static class ProjectFlowsResultHandler implements ResultSetHandler<List<Flow>> {

    public static String SELECT_PROJECT_FLOW =
        "SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=? AND flow_id=?";

    public static String SELECT_ALL_PROJECT_FLOWS =
        "SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?";

    @Override
    public List<Flow> handle(final ResultSet rs) throws SQLException {
      if (!rs.next()) {
        return Collections.emptyList();
      }

      final ArrayList<Flow> flows = new ArrayList<>();
      do {
        final String flowId = rs.getString(3);
        final int encodingType = rs.getInt(5);
        final byte[] dataBytes = rs.getBytes(6);

        if (dataBytes == null) {
          continue;
        }

        final EncodingType encType = EncodingType.fromInteger(encodingType);

        Object flowObj = null;
        try {
          // Convoluted way to inflate strings. Should find common package or
          // helper function.
          if (encType == EncodingType.GZIP) {
            // Decompress the sucker.
            final String jsonString = GZIPUtils.unGzipString(dataBytes, "UTF-8");
            flowObj = JSONUtils.parseJSONFromString(jsonString);
          } else {
            final String jsonString = new String(dataBytes, "UTF-8");
            flowObj = JSONUtils.parseJSONFromString(jsonString);
          }

          final Flow flow = Flow.flowFromObject(flowObj);
          flows.add(flow);
        } catch (final IOException e) {
          throw new SQLException("Error retrieving flow data " + flowId, e);
        }
      } while (rs.next());

      return flows;
    }
  }

  public static class ProjectPropertiesResultsHandler implements
      ResultSetHandler<List<Pair<String, Props>>> {

    public static String SELECT_PROJECT_PROPERTY =
        "SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=? AND name=?";

    public static String SELECT_PROJECT_PROPERTIES =
        "SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=?";

    @Override
    public List<Pair<String, Props>> handle(final ResultSet rs) throws SQLException {
      if (!rs.next()) {
        return Collections.emptyList();
      }

      final List<Pair<String, Props>> properties = new ArrayList<>();
      do {
        final String name = rs.getString(3);
        final int eventType = rs.getInt(5);
        final byte[] dataBytes = rs.getBytes(6);

        final EncodingType encType = EncodingType.fromInteger(eventType);
        String propertyString = null;

        try {
          if (encType == EncodingType.GZIP) {
            // Decompress the sucker.
            propertyString = GZIPUtils.unGzipString(dataBytes, "UTF-8");
          } else {
            propertyString = new String(dataBytes, "UTF-8");
          }

          final Props props = PropsUtils.fromJSONString(propertyString);
          props.setSource(name);
          properties.add(new Pair<>(name, props));
        } catch (final IOException e) {
          throw new SQLException(e);
        }
      } while (rs.next());

      return properties;
    }
  }

  public static class ProjectLogsResultHandler implements ResultSetHandler<List<ProjectLogEvent>> {

    public static String SELECT_PROJECT_EVENTS_ORDER =
        "SELECT project_id, event_type, event_time, username, message FROM project_events WHERE project_id=? ORDER BY event_time DESC LIMIT ? OFFSET ?";

    @Override
    public List<ProjectLogEvent> handle(final ResultSet rs) throws SQLException {
      if (!rs.next()) {
        return Collections.emptyList();
      }

      final ArrayList<ProjectLogEvent> events = new ArrayList<>();
      do {
        final int projectId = rs.getInt(1);
        final int eventType = rs.getInt(2);
        final long eventTime = rs.getLong(3);
        final String username = rs.getString(4);
        final String message = rs.getString(5);

        final ProjectLogEvent event =
            new ProjectLogEvent(projectId, ProjectLogEvent.EventType.fromInteger(eventType),
                eventTime, username,
                message);
        events.add(event);
      } while (rs.next());

      return events;
    }
  }

  public static class ProjectFileChunkResultHandler implements ResultSetHandler<List<byte[]>> {

    public static String SELECT_PROJECT_CHUNKS_FILE =
        "SELECT project_id, version, chunk, size, file FROM project_files WHERE project_id=? AND version=? AND chunk >= ? AND chunk < ? ORDER BY chunk ASC";

    @Override
    public List<byte[]> handle(final ResultSet rs) throws SQLException {
      if (!rs.next()) {
        return Collections.emptyList();
      }

      final ArrayList<byte[]> data = new ArrayList<>();
      do {
        final byte[] bytes = rs.getBytes(5);

        data.add(bytes);
      } while (rs.next());

      return data;
    }
  }

  public static class ProjectVersionResultHandler implements
      ResultSetHandler<List<ProjectFileHandler>> {

    public static String SELECT_PROJECT_VERSION =
        "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id "
            + "FROM project_versions WHERE project_id=? AND version=?";

    @Override
    public List<ProjectFileHandler> handle(final ResultSet rs) throws SQLException {
      if (!rs.next()) {
        return null;
      }

      final List<ProjectFileHandler> handlers = new ArrayList<>();
      do {
        final int projectId = rs.getInt(1);
        final int version = rs.getInt(2);
        final long uploadTime = rs.getLong(3);
        final String uploader = rs.getString(4);
        final String fileType = rs.getString(5);
        final String fileName = rs.getString(6);
        final byte[] md5 = rs.getBytes(7);
        final int numChunks = rs.getInt(8);
        final String resourceId = rs.getString(9);

        final ProjectFileHandler handler =
            new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName,
                numChunks, md5,
                resourceId);

        handlers.add(handler);
      } while (rs.next());

      return handlers;
    }
  }

  public static class IntHandler implements ResultSetHandler<Integer> {

    public static String SELECT_LATEST_VERSION = "SELECT MAX(version) FROM project_versions WHERE project_id=?";
    public static String SELECT_LATEST_FLOW_VERSION = "SELECT MAX(flow_version) FROM "
        + "project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?";

    @Override
    public Integer handle(final ResultSet rs) throws SQLException {
      if (!rs.next()) {
        return 0;
      }

      return rs.getInt(1);
    }
  }

  public static class FlowFileResultHandler implements ResultSetHandler<List<byte[]>> {

    public static String SELECT_FLOW_FILE =
        "SELECT flow_file FROM project_flow_files WHERE "
            + "project_id=? AND project_version=? AND flow_name=? AND flow_version=?";

    public static String SELECT_ALL_FLOW_FILES =
        "SELECT flow_file FROM project_flow_files WHERE "
            + "project_id=? AND project_version=?";

    @Override
    public List<byte[]> handle(final ResultSet rs) throws SQLException {
      if (!rs.next()) {
        return Collections.emptyList();
      }

      final List<byte[]> data = new ArrayList<>();
      do {
        final byte[] bytes = rs.getBytes(1);
        data.add(bytes);
      } while (rs.next());

      return data;
    }
  }
}