azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 131e1f8..6f390c5 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -382,6 +382,25 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     }
 
     logger.info("Md5 hash created");
+
+    /**
+     * Insert a version row to table project_versions.
+     * If something goes wrong during uploading project files to DB, the latest version number
+     * in project_versions refer to the destructive project_file. However, az always make the number+1
+     * when uploading files next time.
+     */
+    final String INSERT_PROJECT_VERSION =
+        "INSERT INTO project_versions (project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values (?,?,?,?,?,?,?,?)";
+
+    try {
+      runner.update(connection, INSERT_PROJECT_VERSION, project.getId(),
+          version, updateTime, uploader, filetype, filename, md5, 0);
+    } catch (SQLException e) {
+      logger.error(e);
+      throw new ProjectManagerException("Error initializing project version "
+          + project.getName(), e);
+    }
+
     // Really... I doubt we'll get a > 2gig file. So int casting it is!
     byte[] buffer = new byte[CHUCK_SIZE];
     final String INSERT_PROJECT_FILES =
@@ -419,16 +438,20 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       IOUtils.closeQuietly(bufferedStream);
     }
 
-    final String INSERT_PROJECT_VERSION =
-        "INSERT INTO project_versions (project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values (?,?,?,?,?,?,?,?)";
+    /**
+     * Since we initialized num_chunks to 0 before uploading files, we set it
+     * to right number here.
+     */
+    final String UPDATE_PROJECT_NUM_CHUNKS =
+        "UPDATE project_versions SET num_chunks=? WHERE project_id=? AND version=?";
 
     try {
-      runner.update(connection, INSERT_PROJECT_VERSION, project.getId(),
-          version, updateTime, uploader, filetype, filename, md5, chunk);
+      runner.update(connection, UPDATE_PROJECT_NUM_CHUNKS, chunk, project.getId(), version);
+      connection.commit();
     } catch (SQLException e) {
-      logger.error(e);
-      throw new ProjectManagerException("Error updating project version "
-          + project.getName(), e);
+      throw new ProjectManagerException(
+          "Error updating project file chunk_num " + project.getId() + ":"
+              + chunk, e);
     }
   }