azkaban-aplcache

Merge pull request #491 from logiclord/multipleexecutors Auto

9/16/2015 7:54:07 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java b/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
index b31e1eb..9c82f2a 100644
--- a/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
+++ b/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
@@ -242,9 +242,15 @@ public class AzkabanDatabaseSetup {
         upgradeList.put(key, upgradeVersions);
       }
     }
+    for (String key : missingTables) {
+      List<String> upgradeVersions = findOutOfDateTable(key, "");
+      if (upgradeVersions != null && !upgradeVersions.isEmpty()) {
+        upgradeList.put(key, upgradeVersions);
+      }
+    }
   }
 
-  private List<String> findOutOfDateTable(String table, String version) {
+  private List<String> findOutOfDateTable(String table, String currentVersion) {
     File directory = new File(scriptPath);
     ArrayList<String> versions = new ArrayList<String>();
 
@@ -255,30 +261,29 @@ public class AzkabanDatabaseSetup {
       return null;
     }
 
-    String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + version;
+    String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + currentVersion;
     for (File file : createScripts) {
       String fileName = file.getName();
       if (fileName.compareTo(updateFileNameVersion) > 0) {
-        if (fileName.startsWith(updateFileNameVersion)) {
-          continue;
-        }
-
         String[] split = fileName.split("\\.");
-        String versionNum = "";
+        String updateScriptVersion = "";
 
         for (int i = 2; i < split.length - 1; ++i) {
           try {
             Integer.parseInt(split[i]);
-            versionNum += split[i] + ".";
+            updateScriptVersion += split[i] + ".";
           } catch (NumberFormatException e) {
             break;
           }
         }
-        if (versionNum.endsWith(".")) {
-          versionNum = versionNum.substring(0, versionNum.length() - 1);
-
-          if (versionNum.compareTo(version) == 0) {
-            versions.add(versionNum);
+        if (updateScriptVersion.endsWith(".")) {
+          updateScriptVersion = updateScriptVersion.substring(0, updateScriptVersion.length() - 1);
+
+          // add to update list if updateScript will update above current
+          // version and upto targetVersion in database.properties
+          if (updateScriptVersion.compareTo(currentVersion) > 0
+            && updateScriptVersion.compareTo(this.version) <= 0) {
+            versions.add(updateScriptVersion);
           }
         }
       }
@@ -300,6 +305,8 @@ public class AzkabanDatabaseSetup {
       for (String table : missingTables) {
         if (!table.equals("properties")) {
           runTableScripts(conn, table, version, dataSource.getDBType(), false);
+          // update version as we have create a new table
+          installedVersions.put(table, version);
         }
       }
     } finally {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index d1120ea..39dd831 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -414,7 +414,12 @@ public class ExecutorManager extends EventHandler implements
     for (Executor executor : activeExecutors) {
       ports.add(executor.getHost() + ":" + executor.getPort());
     }
-
+    // include executor which were initially active and still has flows running
+    for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
+      .values()) {
+      ExecutionReference ref = running.getFirst();
+      ports.add(ref.getHost() + ":" + ref.getPort());
+    }
     return ports;
   }
 
diff --git a/azkaban-sql/src/sql/database.properties b/azkaban-sql/src/sql/database.properties
index b68be28..b6802bc 100644
--- a/azkaban-sql/src/sql/database.properties
+++ b/azkaban-sql/src/sql/database.properties
@@ -1 +1 @@
-version=
+version=3.0