azkaban-aplcache

diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index d726216..7a74326 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -382,6 +382,43 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     }
 
     logger.info("Md5 hash created");
+
+    /**
+     * Insert a new version record into TABLE project_versions before uploading the file.
+     *
+     * Why this is needed:
+     * When a chunking error occurs on the remote MySQL server, incomplete file data is left
+     * in the DB and an SQLException is thrown. Without this insert before the upload,
+     * the exception would prevent AZ from ever creating the new version record in
+     * project_versions, while project_files would still hold the incomplete chunks. That
+     * breaks the next upload: because the version in project_versions is still the old one,
+     * MySQL refuses to insert the new file chunks.
+     *
+     * Why this is safe:
+     * When AZ uploads a new zip file, it always fetches the latest version proj_v from
+     * project_versions and uses proj_v + 1 as the version for the file being uploaded.
+     *
+     * Suppose a chunking error happens on day 1: proj_v has already been created for the bad
+     * file (old version + 1). When a new project zip is uploaded on day 2, it gets version
+     * proj_v + 1, and once that upload completes AZ cleans up all the old chunks in the DB.
+     */
+    final String INSERT_PROJECT_VERSION =
+        "INSERT INTO project_versions (project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values (?,?,?,?,?,?,?,?)";
+
+    try {
+
+      /**
+       * Since the number of chunks is unknown before the upload, num_chunks is
+       * initialized to 0 here and updated after the upload completes.
+       */
+      runner.update(connection, INSERT_PROJECT_VERSION, project.getId(),
+          version, updateTime, uploader, filetype, filename, md5, 0);
+    } catch (SQLException e) {
+      logger.error(e);
+      throw new ProjectManagerException("Error initializing project version "
+          + project.getName(), e);
+    }
+
     // Really... I doubt we'll get a > 2gig file. So int casting it is!
     byte[] buffer = new byte[CHUCK_SIZE];
     final String INSERT_PROJECT_FILES =
@@ -402,9 +439,18 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
           logger.info("Running update for " + filename + " chunk " + chunk);
           runner.update(connection, INSERT_PROJECT_FILES, project.getId(),
               version, chunk, size, buf);
+
+          /**
+           * Commit after every single chunk, rather than once at the end of the upload,
+           * to keep the transaction short and conserve SQL server resources.
+           *
+           * If the file being uploaded is very large and we do not commit per chunk, the
+           * remote MySQL server can run into memory trouble.
+           */
+          connection.commit();
           logger.info("Finished update for " + filename + " chunk " + chunk);
         } catch (SQLException e) {
-          throw new ProjectManagerException("Error chunking", e);
+          throw new ProjectManagerException("Error Chunking during uploading files to db...");
         }
         ++chunk;
 
@@ -416,16 +462,19 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       IOUtils.closeQuietly(bufferedStream);
     }
 
-    final String INSERT_PROJECT_VERSION =
-        "INSERT INTO project_versions (project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values (?,?,?,?,?,?,?,?)";
+    /**
+     * Update num_chunks to its actual value now that the upload has finished.
+     */
+    final String UPDATE_PROJECT_NUM_CHUNKS =
+        "UPDATE project_versions SET num_chunks=? WHERE project_id=? AND version=?";
 
     try {
-      runner.update(connection, INSERT_PROJECT_VERSION, project.getId(),
-          version, updateTime, uploader, filetype, filename, md5, chunk);
+      runner.update(connection, UPDATE_PROJECT_NUM_CHUNKS, chunk, project.getId(), version);
+      connection.commit();
     } catch (SQLException e) {
-      logger.error(e);
-      throw new ProjectManagerException("Error updating project version "
-          + project.getName(), e);
+      throw new ProjectManagerException(
+          "Error updating project " + project.getId() + " : num_chunks "
+              + chunk, e);
     }
   }
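
Taken together, the patch changes the upload flow to: record the version row first (with num_chunks = 0), commit each chunk as it is written, and write the real num_chunks back at the end. Below is a minimal, self-contained sketch of that pattern, assuming auto-commit is disabled on the connection. The class and method names, the CHUNK_SIZE value, and the project_files column list are illustrative assumptions (the diff does not show them); only the project_versions statements and the per-chunk commit ordering mirror the patch.

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;

import org.apache.commons.dbutils.QueryRunner;

public class ChunkedUploadSketch {

  // Chunk size is assumed here; the real loader defines its own constant.
  private static final int CHUNK_SIZE = 10 * 1024 * 1024;

  private static final String INSERT_PROJECT_VERSION =
      "INSERT INTO project_versions (project_id, version, upload_time, uploader,"
          + " file_type, file_name, md5, num_chunks) values (?,?,?,?,?,?,?,?)";
  // Column list assumed; the diff does not show the project_files statement body.
  private static final String INSERT_PROJECT_FILES =
      "INSERT INTO project_files (project_id, version, chunk, size, file) values (?,?,?,?,?)";
  private static final String UPDATE_PROJECT_NUM_CHUNKS =
      "UPDATE project_versions SET num_chunks=? WHERE project_id=? AND version=?";

  public void uploadFile(Connection connection, QueryRunner runner, int projectId,
      int version, long uploadTime, String uploader, String fileType, String fileName,
      byte[] md5, File localFile) throws SQLException, IOException {
    // 1. Record the new version up front with num_chunks = 0 and commit, so that a
    //    later chunking failure still leaves the version row behind and the next
    //    upload fetches it and bumps past it.
    runner.update(connection, INSERT_PROJECT_VERSION, projectId, version, uploadTime,
        uploader, fileType, fileName, md5, 0);
    connection.commit();

    // 2. Stream the file in fixed-size chunks, committing after every chunk so each
    //    transaction stays small and the MySQL server never holds the whole blob.
    int chunk = 0;
    byte[] buffer = new byte[CHUNK_SIZE];
    try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
      int size;
      while ((size = in.read(buffer)) > 0) {
        byte[] buf = (size == buffer.length) ? buffer : Arrays.copyOf(buffer, size);
        runner.update(connection, INSERT_PROJECT_FILES, projectId, version, chunk, size, buf);
        connection.commit();
        ++chunk;
      }
    }

    // 3. The chunk count is known only now, so write it back to the version row.
    runner.update(connection, UPDATE_PROJECT_NUM_CHUNKS, chunk, projectId, version);
    connection.commit();
  }
}

The property the patch is after: even if an insert into project_files fails partway through, the row in project_versions already exists, so the next upload computes a strictly newer version and the stale chunks can be cleaned up once it succeeds.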