diff --git a/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java b/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
index b31e1eb..9c82f2a 100644
--- a/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
+++ b/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
@@ -242,9 +242,15 @@ public class AzkabanDatabaseSetup {
upgradeList.put(key, upgradeVersions);
}
}
+ for (String key : missingTables) {
+ List<String> upgradeVersions = findOutOfDateTable(key, "");
+ if (upgradeVersions != null && !upgradeVersions.isEmpty()) {
+ upgradeList.put(key, upgradeVersions);
+ }
+ }
}
- private List<String> findOutOfDateTable(String table, String version) {
+ private List<String> findOutOfDateTable(String table, String currentVersion) {
File directory = new File(scriptPath);
ArrayList<String> versions = new ArrayList<String>();
@@ -255,30 +261,29 @@ public class AzkabanDatabaseSetup {
return null;
}
- String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + version;
+ String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + currentVersion;
for (File file : createScripts) {
String fileName = file.getName();
if (fileName.compareTo(updateFileNameVersion) > 0) {
- if (fileName.startsWith(updateFileNameVersion)) {
- continue;
- }
-
String[] split = fileName.split("\\.");
- String versionNum = "";
+ String updateScriptVersion = "";
for (int i = 2; i < split.length - 1; ++i) {
try {
Integer.parseInt(split[i]);
- versionNum += split[i] + ".";
+ updateScriptVersion += split[i] + ".";
} catch (NumberFormatException e) {
break;
}
}
- if (versionNum.endsWith(".")) {
- versionNum = versionNum.substring(0, versionNum.length() - 1);
-
- if (versionNum.compareTo(version) == 0) {
- versions.add(versionNum);
+ if (updateScriptVersion.endsWith(".")) {
+ updateScriptVersion = updateScriptVersion.substring(0, updateScriptVersion.length() - 1);
+
+ // add to update list if updateScript will update above current
+ // version and upto targetVersion in database.properties
+ if (updateScriptVersion.compareTo(currentVersion) > 0
+ && updateScriptVersion.compareTo(this.version) <= 0) {
+ versions.add(updateScriptVersion);
}
}
}
@@ -300,6 +305,8 @@ public class AzkabanDatabaseSetup {
for (String table : missingTables) {
if (!table.equals("properties")) {
runTableScripts(conn, table, version, dataSource.getDBType(), false);
+ // update version as we have create a new table
+ installedVersions.put(table, version);
}
}
} finally {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index d1120ea..39dd831 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -414,7 +414,12 @@ public class ExecutorManager extends EventHandler implements
for (Executor executor : activeExecutors) {
ports.add(executor.getHost() + ":" + executor.getPort());
}
-
+ // include executor which were initially active and still has flows running
+ for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
+ .values()) {
+ ExecutionReference ref = running.getFirst();
+ ports.add(ref.getHost() + ":" + ref.getPort());
+ }
return ports;
}