azkaban-aplcache
Changes
azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java 157(+157 -0)
azkaban-sql/src/sql/create.executors.sql 10(+10 -0)
azkaban-webserver/.gitignore 1(+1 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java b/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
index b31e1eb..9c82f2a 100644
--- a/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
+++ b/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
@@ -242,9 +242,15 @@ public class AzkabanDatabaseSetup {
upgradeList.put(key, upgradeVersions);
}
}
+ for (String key : missingTables) {
+ List<String> upgradeVersions = findOutOfDateTable(key, "");
+ if (upgradeVersions != null && !upgradeVersions.isEmpty()) {
+ upgradeList.put(key, upgradeVersions);
+ }
+ }
}
- private List<String> findOutOfDateTable(String table, String version) {
+ private List<String> findOutOfDateTable(String table, String currentVersion) {
File directory = new File(scriptPath);
ArrayList<String> versions = new ArrayList<String>();
@@ -255,30 +261,29 @@ public class AzkabanDatabaseSetup {
return null;
}
- String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + version;
+ String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + currentVersion;
for (File file : createScripts) {
String fileName = file.getName();
if (fileName.compareTo(updateFileNameVersion) > 0) {
- if (fileName.startsWith(updateFileNameVersion)) {
- continue;
- }
-
String[] split = fileName.split("\\.");
- String versionNum = "";
+ String updateScriptVersion = "";
for (int i = 2; i < split.length - 1; ++i) {
try {
Integer.parseInt(split[i]);
- versionNum += split[i] + ".";
+ updateScriptVersion += split[i] + ".";
} catch (NumberFormatException e) {
break;
}
}
- if (versionNum.endsWith(".")) {
- versionNum = versionNum.substring(0, versionNum.length() - 1);
-
- if (versionNum.compareTo(version) == 0) {
- versions.add(versionNum);
+ if (updateScriptVersion.endsWith(".")) {
+ updateScriptVersion = updateScriptVersion.substring(0, updateScriptVersion.length() - 1);
+
+ // add to update list if updateScript will update above current
+ // version and upto targetVersion in database.properties
+ if (updateScriptVersion.compareTo(currentVersion) > 0
+ && updateScriptVersion.compareTo(this.version) <= 0) {
+ versions.add(updateScriptVersion);
}
}
}
@@ -300,6 +305,8 @@ public class AzkabanDatabaseSetup {
for (String table : missingTables) {
if (!table.equals("properties")) {
runTableScripts(conn, table, version, dataSource.getDBType(), false);
+ // update version as we have create a new table
+ installedVersions.put(table, version);
}
}
} finally {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8bc725f..8abbd61 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -17,6 +17,7 @@
package azkaban.executor;
public interface ConnectorParams {
+ public static final String EXECUTOR_ID_PARAM = "executorId";
public static final String ACTION_PARAM = "action";
public static final String EXECID_PARAM = "execid";
public static final String SHAREDTOKEN_PARAM = "token";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
new file mode 100644
index 0000000..3050a8d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * Comparator implicitly used in priority queue for QueuedExecutions.
+ */
+public final class ExecutableFlowPriorityComparator implements
+ Comparator<Pair<ExecutionReference, ExecutableFlow>> {
+ private static Logger logger = Logger
+ .getLogger(ExecutableFlowPriorityComparator.class);
+
+ /**
+ * <pre>
+ * Sorting order is determined by:-
+ * 1. descending order of priority
+ * 2. if same priority, ascending order of update time
+ * 3. if same priority and updateTime, ascending order of execution id
+ * </pre>
+ *
+ * {@inheritDoc}
+ *
+ * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public int compare(Pair<ExecutionReference, ExecutableFlow> pair1,
+ Pair<ExecutionReference, ExecutableFlow> pair2) {
+ ExecutableFlow exflow1 = null, exflow2 = null;
+ if (pair1 != null && pair1.getSecond() != null) {
+ exflow1 = pair1.getSecond();
+ }
+ if (pair2 != null && pair2.getSecond() != null) {
+ exflow2 = pair2.getSecond();
+ }
+ if (exflow1 == null && exflow2 == null)
+ return 0;
+ else if (exflow1 == null)
+ return -1;
+ else if (exflow2 == null)
+ return 1;
+ else {
+ // descending order of priority
+ int diff = getPriority(exflow2) - getPriority(exflow1);
+ if (diff == 0) {
+ // ascending order of update time, if same priority
+ diff = (int) (exflow1.getUpdateTime() - exflow2.getUpdateTime());
+ }
+ if (diff == 0) {
+ // ascending order of execution id, if same priority and updateTime
+ diff = exflow1.getExecutionId() - exflow2.getExecutionId();
+ }
+ return diff;
+ }
+ }
+
+ /* Helper method to fetch flow priority from flow props */
+ private int getPriority(ExecutableFlow exflow) {
+ ExecutionOptions options = exflow.getExecutionOptions();
+ int priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+ if (options != null
+ && options.getFlowParameters() != null
+ && options.getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY)) {
+ try {
+ priority =
+ Integer.valueOf(options.getFlowParameters().get(
+ ExecutionOptions.FLOW_PRIORITY));
+ } catch (NumberFormatException ex) {
+ priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+ logger.error(
+ "Failed to parse flow priority for exec_id = "
+ + exflow.getExecutionId(), ex);
+ }
+ }
+ return priority;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index d4cb262..d8b10f1 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -33,6 +33,10 @@ public class ExecutionOptions {
public static final String CONCURRENT_OPTION_SKIP = "skip";
public static final String CONCURRENT_OPTION_PIPELINE = "pipeline";
public static final String CONCURRENT_OPTION_IGNORE = "ignore";
+ public static final String FLOW_PRIORITY = "flowPriority";
+ /* override dispatcher selection and use executor id specified */
+ public static final String USE_EXECUTOR = "useExecutor";
+ public static final int DEFAULT_FLOW_PRIORITY = 5;
private static final String FLOW_PARAMETERS = "flowParameters";
private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index e314206..9d93476 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -18,16 +18,23 @@ package azkaban.executor;
public class ExecutionReference {
private final int execId;
- private final String host;
- private final int port;
+ private Executor executor;
private long updateTime;
private long nextCheckTime = -1;
private int numErrors = 0;
- public ExecutionReference(int execId, String host, int port) {
+
+ public ExecutionReference(int execId) {
+ this.execId = execId;
+ }
+
+ public ExecutionReference(int execId, Executor executor) {
+ if (executor == null) {
+ throw new IllegalArgumentException(String.format(
+ "Executor cannot be null for exec id: %d ExecutionReference", execId));
+ }
this.execId = execId;
- this.host = host;
- this.port = port;
+ this.executor = executor;
}
public void setUpdateTime(long updateTime) {
@@ -51,11 +58,11 @@ public class ExecutionReference {
}
public String getHost() {
- return host;
+ return executor.getHost();
}
public int getPort() {
- return port;
+ return executor.getPort();
}
public int getNumErrors() {
@@ -65,4 +72,12 @@ public class ExecutionReference {
public void setNumErrors(int numErrors) {
this.numErrors = numErrors;
}
-}
+
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
new file mode 100644
index 0000000..f0600ab
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Date;
+import azkaban.utils.Utils;
+
+/**
+ * Class to represent an AzkabanExecutorServer details for ExecutorManager
+ *
+ * @author gaggarwa
+ */
+public class Executor implements Comparable<Executor> {
+ private final int id;
+ private final String host;
+ private final int port;
+ private boolean isActive;
+ // cached copy of the latest statistics from the executor.
+ private ExecutorInfo cachedExecutorStats;
+ private Date lastStatsUpdatedTime;
+
+ /**
+ * <pre>
+ * Construct an Executor Object
+ * Note: port should be a within unsigned 2 byte
+ * integer range
+ * </pre>
+ *
+ * @param executor_id
+ * @param executor_host
+ * @param executor_port
+ */
+ public Executor(int id, String host, int port, boolean isActive) {
+ if (!Utils.isValidPort(port)) {
+ throw new IllegalArgumentException(String.format(
+ "Invalid port number %d for host %s, executor_id %d", port, host, id));
+ }
+
+ this.id = id;
+ this.host = host;
+ this.port = port;
+ this.isActive = isActive;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (isActive ? 1231 : 1237);
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + id;
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof Executor))
+ return false;
+ Executor other = (Executor) obj;
+ if (isActive != other.isActive)
+ return false;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ if (id != other.id)
+ return false;
+ if (port != other.port)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString(){
+ return String.format("%s:%s (id: %s)",
+ null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+ this.port, this.id);
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public ExecutorInfo getExecutorInfo() {
+ return this.cachedExecutorStats;
+ }
+
+ public void setExecutorInfo(ExecutorInfo info) {
+ this.cachedExecutorStats = info;
+ this.lastStatsUpdatedTime = new Date();
+ }
+
+ /**
+ * Gets the timestamp when the executor info is last updated.
+ * @return date object represents the timestamp, null if the executor info of this
+ * specific executor is never refreshed.
+ * */
+ public Date getLastStatsUpdatedTime(){
+ return this.lastStatsUpdatedTime;
+ }
+
+ public void setActive(boolean isActive) {
+ this.isActive = isActive;
+ }
+
+ @Override
+ public int compareTo(Executor o) {
+ return null == o ? 1 : this.hashCode() - o.hashCode();
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
new file mode 100644
index 0000000..2ab814d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.util.EntityUtils;
+import azkaban.utils.RestfulApiClient;
+
+/** Client class that will be used to handle all Restful API calls between Executor and the host application.
+ * */
+public class ExecutorApiClient extends RestfulApiClient<String> {
+ private static ExecutorApiClient instance = null;
+ private ExecutorApiClient(){}
+
+ /**
+ * Singleton method to return the instance of the current object.
+ * */
+ public static ExecutorApiClient getInstance(){
+ if (null == instance){
+ instance = new ExecutorApiClient();
+ }
+
+ return instance;
+ }
+
+ /**Implementing the parseResponse function to return de-serialized Json object.
+ * @param response the returned response from the HttpClient.
+ * @return de-serialized object from Json or null if the response doesn't have a body.
+ * */
+ @Override
+ protected String parseResponse(HttpResponse response)
+ throws HttpResponseException, IOException {
+ final StatusLine statusLine = response.getStatusLine();
+ String responseBody = response.getEntity() != null ?
+ EntityUtils.toString(response.getEntity()) : "";
+
+ if (statusLine.getStatusCode() >= 300) {
+
+ logger.error(String.format("unable to parse response as the response status is %s",
+ statusLine.getStatusCode()));
+
+ throw new HttpResponseException(statusLine.getStatusCode(),responseBody);
+ }
+
+ return responseBody;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
new file mode 100644
index 0000000..03de432
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+ /** Class that exposes the statistics from the executor server.
+ * List of the statistics -
+ * remainingMemoryPercent;
+ * remainingMemory;
+ * remainingFlowCapacity;
+ * numberOfAssignedFlows;
+ * lastDispatchedTime;
+ * cpuUsage;
+ *
+ * */
+ public class ExecutorInfo implements java.io.Serializable{
+ private static final long serialVersionUID = 3009746603773371263L;
+ private double remainingMemoryPercent;
+ private long remainingMemoryInMB;
+ private int remainingFlowCapacity;
+ private int numberOfAssignedFlows;
+ private long lastDispatchedTime;
+ private double cpuUsage;
+
+ public double getCpuUsage() {
+ return this.cpuUsage;
+ }
+
+ public void setCpuUpsage(double value){
+ this.cpuUsage = value;
+ }
+
+ public double getRemainingMemoryPercent() {
+ return this.remainingMemoryPercent;
+ }
+
+ public void setRemainingMemoryPercent(double value){
+ this.remainingMemoryPercent = value;
+ }
+
+ public long getRemainingMemoryInMB(){
+ return this.remainingMemoryInMB;
+ }
+
+ public void setRemainingMemoryInMB(long value){
+ this.remainingMemoryInMB = value;
+ }
+
+ public int getRemainingFlowCapacity(){
+ return this.remainingFlowCapacity;
+ }
+
+ public void setRemainingFlowCapacity(int value){
+ this.remainingFlowCapacity = value;
+ }
+
+ public long getLastDispatchedTime(){
+ return this.lastDispatchedTime;
+ }
+
+ public void setLastDispatchedTime(long value){
+ this.lastDispatchedTime = value;
+ }
+
+ public int getNumberOfAssignedFlows () {
+ return this.numberOfAssignedFlows;
+ }
+
+ public void setNumberOfAssignedFlows (int value) {
+ this.numberOfAssignedFlows = value;
+ }
+
+ public ExecutorInfo(){}
+
+ public ExecutorInfo (double remainingMemoryPercent,
+ long remainingMemory,
+ int remainingFlowCapacity,
+ long lastDispatched,
+ double cpuUsage,
+ int numberOfAssignedFlows){
+ this.remainingMemoryInMB = remainingMemory;
+ this.cpuUsage = cpuUsage;
+ this.remainingFlowCapacity = remainingFlowCapacity;
+ this.remainingMemoryPercent = remainingMemoryPercent;
+ this.lastDispatchedTime = lastDispatched;
+ this.numberOfAssignedFlows = numberOfAssignedFlows;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof ExecutorInfo)
+ {
+ boolean result = true;
+ ExecutorInfo stat = (ExecutorInfo) obj;
+
+ result &=this.remainingMemoryInMB == stat.remainingMemoryInMB;
+ result &=this.cpuUsage == stat.cpuUsage;
+ result &=this.remainingFlowCapacity == stat.remainingFlowCapacity;
+ result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
+ result &=this.numberOfAssignedFlows == stat.numberOfAssignedFlows;
+ result &= this.lastDispatchedTime == stat.lastDispatchedTime;
+ return result;
+ }
+ return false;
+ }
+
+ /**
+ * Helper function to get an ExecutorInfo instance from the JSon String serialized from another object.
+ * @param jsonString the string that will be de-serialized from.
+ * @return instance of the object if the parsing is successful, null other wise.
+ * @throws JsonParseException,JsonMappingException,IOException
+ * */
+ public static ExecutorInfo fromJSONString(String jsonString) throws
+ JsonParseException,
+ JsonMappingException,
+ IOException{
+ if (null == jsonString || jsonString.length() == 0) return null;
+ return new ObjectMapper().readValue(jsonString, ExecutorInfo.class);
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 6dc0e11..80e8167 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -20,6 +20,7 @@ import java.io.File;
import java.util.List;
import java.util.Map;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -47,12 +48,187 @@ public interface ExecutorLoader {
String flowContains, String userNameContains, int status, long startData,
long endData, int skip, int num) throws ExecutorManagerException;
+ /**
+ * <pre>
+ * Fetch all executors from executors table
+ * Note:-
+ * 1 throws an Exception in case of a SQL issue
+ * 2 returns an empty list in case of no executor
+ * </pre>
+ *
+ * @return List<Executor>
+ * @throws ExecutorManagerException
+ */
+ public List<Executor> fetchAllExecutors() throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch all executors from executors table with active = true
+ * Note:-
+ * 1 throws an Exception in case of a SQL issue
+ * 2 returns an empty list in case of no active executor
+ * </pre>
+ *
+ * @return List<Executor>
+ * @throws ExecutorManagerException
+ */
+ public List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch executor from executors with a given (host, port)
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found
+ * with the given (host,port)
+ * </pre>
+ *
+ * @return Executor
+ * @throws ExecutorManagerException
+ */
+ public Executor fetchExecutor(String host, int port)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch executor from executors with a given executorId
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @return Executor
+ * @throws ExecutorManagerException
+ */
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * create an executor and insert in executors table.
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. throws an Exception if a executor with (host, port) already exist
+ * 3. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @return Executor
+ * @throws ExecutorManagerException
+ */
+ public Executor addExecutor(String host, int port)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * create an executor and insert in executors table.
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. throws an Exception if there is no executor with the given id
+ * 3. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @param executorId
+ * @throws ExecutorManagerException
+ */
+ public void updateExecutor(Executor executor) throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Log an event in executor_event audit table Note:- throws an Exception in
+ * case of a SQL issue
+ * Note: throws an Exception in case of a SQL issue
+ * </pre>
+ *
+ * @param executor
+ * @param type
+ * @param user
+ * @param message
+ * @return isSuccess
+ */
+ public void postExecutorEvent(Executor executor, EventType type, String user,
+ String message) throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * This method is to fetch events recorded in executor audit table, inserted
+ * by postExecutorEvents with a given executor, starting from skip
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. Returns an empty list in case of no events
+ * </pre>
+ *
+ * @param executor
+ * @param num
+ * @param skip
+ * @return List<ExecutorLogEvent>
+ * @throws ExecutorManagerException
+ */
+ List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+ int offset) throws ExecutorManagerException;
+
public void addActiveExecutableReference(ExecutionReference ref)
throws ExecutorManagerException;
public void removeActiveExecutableReference(int execId)
throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Unset executor Id for an execution
+ * Note:-
+ * throws an Exception in case of a SQL issue
+ * </pre>
+ *
+ * @param executorId
+ * @param execId
+ * @throws ExecutorManagerException
+ */
+ public void unassignExecutor(int executionId) throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Set an executor Id to an execution
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. throws an Exception in case executionId or executorId do not exist
+ * </pre>
+ *
+ * @param executorId
+ * @param execId
+ * @throws ExecutorManagerException
+ */
+ public void assignExecutor(int executorId, int execId)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetches an executor corresponding to a given execution
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found with the given executionId
+ * </pre>
+ *
+ * @param executionId
+ * @return fetched Executor
+ * @throws ExecutorManagerException
+ */
+ public Executor fetchExecutorByExecutionId(int executionId)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch queued flows which have not yet dispatched
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return empty list when no queued execution is found
+ * </pre>
+ *
+ * @return List of queued flows and corresponding execution reference
+ * @throws ExecutorManagerException
+ */
+ public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ throws ExecutorManagerException;
+
public boolean updateExecutableReference(int execId, long updateTime)
throws ExecutorManagerException;
@@ -105,5 +281,4 @@ public interface ExecutorLoader {
public int removeExecutionLogsByTime(long millis)
throws ExecutorManagerException;
-
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
new file mode 100644
index 0000000..5598a0d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Date;
+
+/**
+ * Class to represent events on Azkaban executors
+ *
+ * @author gaggarwa
+ */
+public class ExecutorLogEvent {
+ /**
+ * Log event type messages. Do not change the numeric representation of each
+ * enum. Only represent from 0 to 255 different codes.
+ */
+ public static enum EventType {
+ ERROR(128), HOST_UPDATE(1), PORT_UPDATE(2), ACTIVATION(3), INACTIVATION(4),
+ CREATED(5);
+
+ private int numVal;
+
+ EventType(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static EventType fromInteger(int x)
+ throws IllegalArgumentException {
+ switch (x) {
+ case 1:
+ return HOST_UPDATE;
+ case 2:
+ return PORT_UPDATE;
+ case 3:
+ return ACTIVATION;
+ case 4:
+ return INACTIVATION;
+ case 5:
+ return CREATED;
+ case 128:
+ return ERROR;
+ default:
+ throw new IllegalArgumentException(String.format(
+ "inalid status code %d", x));
+ }
+ }
+ }
+
+ private final int executorId;
+ private final String user;
+ private final Date time;
+ private final EventType type;
+ private final String message;
+
+ public ExecutorLogEvent(int executorId, String user, Date time,
+ EventType type, String message) {
+ this.executorId = executorId;
+ this.user = user;
+ this.time = time;
+ this.type = type;
+ this.message = message;
+ }
+
+ public int getExecutorId() {
+ return executorId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public Date getTime() {
+ return time;
+ }
+
+ public EventType getType() {
+ return type;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 7fb61bd..1a46230 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -20,23 +20,25 @@ import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -44,6 +46,9 @@ import azkaban.alert.Alerter;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorComparator;
+import azkaban.executor.selector.ExecutorFilter;
+import azkaban.executor.selector.ExecutorSelector;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.scheduler.ScheduleStatisticManager;
@@ -59,10 +64,27 @@ import azkaban.utils.Props;
*/
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
+ static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
+ "azkaban.executorselector.filters";
+ static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
+ "azkaban.executorselector.comparator.";
+ static final String AZKABAN_QUEUEPROCESSING_ENABLED =
+ "azkaban.queueprocessing.enabled";
+ static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
+ "azkaban.use.multiple.executors";
+ private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
+ "azkaban.webserver.queue.size";
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
+ "azkaban.activeexecutor.refresh.milisecinterval";
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
+ "azkaban.activeexecutor.refresh.flowinterval";
+ private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
+ "azkaban.executorinfo.refresh.maxThreads";
+ private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
+ "azkaban.maxDispatchingErrors";
+
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
- private String executorHost;
- private int executorPort;
private CleanerThread cleanerThread;
@@ -71,8 +93,13 @@ public class ExecutorManager extends EventHandler implements
private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
new ConcurrentHashMap<Integer, ExecutableFlow>();
- private ExecutingManagerUpdaterThread executingManager;
+ QueuedExecutions queuedFlows;
+
+ final private Set<Executor> activeExecutors = new HashSet<Executor>();
+ private QueueProcessorThread queueProcessor;
+ private ExecutingManagerUpdaterThread executingManager;
+ // 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000L;
private long lastCleanerThreadCheckTime = -1;
@@ -84,28 +111,252 @@ public class ExecutorManager extends EventHandler implements
File cacheDir;
- public ExecutorManager(Props props, ExecutorLoader loader,
- Map<String, Alerter> alters) throws ExecutorManagerException {
+ private final Props azkProps;
+ private List<String> filterList;
+ private Map<String, Integer> comparatorWeightsMap;
+ private long lastSuccessfulExecutorInfoRefresh;
+ private ExecutorService executorInforRefresherService;
+
+ public ExecutorManager(Props azkProps, ExecutorLoader loader,
+ Map<String, Alerter> alerters) throws ExecutorManagerException {
+ this.alerters = alerters;
+ this.azkProps = azkProps;
this.executorLoader = loader;
+ this.setupExecutors();
this.loadRunningFlows();
- executorHost = props.getString("executor.host", "localhost");
- executorPort = props.getInt("executor.port");
- alerters = alters;
+ queuedFlows =
+ new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+ this.loadQueuedFlows();
- cacheDir = new File(props.getString("cache.directory", "cache"));
+ cacheDir = new File(azkProps.getString("cache.directory", "cache"));
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
+ if(isMultiExecutorMode()) {
+ setupMultiExecutorMode();
+ }
+
long executionLogsRetentionMs =
- props.getLong("execution.logs.retention.ms",
- DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ azkProps.getLong("execution.logs.retention.ms",
+ DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
}
+ private void setupMultiExecutorMode() {
+ // initliatize hard filters for executor selector from azkaban.properties
+ String filters = azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+ if (filters != null) {
+ filterList = Arrays.asList(StringUtils.split(filters, ","));
+ }
+
+ // initliatize comparator feature weights for executor selector from
+ // azkaban.properties
+ Map<String, String> compListStrings =
+ azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+ if (compListStrings != null) {
+ comparatorWeightsMap = new TreeMap<String, Integer>();
+ for (Map.Entry<String, String> entry : compListStrings.entrySet()) {
+ comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
+ }
+ }
+
+ executorInforRefresherService =
+ Executors.newFixedThreadPool(azkProps.getInt(
+ AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+
+ // configure queue processor
+ queueProcessor =
+ new QueueProcessorThread(azkProps.getBoolean(
+ AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), azkProps.getInt(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), azkProps.getInt(
+ AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, activeExecutors.size()));
+
+ queueProcessor.start();
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#setupExecutors()
+ */
+ @Override
+ public void setupExecutors() throws ExecutorManagerException {
+ Set<Executor> newExecutors = new HashSet<Executor>();
+
+ if (isMultiExecutorMode()) {
+ logger.info("Initializing multi executors from database");
+ newExecutors.addAll(executorLoader.fetchActiveExecutors());
+ } else if (azkProps.containsKey("executor.port")) {
+ // Add local executor, if specified as per properties
+ String executorHost = azkProps.getString("executor.host", "localhost");
+ int executorPort = azkProps.getInt("executor.port");
+ logger.info(String.format("Initializing local executor %s:%d",
+ executorHost, executorPort));
+ Executor executor =
+ executorLoader.fetchExecutor(executorHost, executorPort);
+ if (executor == null) {
+ executor = executorLoader.addExecutor(executorHost, executorPort);
+ } else if (!executor.isActive()) {
+ executor.setActive(true);
+ executorLoader.updateExecutor(executor);
+ }
+ newExecutors.add(new Executor(executor.getId(), executorHost,
+ executorPort, true));
+ }
+
+ if (newExecutors.isEmpty()) {
+ logger.error("No active executor found");
+ throw new ExecutorManagerException("No active executor found");
+ } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+ logger.error("Multiple local executors specified");
+ throw new ExecutorManagerException("Multiple local executors specified");
+ } else {
+ // clear all active executors, only if we have at least one new active
+ // executors
+ activeExecutors.clear();
+ activeExecutors.addAll(newExecutors);
+ }
+ }
+
+ private boolean isMultiExecutorMode() {
+ return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+ }
+
+ /**
+ * Refresh Executor stats for all the actie executors in this executorManager
+ */
+ private void refreshExecutors() {
+ synchronized (activeExecutors) {
+
+ List<Pair<Executor, Future<String>>> futures =
+ new ArrayList<Pair<Executor, Future<String>>>();
+ for (final Executor executor : activeExecutors) {
+ // execute each executorInfo refresh task to fetch
+ Future<String> fetchExecutionInfo =
+ executorInforRefresherService.submit(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return callExecutorForJsonString(executor.getHost(),
+ executor.getPort(), "/serverStatistics", null);
+ }
+ });
+ futures.add(new Pair<Executor, Future<String>>(executor,
+ fetchExecutionInfo));
+ }
+
+ boolean wasSuccess = true;
+ for (Pair<Executor, Future<String>> refreshPair : futures) {
+ Executor executor = refreshPair.getFirst();
+ executor.setExecutorInfo(null); // invalidate cached EXecutorInfo
+ try {
+ // max 5 secs
+ String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+ executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
+ logger.info(String.format(
+ "Successfully refreshed executor: %s with executor info : %s",
+ executor, jsonString));
+ } catch (TimeoutException e) {
+ wasSuccess = false;
+ logger.error("Timed out while waiting for ExecutorInfo refresh"
+ + executor, e);
+ } catch (Exception e) {
+ wasSuccess = false;
+ logger.error("Failed to update ExecutorInfo for executor : "
+ + executor, e);
+ }
+ }
+
+ // update is successful for all executors
+ if (wasSuccess) {
+ lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
+ }
+ }
+ }
+
+ /**
+ * Throws exception if running in local mode
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
+ */
+ @Override
+ public void disableQueueProcessorThread() throws ExecutorManagerException {
+ if (isMultiExecutorMode()) {
+ queueProcessor.setActive(false);
+ } else {
+ throw new ExecutorManagerException(
+ "Cannot disable QueueProcessor in local mode");
+ }
+ }
+
+ /**
+ * Throws exception if running in local mode
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
+ */
+ @Override
+ public void enableQueueProcessorThread() throws ExecutorManagerException {
+ if (isMultiExecutorMode()) {
+ queueProcessor.setActive(true);
+ } else {
+ throw new ExecutorManagerException(
+ "Cannot enable QueueProcessor in local mode");
+ }
+ }
+
+ public State getQueueProcessorThreadState() {
+ if (isMultiExecutorMode())
+ return queueProcessor.getState();
+ else
+ return State.NEW; // not started in local mode
+ }
+
+ /**
+ * Returns state of QueueProcessor False, no flow is being dispatched True ,
+ * flows are being dispatched as expected
+ *
+ * @return
+ */
+ public boolean isQueueProcessorThreadActive() {
+ if (isMultiExecutorMode())
+ return queueProcessor.isActive();
+ else
+ return false;
+ }
+
+ /**
+ * Return last Successful ExecutorInfo Refresh for all active executors
+ *
+ * @return
+ */
+ public long getLastSuccessfulExecutorInfoRefresh() {
+ return this.lastSuccessfulExecutorInfoRefresh;
+ }
+
+ /**
+ * Get currently supported Comparators available to use via azkaban.properties
+ *
+ * @return
+ */
+ public Set<String> getAvailableExecutorComparatorNames() {
+ return ExecutorComparator.getAvailableComparatorNames();
+
+ }
+
+ /**
+ * Get currently supported filters available to use via azkaban.properties
+ *
+ * @return
+ */
+ public Set<String> getAvailableExecutorFilterNames() {
+ return ExecutorFilter.getAvailableFilterNames();
+ }
+
@Override
public State getExecutorManagerThreadState() {
return executingManager.getState();
@@ -130,10 +381,33 @@ public class ExecutorManager extends EventHandler implements
}
@Override
+ public Collection<Executor> getAllActiveExecutors() {
+ return Collections.unmodifiableCollection(activeExecutors);
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
+ */
+ @Override
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+ for (Executor executor : activeExecutors) {
+ if (executor.getId() == executorId) {
+ return executor;
+ }
+ }
+ return executorLoader.fetchExecutor(executorId);
+ }
+
+ @Override
public Set<String> getPrimaryServerHosts() {
// Only one for now. More probably later.
HashSet<String> ports = new HashSet<String>();
- ports.add(executorHost + ":" + executorPort);
+ for (Executor executor : activeExecutors) {
+ ports.add(executor.getHost() + ":" + executor.getPort());
+ }
return ports;
}
@@ -141,71 +415,211 @@ public class ExecutorManager extends EventHandler implements
public Set<String> getAllActiveExecutorServerHosts() {
// Includes non primary server/hosts
HashSet<String> ports = new HashSet<String>();
- ports.add(executorHost + ":" + executorPort);
+ for (Executor executor : activeExecutors) {
+ ports.add(executor.getHost() + ":" + executor.getPort());
+ }
+ // include executor which were initially active and still has flows running
for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
- .values()) {
+ .values()) {
ExecutionReference ref = running.getFirst();
ports.add(ref.getHost() + ":" + ref.getPort());
}
-
return ports;
}
private void loadRunningFlows() throws ExecutorManagerException {
runningFlows.putAll(executorLoader.fetchActiveFlows());
+ // Finalize all flows which were running on an executor which is now
+ // inactive
+ for (Pair<ExecutionReference, ExecutableFlow> pair : runningFlows.values()) {
+ if (!activeExecutors.contains(pair.getFirst().getExecutor())) {
+ finalizeFlows(pair.getSecond());
+ }
+ }
}
+ /*
+ * load queued flows i.e with active_execution_reference and not assigned to
+ * any executor
+ */
+ private void loadQueuedFlows() throws ExecutorManagerException {
+ List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
+ executorLoader.fetchQueuedFlows();
+ if (retrievedExecutions != null) {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
+ queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+ }
+ }
+ }
+
+ /**
+ * Gets a list of all the active (running flows and non-dispatched flows)
+ * executions for a given project and flow {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
+ * java.lang.String)
+ */
@Override
public List<Integer> getRunningFlows(int projectId, String flowId) {
- ArrayList<Integer> executionIds = new ArrayList<Integer>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ List<Integer> executionIds = new ArrayList<Integer>();
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ queuedFlows.getAllEntries()));
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+ runningFlows.values()));
+ return executionIds;
+ }
+
+ /* Helper method for getRunningFlows */
+ private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ List<Integer> executionIds = new ArrayList<Integer>();
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getFlowId().equals(flowId)
- && ref.getSecond().getProjectId() == projectId) {
+ && ref.getSecond().getProjectId() == projectId) {
executionIds.add(ref.getFirst().getExecId());
}
}
return executionIds;
}
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
+ */
+ @Override
+ public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+ throws IOException {
+ List<Pair<ExecutableFlow, Executor>> flows =
+ new ArrayList<Pair<ExecutableFlow, Executor>>();
+ getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
+ getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+ return flows;
+ }
+
+ /* Helper method for getActiveFlowsWithExecutor */
+ private void getActiveFlowsWithExecutorHelper(
+ List<Pair<ExecutableFlow, Executor>> flows,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
+ .getFirst().getExecutor()));
+ }
+ }
+
+ /**
+ * Checks whether the given flow has an active (running, non-dispatched)
+ * executions {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
+ * java.lang.String)
+ */
@Override
public boolean isFlowRunning(int projectId, String flowId) {
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ boolean isRunning = false;
+ isRunning =
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
+ isRunning =
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+ return isRunning;
+ }
+
+ /* Search a running flow in a collection */
+ private boolean isFlowRunningHelper(int projectId, String flowId,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getProjectId() == projectId
- && ref.getSecond().getFlowId().equals(flowId)) {
+ && ref.getSecond().getFlowId().equals(flowId)) {
return true;
}
}
return false;
}
+ /**
+ * Fetch ExecutableFlow from an active (running, non-dispatched) or from
+ * database {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
+ */
@Override
public ExecutableFlow getExecutableFlow(int execId)
- throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
- if (active == null) {
+ throws ExecutorManagerException {
+ if (runningFlows.containsKey(execId)) {
+ return runningFlows.get(execId).getSecond();
+ } else if (queuedFlows.hasExecution(execId)) {
+ return queuedFlows.getFlow(execId);
+ } else {
return executorLoader.fetchExecutableFlow(execId);
}
- return active.getSecond();
}
+ /**
+ * Get all active (running, non-dispatched) flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
@Override
public List<ExecutableFlow> getRunningFlows() {
ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ getActiveFlowHelper(flows, queuedFlows.getAllEntries());
+ getActiveFlowHelper(flows, runningFlows.values());
+ return flows;
+ }
+
+ /*
+ * Helper method to get all running flows from a Pair<ExecutionReference,
+ * ExecutableFlow collection
+ */
+ private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
flows.add(ref.getSecond());
}
- return flows;
}
+ /**
+ * Get execution Ids of all active (running, non-dispatched) flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
public String getRunningFlowIds() {
List<Integer> allIds = new ArrayList<Integer>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
- allIds.add(ref.getSecond().getExecutionId());
- }
+ getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
+ getRunningFlowsIdsHelper(allIds, runningFlows.values());
Collections.sort(allIds);
return allIds.toString();
}
+ /**
+ * Get execution Ids of all non-dispatched flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
+ public String getQueuedFlowIds() {
+ List<Integer> allIds = new ArrayList<Integer>();
+ getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
+ Collections.sort(allIds);
+ return allIds.toString();
+ }
+
+ /* Helper method to flow ids of all running flows */
+ private void getRunningFlowsIdsHelper(List<Integer> allIds,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ allIds.add(ref.getSecond().getExecutionId());
+ }
+ }
+
public List<ExecutableFlow> getRecentlyFinishedFlows() {
return new ArrayList<ExecutableFlow>(recentlyFinished.values());
}
@@ -371,18 +785,30 @@ public class ExecutorManager extends EventHandler implements
}
}
+ /**
+ * if flows was dispatched to an executor, cancel by calling Executor else if
+ * flow is still in queue, remove from queue and finalize {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
+ * java.lang.String)
+ */
@Override
public void cancelFlow(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> pair =
+ if (runningFlows.containsKey(exFlow.getExecutionId())) {
+ Pair<ExecutionReference, ExecutableFlow> pair =
runningFlows.get(exFlow.getExecutionId());
- if (pair == null) {
+ callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+ userId);
+ } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
+ queuedFlows.dequeue(exFlow.getExecutionId());
+ finalizeFlows(exFlow);
+ } else {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
- callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION, userId);
}
}
@@ -539,80 +965,95 @@ public class ExecutorManager extends EventHandler implements
@Override
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
synchronized (exflow) {
- logger.info("Submitting execution flow " + exflow.getFlowId() + " by "
- + userId);
-
- int projectId = exflow.getProjectId();
String flowId = exflow.getFlowId();
- exflow.setSubmitUser(userId);
- exflow.setSubmitTime(System.currentTimeMillis());
- List<Integer> running = getRunningFlows(projectId, flowId);
-
- ExecutionOptions options = exflow.getExecutionOptions();
- if (options == null) {
- options = new ExecutionOptions();
- }
+ logger.info("Submitting execution flow " + flowId + " by " + userId);
String message = "";
- if (options.getDisabledJobs() != null) {
- applyDisabledJobs(options.getDisabledJobs(), exflow);
- }
+ if (queuedFlows.isFull()) {
+ message =
+ String
+ .format(
+ "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
+ flowId, exflow.getProjectName());
+ logger.error(message);
+ } else {
+ int projectId = exflow.getProjectId();
+ exflow.setSubmitUser(userId);
+ exflow.setSubmitTime(System.currentTimeMillis());
+
+ List<Integer> running = getRunningFlows(projectId, flowId);
+
+ ExecutionOptions options = exflow.getExecutionOptions();
+ if (options == null) {
+ options = new ExecutionOptions();
+ }
- if (!running.isEmpty()) {
- if (options.getConcurrentOption().equals(
+ if (options.getDisabledJobs() != null) {
+ applyDisabledJobs(options.getDisabledJobs(), exflow);
+ }
+
+ if (!running.isEmpty()) {
+ if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
- Collections.sort(running);
- Integer runningExecId = running.get(running.size() - 1);
+ Collections.sort(running);
+ Integer runningExecId = running.get(running.size() - 1);
- options.setPipelineExecutionId(runningExecId);
- message =
+ options.setPipelineExecutionId(runningExecId);
+ message =
"Flow " + flowId + " is already running with exec id "
- + runningExecId + ". Pipelining level "
- + options.getPipelineLevel() + ". \n";
- } else if (options.getConcurrentOption().equals(
+ + runningExecId + ". Pipelining level "
+ + options.getPipelineLevel() + ". \n";
+ } else if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
- throw new ExecutorManagerException("Flow " + flowId
+ throw new ExecutorManagerException("Flow " + flowId
+ " is already running. Skipping execution.",
ExecutorManagerException.Reason.SkippedExecution);
- } else {
- // The settings is to run anyways.
- message =
+ } else {
+ // The settings is to run anyways.
+ message =
"Flow " + flowId + " is already running with exec id "
- + StringUtils.join(running, ",")
- + ". Will execute concurrently. \n";
+ + StringUtils.join(running, ",")
+ + ". Will execute concurrently. \n";
+ }
}
- }
- boolean memoryCheck = !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
- ProjectWhitelist.WhitelistType.MemoryCheck);
- options.setMemoryCheck(memoryCheck);
-
- // The exflow id is set by the loader. So it's unavailable until after
- // this call.
- executorLoader.uploadExecutableFlow(exflow);
-
- // We create an active flow reference in the datastore. If the upload
- // fails, we remove the reference.
- ExecutionReference reference =
- new ExecutionReference(exflow.getExecutionId(), executorHost,
- executorPort);
- executorLoader.addActiveExecutableReference(reference);
- try {
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ boolean memoryCheck =
+ !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+ ProjectWhitelist.WhitelistType.MemoryCheck);
+ options.setMemoryCheck(memoryCheck);
+ // The exflow id is set by the loader. So it's unavailable until after
+ // this call.
+ executorLoader.uploadExecutableFlow(exflow);
+
+ // We create an active flow reference in the datastore. If the upload
+ // fails, we remove the reference.
+ ExecutionReference reference =
+ new ExecutionReference(exflow.getExecutionId());
+
+ if (isMultiExecutorMode()) {
+ //Take MultiExecutor route
+ executorLoader.addActiveExecutableReference(reference);
+ queuedFlows.enqueue(exflow, reference);
+ } else {
+ // assign only local executor we have
+ Executor choosenExecutor = activeExecutors.iterator().next();
+ executorLoader.addActiveExecutableReference(reference);
+ try {
+ dispatch(reference, exflow, choosenExecutor);
+ } catch (ExecutorManagerException e) {
+ executorLoader.removeActiveExecutableReference(reference
+ .getExecId());
+ throw e;
+ }
+ }
message +=
- "Execution submitted successfully with exec id "
- + exflow.getExecutionId();
- } catch (ExecutorManagerException e) {
- executorLoader.removeActiveExecutableReference(reference.getExecId());
- throw e;
+ "Execution submitted successfully with exec id "
+ + exflow.getExecutionId();
}
-
return message;
}
}
@@ -626,11 +1067,11 @@ public class ExecutorManager extends EventHandler implements
}
}
- private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action) throws ExecutorManagerException {
+ private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
+ Executor executor, String action) throws ExecutorManagerException {
try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
- ref.getExecId(), null, (Pair<String, String>[]) null);
+ return callExecutorServer(executor.getHost(), executor.getPort(), action,
+ exflow.getExecutionId(), null, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
}
@@ -671,150 +1112,114 @@ public class ExecutorManager extends EventHandler implements
private Map<String, Object> callExecutorServer(String host, int port,
String action, Integer executionId, String user,
Pair<String, String>... params) throws IOException {
- URIBuilder builder = new URIBuilder();
- builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
-
- builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+ List<Pair<String, String>> paramList = new ArrayList<Pair<String,String>>();
- if (executionId != null) {
- builder.setParameter(ConnectorParams.EXECID_PARAM,
- String.valueOf(executionId));
+ // if params = null
+ if(params != null) {
+ paramList.addAll(Arrays.asList(params));
}
- if (user != null) {
- builder.setParameter(ConnectorParams.USER_PARAM, user);
- }
-
- if (params != null) {
- for (Pair<String, String> pair : params) {
- builder.setParameter(pair.getFirst(), pair.getSecond());
- }
- }
+ paramList
+ .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+ paramList.add(new Pair<String, String>(ConnectorParams.EXECID_PARAM, String
+ .valueOf(executionId)));
+ paramList.add(new Pair<String, String>(ConnectorParams.USER_PARAM, user));
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
+ Map<String, Object> jsonResponse =
+ callExecutorForJsonObject(host, port, "/executor", paramList);
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
+ return jsonResponse;
+ }
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
- }
+ /*
+ * Helper method used by ExecutorManager to call executor and return json
+ * object map
+ */
+ private Map<String, Object> callExecutorForJsonObject(String host, int port,
+ String path, List<Pair<String, String>> paramList) throws IOException {
+ String responseString =
+ callExecutorForJsonString(host, port, path, paramList);
@SuppressWarnings("unchecked")
Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+ (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
}
-
return jsonResponse;
}
+ /*
+ * Helper method used by ExecutorManager to call executor and return raw json
+ * string
+ */
+ private String callExecutorForJsonString(String host, int port, String path,
+ List<Pair<String, String>> paramList) throws IOException {
+ if (paramList == null) {
+ paramList = new ArrayList<Pair<String, String>>();
+ }
+
+ ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
+ @SuppressWarnings("unchecked")
+ URI uri =
+ ExecutorApiClient.buildUri(host, port, path, true,
+ paramList.toArray(new Pair[0]));
+
+ return apiclient.httpGet(uri, null);
+ }
+
/**
* Manage servlet call for stats servlet in Azkaban execution server
* {@inheritDoc}
- * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+ *
+ * @throws ExecutorManagerException
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
+ * azkaban.utils.Pair[])
*/
@Override
- public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
-
- URIBuilder builder = new URIBuilder();
- builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+ public Map<String, Object> callExecutorStats(int executorId, String action,
+ Pair<String, String>... params) throws IOException, ExecutorManagerException {
+ Executor executor = fetchExecutor(executorId);
- builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+ List<Pair<String, String>> paramList =
+ new ArrayList<Pair<String, String>>();
+ // if params = null
if (params != null) {
- for (Pair<String, String> pair : params) {
- builder.setParameter(pair.getFirst(), pair.getSecond());
- }
+ paramList.addAll(Arrays.asList(params));
}
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
+ paramList
+ .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
- }
-
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
-
- return jsonResponse;
+ return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
+ "/stats", paramList);
}
@Override
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException {
- URIBuilder builder = new URIBuilder();
-
- String[] hostPortSplit = hostPort.split(":");
- builder.setScheme("http").setHost(hostPortSplit[0])
- .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
+ List<Pair<String, String>> paramList =
+ new ArrayList<Pair<String, String>>();
- builder.setParameter(action, "");
- if (mBean != null) {
- builder.setParameter(ConnectorParams.JMX_MBEAN, mBean);
+ paramList.add(new Pair<String, String>(action, ""));
+ if(mBean != null) {
+ paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
}
- URI uri = null;
- try {
- uri = builder.build();
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
- HttpClient httpclient = new DefaultHttpClient();
- HttpGet httpget = new HttpGet(uri);
- String response = null;
- try {
- response = httpclient.execute(httpget, responseHandler);
- } catch (IOException e) {
- throw e;
- } finally {
- httpclient.getConnectionManager().shutdown();
- }
-
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(response);
- String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
- if (error != null) {
- throw new IOException(error);
- }
- return jsonResponse;
+ String[] hostPortSplit = hostPort.split(":");
+ return callExecutorForJsonObject(hostPortSplit[0],
+ Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
}
@Override
public void shutdown() {
+ if (isMultiExecutorMode()) {
+ queueProcessor.shutdown();
+ }
executingManager.shutdown();
}
@@ -846,7 +1251,7 @@ public class ExecutorManager extends EventHandler implements
lastThreadCheckTime = System.currentTimeMillis();
updaterStage = "Starting update all flows.";
- Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
+ Map<Executor, List<ExecutableFlow>> exFlowMap =
getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows =
new ArrayList<ExecutableFlow>();
@@ -854,16 +1259,16 @@ public class ExecutorManager extends EventHandler implements
new ArrayList<ExecutableFlow>();
if (exFlowMap.size() > 0) {
- for (Map.Entry<ConnectionInfo, List<ExecutableFlow>> entry : exFlowMap
+ for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
.entrySet()) {
List<Long> updateTimesList = new ArrayList<Long>();
List<Integer> executionIdsList = new ArrayList<Integer>();
- ConnectionInfo connection = entry.getKey();
+ Executor executor = entry.getKey();
updaterStage =
- "Starting update flows on " + connection.getHost() + ":"
- + connection.getPort();
+ "Starting update flows on " + executor.getHost() + ":"
+ + executor.getPort();
// We pack the parameters of the same host together before we
// query.
@@ -881,8 +1286,8 @@ public class ExecutorManager extends EventHandler implements
Map<String, Object> results = null;
try {
results =
- callExecutorServer(connection.getHost(),
- connection.getPort(), ConnectorParams.UPDATE_ACTION,
+ callExecutorServer(executor.getHost(),
+ executor.getPort(), ConnectorParams.UPDATE_ACTION,
null, null, executionIds, updateTimes);
} catch (IOException e) {
logger.error(e);
@@ -1222,15 +1627,16 @@ public class ExecutorManager extends EventHandler implements
}
}
- private Map<ConnectionInfo, List<ExecutableFlow>> getFlowToExecutorMap() {
- HashMap<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
- new HashMap<ConnectionInfo, List<ExecutableFlow>>();
+ /* Group Executable flow by Executors to reduce number of REST calls */
+ private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
+ HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+ new HashMap<Executor, List<ExecutableFlow>>();
- ConnectionInfo lastPort = new ConnectionInfo(executorHost, executorPort);
for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
- .values()) {
+ .values()) {
ExecutionReference ref = runningFlow.getFirst();
ExecutableFlow flow = runningFlow.getSecond();
+ Executor executor = ref.getExecutor();
// We can set the next check time to prevent the checking of certain
// flows.
@@ -1238,16 +1644,10 @@ public class ExecutorManager extends EventHandler implements
continue;
}
- // Just a silly way to reduce object creation construction of objects
- // since it's most likely that the values will be the same.
- if (!lastPort.isEqual(ref.getHost(), ref.getPort())) {
- lastPort = new ConnectionInfo(ref.getHost(), ref.getPort());
- }
-
- List<ExecutableFlow> flows = exFlowMap.get(lastPort);
+ List<ExecutableFlow> flows = exFlowMap.get(executor);
if (flows == null) {
flows = new ArrayList<ExecutableFlow>();
- exFlowMap.put(lastPort, flows);
+ exFlowMap.put(executor, flows);
}
flows.add(flow);
@@ -1256,61 +1656,6 @@ public class ExecutorManager extends EventHandler implements
return exFlowMap;
}
- private static class ConnectionInfo {
- private String host;
- private int port;
-
- public ConnectionInfo(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- @SuppressWarnings("unused")
- private ConnectionInfo getOuterType() {
- return ConnectionInfo.this;
- }
-
- public boolean isEqual(String host, int port) {
- return this.port == port && this.host.equals(host);
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((host == null) ? 0 : host.hashCode());
- result = prime * result + port;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ConnectionInfo other = (ConnectionInfo) obj;
- if (host == null) {
- if (other.host != null)
- return false;
- } else if (!host.equals(other.host))
- return false;
- if (port != other.port)
- return false;
- return true;
- }
- }
-
@Override
public int getExecutableFlows(int projectId, String flowId, int from,
int length, List<ExecutableFlow> outputList)
@@ -1384,4 +1729,257 @@ public class ExecutorManager extends EventHandler implements
- executionLogsRetentionMs);
}
}
+
+ /**
+ * Calls executor to dispatch the flow, update db to assign the executor and
+ * in-memory state of executableFlow
+ */
+ private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+ Executor choosenExecutor) throws ExecutorManagerException {
+ exflow.setUpdateTime(System.currentTimeMillis());
+
+ executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
+ try {
+ callExecutorServer(exflow, choosenExecutor,
+ ConnectorParams.EXECUTE_ACTION);
+ } catch (ExecutorManagerException ex) {
+ logger.error("Rolling back executor assignment for execution id:"
+ + exflow.getExecutionId(), ex);
+ executorLoader.unassignExecutor(exflow.getExecutionId());
+ throw new ExecutorManagerException(ex);
+ }
+ reference.setExecutor(choosenExecutor);
+
+ // move from flow to running flows
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+ logger.info(String.format(
+ "Successfully dispatched exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ }
+
+ /*
+ * This thread is responsible for processing queued flows using dispatcher and
+ * making rest api calls to executor server
+ */
+ private class QueueProcessorThread extends Thread {
+ private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
+ private final int maxDispatchingErrors;
+ private final long activeExecutorRefreshWindowInMilisec;
+ private final int activeExecutorRefreshWindowInFlows;
+
+ private volatile boolean shutdown = false;
+ private volatile boolean isActive = true;
+
+ public QueueProcessorThread(boolean isActive,
+ long activeExecutorRefreshWindowInTime,
+ int activeExecutorRefreshWindowInFlows,
+ int maxDispatchingErrors) {
+ setActive(isActive);
+ this.maxDispatchingErrors = maxDispatchingErrors;
+ this.activeExecutorRefreshWindowInFlows =
+ activeExecutorRefreshWindowInFlows;
+ this.activeExecutorRefreshWindowInMilisec =
+ activeExecutorRefreshWindowInTime;
+ this.setName("AzkabanWebServer-QueueProcessor-Thread");
+ }
+
+ public void setActive(boolean isActive) {
+ this.isActive = isActive;
+ logger.info("QueueProcessorThread active turned " + this.isActive);
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public void shutdown() {
+ shutdown = true;
+ this.interrupt();
+ }
+
+ public void run() {
+ // Loops till QueueProcessorThread is shutdown
+ while (!shutdown) {
+ synchronized (this) {
+ try {
+ // start processing queue if active, other wait for sometime
+ if (isActive) {
+ processQueuedFlows(activeExecutorRefreshWindowInMilisec,
+ activeExecutorRefreshWindowInFlows);
+ }
+ wait(QUEUE_PROCESSOR_WAIT_IN_MS);
+ } catch (Exception e) {
+ logger.error(
+ "QueueProcessorThread Interrupted. Probably to shut down.", e);
+ }
+ }
+ }
+ }
+
+ /* Method responsible for processing the non-dispatched flows */
+ private void processQueuedFlows(long activeExecutorsRefreshWindow,
+ int maxContinuousFlowProcessed) throws InterruptedException,
+ ExecutorManagerException {
+ long lastExecutorRefreshTime = 0;
+ Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+ int currentContinuousFlowProcessed = 0;
+
+ while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
+ ExecutionReference reference = runningCandidate.getFirst();
+ ExecutableFlow exflow = runningCandidate.getSecond();
+
+ long currentTime = System.currentTimeMillis();
+
+ // if we have dispatched more than maxContinuousFlowProcessed or
+ // It has been more then activeExecutorsRefreshWindow millisec since we
+ // refreshed
+ if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
+ || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+ // Refresh executorInfo for all activeExecutors
+ refreshExecutors();
+ lastExecutorRefreshTime = currentTime;
+ currentContinuousFlowProcessed = 0;
+ }
+
+ /**
+ * <pre>
+ * TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
+ * Currently we try each queued flow once to infer a global busy state
+ * Possible improvements:-
+ * 1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
+ * 2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
+ * taking out all the filters which do not depend on the flow but are still being part of Selector.
+ * Assumptions:-
+ * 1. no one else except QueueProcessor is updating ExecutableFlow update time
+ * 2. re-attempting a flow (which has been tried before) is considered as all executors are busy
+ * </pre>
+ */
+ if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+ // put back in the queue
+ queuedFlows.enqueue(exflow, reference);
+ long sleepInterval =
+ activeExecutorsRefreshWindow
+ - (currentTime - lastExecutorRefreshTime);
+ // wait till next executor refresh
+ sleep(sleepInterval);
+ } else {
+ exflow.setUpdateTime(currentTime);
+ // process flow with current snapshot of activeExecutors
+ selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ }
+ currentContinuousFlowProcessed++;
+ }
+ }
+
+ /* process flow with a snapshot of available Executors */
+ private void selectExecutorAndDispatchFlow(ExecutionReference reference,
+ ExecutableFlow exflow, Set<Executor> availableExecutors)
+ throws ExecutorManagerException {
+ synchronized (exflow) {
+ Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
+ if (selectedExecutor != null) {
+ try {
+ dispatch(reference, exflow, selectedExecutor);
+ } catch (ExecutorManagerException e) {
+ logger.warn(String.format(
+ "Executor %s responded with exception for exec: %d",
+ selectedExecutor, exflow.getExecutionId()), e);
+ handleDispatchExceptionCase(reference, exflow, selectedExecutor,
+ availableExecutors);
+ }
+ } else {
+ handleNoExecutorSelectedCase(reference, exflow);
+ }
+ }
+ }
+
+ /* Helper method to fetch overriding Executor, if a valid user has specifed otherwise return null */
+ private Executor getUserSpecifiedExecutor(ExecutionOptions options,
+ int executionId) {
+ Executor executor = null;
+ if (options != null
+ && options.getFlowParameters() != null
+ && options.getFlowParameters().containsKey(
+ ExecutionOptions.USE_EXECUTOR)) {
+ try {
+ int executorId =
+ Integer.valueOf(options.getFlowParameters().get(
+ ExecutionOptions.USE_EXECUTOR));
+ executor = fetchExecutor(executorId);
+
+ if (executor == null) {
+ logger
+ .warn(String
+ .format(
+ "User specified executor id: %d for execution id: %d is not active, Looking up db.",
+ executorId, executionId));
+ executor = executorLoader.fetchExecutor(executorId);
+ if (executor == null) {
+ logger
+ .warn(String
+ .format(
+ "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
+ executorId, executionId));
+ }
+ }
+ } catch (ExecutorManagerException ex) {
+ logger.error("Failed to fetch user specified executor for exec_id = "
+ + executionId, ex);
+ }
+ }
+ return executor;
+ }
+
+ /* Choose Executor for exflow among the available executors */
+ private Executor selectExecutor(ExecutableFlow exflow,
+ Set<Executor> availableExecutors) {
+ Executor choosenExecutor =
+ getUserSpecifiedExecutor(exflow.getExecutionOptions(),
+ exflow.getExecutionId());
+
+ // If no executor was specified by admin
+ if (choosenExecutor == null) {
+ logger.info("Using dispatcher for execution id :"
+ + exflow.getExecutionId());
+ ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
+ choosenExecutor = selector.getBest(availableExecutors, exflow);
+ }
+ return choosenExecutor;
+ }
+
+ private void handleDispatchExceptionCase(ExecutionReference reference,
+ ExecutableFlow exflow, Executor lastSelectedExecutor,
+ Set<Executor> remainingExecutors) throws ExecutorManagerException {
+ logger
+ .info(String
+ .format(
+ "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ reference.setNumErrors(reference.getNumErrors() + 1);
+ if (reference.getNumErrors() > this.maxDispatchingErrors
+ || remainingExecutors.size() <= 1) {
+ logger.error("Failed to process queued flow");
+ finalizeFlows(exflow);
+ } else {
+ remainingExecutors.remove(lastSelectedExecutor);
+ // try other executors except chosenExecutor
+ selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
+ }
+ }
+
+ private void handleNoExecutorSelectedCase(ExecutionReference reference,
+ ExecutableFlow exflow) throws ExecutorManagerException {
+ logger
+ .info(String
+ .format(
+ "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ // TODO: handle scenario where a high priority flow failing to get
+ // schedule can starve all others
+ queuedFlows.enqueue(exflow, reference);
+ }
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 10379ec..c50b0bc 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -18,6 +18,7 @@ package azkaban.executor;
import java.io.IOException;
import java.lang.Thread.State;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -88,6 +89,18 @@ public interface ExecutorManagerAdapter {
public List<ExecutableFlow> getRunningFlows() throws IOException;
+ /**
+ * <pre>
+ * Returns All running with executors and queued flows
+ * Note, returns empty list if there isn't any running or queued flows
+ * </pre>
+ *
+ * @return
+ * @throws IOException
+ */
+ public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+ throws IOException;
+
public List<ExecutableFlow> getRecentlyFinishedFlows();
public List<ExecutableFlow> getExecutableFlows(Project project,
@@ -177,9 +190,10 @@ public interface ExecutorManagerAdapter {
* <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
* <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
* </ul>
+ * @throws ExecutorManagerException
*/
- public Map<String, Object> callExecutorStats(String action,
- Pair<String, String>... params) throws IOException;
+ public Map<String, Object> callExecutorStats(int executorId, String action,
+ Pair<String, String>... param) throws IOException, ExecutorManagerException;
public Map<String, Object> callExecutorJMX(String hostPort, String action,
String mBean) throws IOException;
@@ -196,4 +210,54 @@ public interface ExecutorManagerAdapter {
public Set<? extends String> getPrimaryServerHosts();
+ /**
+ * Returns a collection of all the active executors maintained by active
+ * executors
+ *
+ * @return
+ */
+ public Collection<Executor> getAllActiveExecutors();
+
+ /**
+ * <pre>
+ * Fetch executor from executors with a given executorId
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @throws ExecutorManagerException
+ *
+ */
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Setup activeExecutors using azkaban.properties and database executors
+ * Note:
+ * 1. If azkaban.use.multiple.executors is set true, this method will
+ * load all active executors
+ * 2. In local mode, If a local executor is specified and it is missing from db,
+ * this method add local executor as active in DB
+ * 3. In local mode, If a local executor is specified and it is marked inactive in db,
+ * this method will convert local executor as active in DB
+ * </pre>
+ *
+ * @throws ExecutorManagerException
+ */
+ public void setupExecutors() throws ExecutorManagerException;
+
+ /**
+ * Enable flow dispatching in QueueProcessor
+ *
+ * @throws ExecutorManagerException
+ */
+ public void enableQueueProcessorThread() throws ExecutorManagerException;
+
+ /**
+ * Disable flow dispatching in QueueProcessor
+ *
+ * @throws ExecutorManagerException
+ */
+ public void disableQueueProcessorThread() throws ExecutorManagerException;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 25e61c3..a588cca 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -21,12 +21,14 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.lang.annotation.Inherited;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,6 +42,7 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.database.AbstractJdbcLoader;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.GZIPUtils;
@@ -177,6 +180,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+ /**
+ *
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorLoader#fetchQueuedFlows()
+ */
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
+
+ try {
+ List<Pair<ExecutionReference, ExecutableFlow>> flows =
+ runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
+ flowHandler);
+ return flows;
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error fetching active flows", e);
+ }
+ }
+
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
throws ExecutorManagerException {
@@ -380,12 +404,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
throws ExecutorManagerException {
final String INSERT =
"INSERT INTO active_executing_flows "
- + "(exec_id, host, port, update_time) values (?,?,?,?)";
+ + "(exec_id, update_time) values (?,?)";
QueryRunner runner = createQueryRunner();
try {
- runner.update(INSERT, reference.getExecId(), reference.getHost(),
- reference.getPort(), reference.getUpdateTime());
+ runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
} catch (SQLException e) {
throw new ExecutorManagerException(
"Error updating active flow reference " + reference.getExecId(), e);
@@ -773,6 +796,265 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return connection;
}
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
+ */
+ @Override
+ public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
+ return executors;
+ } catch (Exception e) {
+ throw new ExecutorManagerException("Error fetching executors", e);
+ }
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
+ */
+ @Override
+ public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
+ executorHandler);
+ return executors;
+ } catch (Exception e) {
+ throw new ExecutorManagerException("Error fetching active executors", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchExecutor(java.lang.String, int)
+ */
+ @Override
+ public Executor fetchExecutor(String host, int port)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
+ executorHandler, host, port);
+ if (executors.isEmpty()) {
+ return null;
+ } else {
+ return executors.get(0);
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException(String.format(
+ "Error fetching executor %s:%d", host, port), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchExecutor(int)
+ */
+ @Override
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
+ executorHandler, executorId);
+ if (executors.isEmpty()) {
+ return null;
+ } else {
+ return executors.get(0);
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException(String.format(
+ "Error fetching executor with id: %d", executorId), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#updateExecutor(int)
+ */
+ @Override
+ public void updateExecutor(Executor executor) throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE executors SET host=?, port=?, active=? where id=?";
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ int rows =
+ runner.update(UPDATE, executor.getHost(), executor.getPort(),
+ executor.isActive(), executor.getId());
+ if (rows == 0) {
+ throw new ExecutorManagerException("No executor with id :"
+ + executor.getId());
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error inactivating executor "
+ + executor.getId(), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#addExecutor(java.lang.String, int)
+ */
+ @Override
+ public Executor addExecutor(String host, int port)
+ throws ExecutorManagerException {
+ // verify, if executor already exists
+ Executor executor = fetchExecutor(host, port);
+ if (executor != null) {
+ throw new ExecutorManagerException(String.format(
+ "Executor %s:%d already exist", host, port));
+ }
+ // add new executor
+ addExecutorHelper(host, port);
+ // fetch newly added executor
+ executor = fetchExecutor(host, port);
+
+ return executor;
+ }
+
+ private void addExecutorHelper(String host, int port)
+ throws ExecutorManagerException {
+ final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
+ QueryRunner runner = createQueryRunner();
+ try {
+ runner.update(INSERT, host, port);
+ } catch (SQLException e) {
+ throw new ExecutorManagerException(String.format("Error adding %s:%d ",
+ host, port), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#postExecutorEvent(azkaban.executor.Executor,
+ * azkaban.executor.ExecutorLogEvent.EventType, java.lang.String,
+ * java.lang.String)
+ */
+ @Override
+ public void postExecutorEvent(Executor executor, EventType type, String user,
+ String message) throws ExecutorManagerException{
+ QueryRunner runner = createQueryRunner();
+
+ final String INSERT_PROJECT_EVENTS =
+ "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+ Date updateDate = new Date();
+ try {
+ runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
+ updateDate, user, message);
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Failed to post executor event", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#getExecutorEvents(azkaban.executor.Executor,
+ * int, int)
+ */
+ @Override
+ public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+ int offset) throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+
+ ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
+ List<ExecutorLogEvent> events = null;
+ try {
+ events =
+ runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
+ logHandler, executor.getId(), num, offset);
+ } catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Failed to fetch events for executor id : " + executor.getId(), e);
+ }
+
+ return events;
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
+ */
+ @Override
+ public void assignExecutor(int executorId, int executionId)
+ throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE execution_flows SET executor_id=? where exec_id=?";
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ Executor executor = fetchExecutor(executorId);
+ if (executor == null) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to assign non-existent executor Id: %d to execution : %d ",
+ executorId, executionId));
+ }
+
+ int rows = runner.update(UPDATE, executorId, executionId);
+ if (rows == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to assign executor Id: %d to non-existent execution : %d ",
+ executorId, executionId));
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error updating executor id "
+ + executorId, e);
+ }
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
+ */
+ @Override
+ public Executor fetchExecutorByExecutionId(int executionId)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+ Executor executor = null;
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+ executorHandler, executionId);
+ if (executors.size() > 0) {
+ executor = executors.get(0);
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Error fetching executor for exec_id : " + executionId, e);
+ }
+ return executor;
+ }
+
private static class LastInsertID implements ResultSetHandler<Long> {
private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
@@ -975,13 +1257,80 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
+ /**
+ * JDBC ResultSetHandler to fetch queued executions
+ */
+ private static class FetchQueuedExecutableFlows implements
+ ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+ // Select queued unassigned flows
+ private static String FETCH_QUEUED_EXECUTABLE_FLOW =
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
+ + " ax.update_time axUpdateTime FROM execution_flows ex"
+ + " INNER JOIN"
+ + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+ + " Where ex.executor_id is NULL";
+
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+ throws SQLException {
+ if (!rs.next()) {
+ return Collections
+ .<Pair<ExecutionReference, ExecutableFlow>> emptyList();
+ }
+
+ List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+ new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+ do {
+ int id = rs.getInt(1);
+ int encodingType = rs.getInt(2);
+ byte[] data = rs.getBytes(3);
+ long updateTime = rs.getLong(4);
+
+ if (data == null) {
+ logger.error("Found a flow with empty data blob exec_id: " + id);
+ } else {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+ Object flowObj;
+ try {
+ // Convoluted way to inflate strings. Should find common package or
+ // helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ } else {
+ String jsonString = new String(data, "UTF-8");
+ flowObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+
+ ExecutableFlow exFlow =
+ ExecutableFlow.createExecutableFlowFromObject(flowObj);
+ ExecutionReference ref = new ExecutionReference(id);
+ ref.setUpdateTime(updateTime);
+
+ execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref,
+ exFlow));
+ } catch (IOException e) {
+ throw new SQLException("Error retrieving flow data " + id, e);
+ }
+ }
+ } while (rs.next());
+
+ return execFlows;
+ }
+ }
+
private static class FetchActiveExecutableFlows implements
ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+ // Select running and executor assigned flows
private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
- "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data "
- + "flow_data, ax.host host, ax.port port, ax.update_time "
- + "axUpdateTime " + "FROM execution_flows ex "
- + "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
+ "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+ + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+ + " FROM execution_flows ex"
+ + " INNER JOIN "
+ + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+ + " INNER JOIN "
+ + " executors et ON ex.executor_id = et.id";
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
@@ -1000,6 +1349,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
String host = rs.getString(4);
int port = rs.getInt(5);
long updateTime = rs.getLong(6);
+ int executorId = rs.getInt(7);
+ boolean executorStatus = rs.getBoolean(8);
if (data == null) {
execFlows.put(id, null);
@@ -1020,7 +1371,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutableFlow exFlow =
ExecutableFlow.createExecutableFlowFromObject(flowObj);
- ExecutionReference ref = new ExecutionReference(id, host, port);
+ Executor executor = new Executor(executorId, host, port, executorStatus);
+ ExecutionReference ref = new ExecutionReference(id, executor);
ref.setUpdateTime(updateTime);
execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref,
@@ -1106,6 +1458,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
"SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
private static String NUM_JOB_EXECUTIONS =
"SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
+ private static String FETCH_EXECUTOR_ID =
+ "SELECT executor_id FROM execution_flows WHERE exec_id=?";
@Override
public Integer handle(ResultSet rs) throws SQLException {
@@ -1134,4 +1488,98 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return updateNum;
}
+
+ /**
+ * JDBC ResultSetHandler to fetch records from executors table
+ */
+ private static class FetchExecutorHandler implements
+ ResultSetHandler<List<Executor>> {
+ private static String FETCH_ALL_EXECUTORS =
+ "SELECT id, host, port, active FROM executors";
+ private static String FETCH_ACTIVE_EXECUTORS =
+ "SELECT id, host, port, active FROM executors where active=true";
+ private static String FETCH_EXECUTOR_BY_ID =
+ "SELECT id, host, port, active FROM executors where id=?";
+ private static String FETCH_EXECUTOR_BY_HOST_PORT =
+ "SELECT id, host, port, active FROM executors where host=? AND port=?";
+ private static String FETCH_EXECUTION_EXECUTOR =
+ "SELECT ex.id, ex.host, ex.port, ex.active FROM "
+ + " executors ex INNER JOIN execution_flows ef "
+ + "on ex.id = ef.executor_id where exec_id=?";
+
+ @Override
+ public List<Executor> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<Executor> emptyList();
+ }
+
+ List<Executor> executors = new ArrayList<Executor>();
+ do {
+ int id = rs.getInt(1);
+ String host = rs.getString(2);
+ int port = rs.getInt(3);
+ boolean active = rs.getBoolean(4);
+ Executor executor = new Executor(id, host, port, active);
+ executors.add(executor);
+ } while (rs.next());
+
+ return executors;
+ }
+ }
+
+ /**
+ * JDBC ResultSetHandler to fetch records from executor_events table
+ */
+ private static class ExecutorLogsResultHandler implements
+ ResultSetHandler<List<ExecutorLogEvent>> {
+ private static String SELECT_EXECUTOR_EVENTS_ORDER =
+ "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
+ + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
+
+ @Override
+ public List<ExecutorLogEvent> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<ExecutorLogEvent> emptyList();
+ }
+
+ ArrayList<ExecutorLogEvent> events = new ArrayList<ExecutorLogEvent>();
+ do {
+ int executorId = rs.getInt(1);
+ int eventType = rs.getInt(2);
+ Date eventTime = rs.getDate(3);
+ String username = rs.getString(4);
+ String message = rs.getString(5);
+
+ ExecutorLogEvent event =
+ new ExecutorLogEvent(executorId, username, eventTime,
+ EventType.fromInteger(eventType), message);
+ events.add(event);
+ } while (rs.next());
+
+ return events;
+ }
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+ */
+ @Override
+ public void unassignExecutor(int executionId) throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ int rows = runner.update(UPDATE, executionId);
+ if (rows == 0) {
+ throw new ExecutorManagerException(String.format(
+ "Failed to unassign executor for execution : %d ", executionId));
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error updating execution id "
+ + executionId, e);
+ }
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
new file mode 100644
index 0000000..641ffae
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
@@ -0,0 +1,201 @@
+package azkaban.executor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * <pre>
+ * Composite data structure to represent non-dispatched flows in webserver.
+ * This data structure wraps a blocking queue and a concurrent hashmap.
+ * </pre>
+ */
+public class QueuedExecutions {
+ private static Logger logger = Logger.getLogger(QueuedExecutions.class);
+ final long capacity;
+
+ /* map to easily access queued flows */
+ final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap;
+ /* actual queue */
+ final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList;
+
+ public QueuedExecutions(long capacity) {
+ this.capacity = capacity;
+ queuedFlowMap =
+ new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ queuedFlowList =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ }
+
+ /**
+ * Wraps BoundedQueue Take method to have a corresponding update in
+ * queuedFlowMap lookup table
+ *
+ * @return
+ * @throws InterruptedException
+ */
+ public Pair<ExecutionReference, ExecutableFlow> fetchHead()
+ throws InterruptedException {
+ Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
+ if (pair != null && pair.getFirst() != null) {
+ queuedFlowMap.remove(pair.getFirst().getExecId());
+ }
+ return pair;
+ }
+
+ /**
+ * Helper method to have a single point of deletion in the queued flows
+ *
+ * @param executionId
+ */
+ public void dequeue(int executionId) {
+ if (queuedFlowMap.containsKey(executionId)) {
+ queuedFlowList.remove(queuedFlowMap.get(executionId));
+ queuedFlowMap.remove(executionId);
+ }
+ }
+
+ /**
+ * <pre>
+ * Helper method to have a single point of insertion in the queued flows
+ *
+ * @param exflow
+ * flow to be enqueued
+ * @param ref
+ * reference to be enqueued
+ * @throws ExecutorManagerException
+ * case 1: if blocking queue put method fails due to
+ * InterruptedException
+ * case 2: if there already an element with
+ * same execution Id
+ * </pre>
+ */
+ public void enqueue(ExecutableFlow exflow, ExecutionReference ref)
+ throws ExecutorManagerException {
+ if (hasExecution(exflow.getExecutionId())) {
+ String errMsg = "Flow already in queue " + exflow.getExecutionId();
+ throw new ExecutorManagerException(errMsg);
+ }
+
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
+ try {
+ queuedFlowMap.put(exflow.getExecutionId(), pair);
+ queuedFlowList.put(pair);
+ } catch (InterruptedException e) {
+ String errMsg = "Failed to insert flow " + exflow.getExecutionId();
+ logger.error(errMsg, e);
+ throw new ExecutorManagerException(errMsg);
+ }
+ }
+
+ /**
+ * <pre>
+ * Enqueues all the elements of a collection
+ *
+ * @param collection
+ *
+ * @throws ExecutorManagerException
+ * case 1: if blocking queue put method fails due to
+ * InterruptedException
+ * case 2: if there already an element with
+ * same execution Id
+ * </pre>
+ */
+ public void enqueueAll(
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection)
+ throws ExecutorManagerException {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : collection) {
+ enqueue(pair.getSecond(), pair.getFirst());
+ }
+ }
+
+ /**
+ * Returns a read only collection of all the queued (flows, reference) pairs
+ *
+ * @return
+ */
+ public Collection<Pair<ExecutionReference, ExecutableFlow>> getAllEntries() {
+ return Collections.unmodifiableCollection(queuedFlowMap.values());
+ }
+
+ /**
+ * Checks if an execution is queued or not
+ *
+ * @param executionId
+ * @return
+ */
+ public boolean hasExecution(int executionId) {
+ return queuedFlowMap.containsKey(executionId);
+ }
+
+ /**
+ * Fetch flow for an execution. Returns null, if execution not in queue
+ *
+ * @param executionId
+ * @return
+ */
+ public ExecutableFlow getFlow(int executionId) {
+ if (hasExecution(executionId)) {
+ return queuedFlowMap.get(executionId).getSecond();
+ }
+ return null;
+ }
+
+ /**
+ * Fetch Activereference for an execution. Returns null, if execution not in
+ * queue
+ *
+ * @param executionId
+ * @return
+ */
+ public ExecutionReference getReference(int executionId) {
+ if (hasExecution(executionId)) {
+ return queuedFlowMap.get(executionId).getFirst();
+ }
+ return null;
+ }
+
+ /**
+ * Size of the queue
+ *
+ * @return
+ */
+ public long size() {
+ return queuedFlowList.size();
+ }
+
+ /**
+ * Verify, if queue is full as per initialized capacity
+ *
+ * @return
+ */
+ public boolean isFull() {
+ return size() >= capacity;
+ }
+
+ /**
+ * Verify, if queue is empty or not
+ *
+ * @return
+ */
+ public boolean isEmpty() {
+ return queuedFlowList.isEmpty() && queuedFlowMap.isEmpty();
+ }
+
+ /**
+ * Empties queue by dequeuing all the elements
+ */
+ public void clear() {
+ for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowMap.values()) {
+ dequeue(pair.getFirst().getExecId());
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
new file mode 100644
index 0000000..f83788f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * <pre>
+ * Abstract class for a candidate comparator.
+ * this class contains implementation of most of the core logics. Implementing classes is expected only to
+ * register factor comparators using the provided register function.
+ * <pre>
+ */
+public abstract class CandidateComparator<T> implements Comparator<T> {
+ protected static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+ // internal repository of the registered comparators .
+ private Map<String,FactorComparator<T>> factorComparatorList =
+ new ConcurrentHashMap<String,FactorComparator<T>>();
+
+ /** gets the name of the current implementation of the candidate comparator.
+ * @returns : name of the comparator.
+ * */
+ public abstract String getName();
+
+ /** tieBreak method which will kick in when the comparator list generated an equality result for
+ * both sides. the tieBreak method will try best to make sure a stable result is returned.
+ * */
+ protected boolean tieBreak(T object1, T object2){
+ if (null == object2) return true;
+ if (null == object1) return false;
+ return object1.hashCode() >= object2.hashCode();
+ }
+
+ /** function to register a factorComparator to the internal Map for future reference.
+ * @param factorComparator : the comparator object to be registered.
+ * @throws IllegalArgumentException
+ * */
+ protected void registerFactorComparator(FactorComparator<T> comparator){
+ if (null == comparator ||
+ Integer.MAX_VALUE - this.getTotalWeight() < comparator.getWeight() ) {
+ throw new IllegalArgumentException("unable to register comparator."+
+ " The passed comparator is null or has an invalid weight value.");
+ }
+
+ // add or replace the Comparator.
+ this.factorComparatorList.put(comparator.getFactorName(),comparator);
+ logger.debug(String.format("Factor comparator added for '%s'. Weight = '%s'",
+ comparator.getFactorName(), comparator.getWeight()));
+ }
+
+ /** function returns the total weight of the registered comparators.
+ * @return the value of total weight.
+ * */
+ public int getTotalWeight(){
+ int totalWeight = 0 ;
+
+ // save out a copy of the values as HashMap.values() takes o(n) to return the value.
+ Collection<FactorComparator<T>> allValues = this.factorComparatorList.values();
+ for (FactorComparator<T> item : allValues){
+ if (item != null){
+ totalWeight += item.getWeight();
+ }
+ }
+
+ return totalWeight;
+ }
+
+ /**
+ * <pre>
+ * function to actually calculate the scores for the two objects that are being compared.
+ * the comparison follows the following logic -
+ * 1. if both objects are equal return 0 score for both.
+ * 2. if one side is null, the other side gets all the score.
+ * 3. if both sides are non-null value, both values will be passed to all the registered FactorComparators
+ * each factor comparator will generate a result based off it sole logic the weight of the comparator will be
+ * added to the wining side, if equal, no value will be added to either side.
+ * 4. final result will be returned in a Pair container.
+ *
+ * </pre>
+ * @param object1 the first object (left side) to be compared.
+ * @param object2 the second object (right side) to be compared.
+ * @return a pair structure contains the score for both sides.
+ * */
+ public Pair<Integer,Integer> getComparisonScore(T object1, T object2){
+ logger.debug(String.format("start comparing '%s' with '%s', total weight = %s ",
+ object1 == null ? "(null)" : object1.toString(),
+ object2 == null ? "(null)" : object2.toString(),
+ this.getTotalWeight()));
+
+ int result1 = 0 ;
+ int result2 = 0 ;
+
+ // short cut if object equals.
+ if (object1 == object2){
+ logger.debug("[Comparator] same object.");
+ } else
+ // left side is null.
+ if (object1 == null){
+ logger.debug("[Comparator] left side is null, right side gets total weight.");
+ result2 = this.getTotalWeight();
+ } else
+ // right side is null.
+ if (object2 == null){
+ logger.debug("[Comparator] right side is null, left side gets total weight.");
+ result1 = this.getTotalWeight();
+ } else
+ // both side is not null,put them thru the full loop
+ {
+ Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
+ for (FactorComparator<T> comparator :comparatorList){
+ int result = comparator.compare(object1, object2);
+ result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
+ result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
+ logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+ comparator.getFactorName(), result, result1, result2));
+ }
+ }
+ // in case of same score, use tie-breaker to stabilize the result.
+ if (result1 == result2){
+ boolean result = this.tieBreak(object1, object2);
+ logger.debug("[TieBreaker] TieBreaker chose " +
+ (result? String.format("left side (%s)", null== object1 ? "null": object1.toString()) :
+ String.format("right side (%s)", null== object2 ? "null": object2.toString()) ));
+ if (result) result1++; else result2++;
+ }
+
+ logger.debug(String.format("Result : %s vs %s ",result1,result2));
+ return new Pair<Integer,Integer>(result1,result2);
+ }
+
+ @Override
+ public int compare(T o1, T o2) {
+ Pair<Integer,Integer> result = this.getComparisonScore(o1,o2);
+ return result.getFirst() == result.getSecond() ? 0 :
+ result.getFirst() > result.getSecond() ? 1 : -1;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
new file mode 100644
index 0000000..f927a2a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+
+/** Abstract class for a candidate filter.
+ * this class contains implementation of most of the core logics. Implementing classes is expected only to
+ * register filters using the provided register function.
+ */
+public abstract class CandidateFilter<T,V> {
+ protected static Logger logger = Logger.getLogger(CandidateFilter.class);
+
+ // internal repository of the registered filters .
+ private Map<String,FactorFilter<T,V>> factorFilterList =
+ new ConcurrentHashMap<String,FactorFilter<T,V>>();
+
+ /** gets the name of the current implementation of the candidate filter.
+ * @return : name of the filter.
+ * */
+ public abstract String getName();
+
+ /** function to register a factorFilter to the internal Map for future reference.
+ * @param factorfilter : the Filter object to be registered.
+ * @throws IllegalArgumentException
+ * */
+ protected void registerFactorFilter(FactorFilter<T,V> filter){
+ if (null == filter ) {
+ throw new IllegalArgumentException("unable to register factor filter. " +
+ "The passed comaractor is null or has an invalid weight value.");
+ }
+
+ // add or replace the filter.
+ this.factorFilterList.put(filter.getFactorName(),filter);
+ logger.debug(String.format("Factor filter added for '%s'.",
+ filter.getFactorName()));
+ }
+
+ /** function to analyze the target item according to the reference object to decide whether the item should be filtered.
+ * @param filteringTarget: object to be checked.
+ * @param referencingObject: object which contains statistics based on which a decision is made whether
+ * the object being checked need to be filtered or not.
+ * @return true if the check passed, false if check failed, which means the item need to be filtered.
+ * */
+ public boolean filterTarget(T filteringTarget, V referencingObject){
+ logger.debug(String.format("start filtering '%s' with factor filter for '%s'",
+ filteringTarget == null ? "(null)" : filteringTarget.toString(),
+ this.getName()));
+
+ Collection<FactorFilter<T,V>> filterList = this.factorFilterList.values();
+ boolean result = true;
+ for (FactorFilter<T,V> filter : filterList){
+ result &= filter.filterTarget(filteringTarget,referencingObject);
+ logger.debug(String.format("[Factor: %s] filter result : %s ",
+ filter.getFactorName(), result));
+ if (!result){
+ break;
+ }
+ }
+ logger.debug(String.format("Final filtering result : %s ",result));
+ return result;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
new file mode 100644
index 0000000..8fa91d0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.log4j.Logger;
+
+/** Implementation of the CandidateSelector.
+ * @param K executor object type.
+ * @param V dispatching object type.
+ * */
+public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K, V> {
+ private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+ private CandidateFilter<K,V> filter;
+ private CandidateComparator<K> comparator;
+
+ /**constructor of the class.
+ * @param filter CandidateFilter object to be used to perform the candidate filtering.
+ * @param comparator CandidateComparator object to be used to find the best suit candidate from the filtered list.
+ * */
+ public CandidateSelector(CandidateFilter<K,V> filter,
+ CandidateComparator<K> comparator){
+ this.filter = filter;
+ this.comparator = comparator;
+ }
+
+ @Override
+ public K getBest(Collection<K> candidateList, V dispatchingObject) {
+
+ // shortcut if the candidateList is empty.
+ if ( null == candidateList || candidateList.size() == 0){
+ logger.error("failed to getNext candidate as the passed candidateList is null or empty.");
+ return null;
+ }
+
+ logger.debug("start candidate selection logic.");
+ logger.debug(String.format("candidate count before filtering: %s", candidateList.size()));
+
+ // to keep the input untouched, we will form up a new list based off the filtering result.
+ Collection<K> filteredList = new ArrayList<K>();
+
+ if (null != this.filter){
+ for (K candidateInfo : candidateList){
+ if (filter.filterTarget(candidateInfo,dispatchingObject)){
+ filteredList.add(candidateInfo);
+ }
+ }
+ } else{
+ filteredList = candidateList;
+ logger.debug("skipping the candidate filtering as the filter object is not specifed.");
+ }
+
+ logger.debug(String.format("candidate count after filtering: %s", filteredList.size()));
+ if (filteredList.size() == 0){
+ logger.debug("failed to select candidate as the filtered candidate list is empty.");
+ return null;
+ }
+
+ if (null == comparator){
+ logger.debug("candidate comparator is not specified, default hash code comparator class will be used.");
+ }
+
+ // final work - find the best candidate from the filtered list.
+ K executor = Collections.max(filteredList, comparator);
+ logger.debug(String.format("candidate selected %s",
+ null == executor ? "(null)" : executor.toString()));
+ return executor;
+ }
+
+ @Override
+ public String getName() {
+ return "CandidateSelector";
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
new file mode 100644
index 0000000..978bcb9
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorInfo;
+
+
+/**
+ * De-normalized version of the CandidateComparator, which also contains the implementation of the factor comparators.
+ * */
+public class ExecutorComparator extends CandidateComparator<Executor> {
+ private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;
+
+ /**
+ * Gets the name list of all available comparators.
+ * @return the list of the names.
+ * */
+ public static Set<String> getAvailableComparatorNames(){
+ return comparatorCreatorRepository.keySet();
+ }
+
+ // factor comparator names
+ private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
+ private static final String MEMORY_COMPARATOR_NAME = "Memory";
+ private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
+ private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
+
+ /**
+ * static initializer of the class.
+ * We will build the filter repository here.
+ * when a new comparator is added, please do remember to register it here.
+ * */
+ static {
+ comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();
+
+ // register the creator for number of assigned flow comparator.
+ comparatorCreatorRepository.put(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getNumberOfAssignedFlowComparator(weight); }});
+
+ // register the creator for memory comparator.
+ comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
+
+ // register the creator for last dispatched time comparator.
+ comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+
+ // register the creator for CPU Usage comparator.
+ comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, new ComparatorCreator(){
+ public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
+ }
+
+
+ /**
+ * constructor of the ExecutorComparator.
+ * @param comparatorList the list of comparator, plus its weight information to be registered,
+ * the parameter must be a not-empty and valid list object.
+ * */
+ public ExecutorComparator(Map<String,Integer> comparatorList) {
+ if (null == comparatorList|| comparatorList.size() == 0){
+ throw new IllegalArgumentException("failed to initialize executor comparator" +
+ "as the passed comparator list is invalid or empty.");
+ }
+
+ // register the comparators, we will now throw here if the weight is invalid, it is handled in the super.
+ for (Entry<String,Integer> entry : comparatorList.entrySet()){
+ if (comparatorCreatorRepository.containsKey(entry.getKey())){
+ this.registerFactorComparator(comparatorCreatorRepository.
+ get(entry.getKey()).
+ create(entry.getValue()));
+ } else {
+ throw new IllegalArgumentException(String.format("failed to initialize executor comparator " +
+ "as the comparator implementation for requested factor '%s' doesn't exist.",
+ entry.getKey()));
+ }
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "ExecutorComparator";
+ }
+
+ private interface ComparatorCreator{
+ FactorComparator<Executor> create(int weight);
+ }
+
+ /**<pre>
+ * helper function that does the object on two statistics, comparator can leverage this function to provide
+ * shortcuts if the statistics object is missing from one or both sides of the executors.
+ * </pre>
+ * @param stat1 the first statistics object to be checked .
+ * @param stat2 the second statistics object to be checked.
+ * @param caller the name of the calling function, for logging purpose.
+ * @param result result Integer to pass out the result in case the statistics are not both valid.
+ * @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
+ * false otherwise.
+ * */
+ private static boolean statisticsObjectCheck(ExecutorInfo statisticsObj1,
+ ExecutorInfo statisticsObj2, String caller, Integer result){
+ result = 0 ;
+ // both doesn't expose the info
+ if (null == statisticsObj1 && null == statisticsObj2){
+ logger.debug(String.format("%s : neither of the executors exposed statistics info.",
+ caller));
+ return true;
+ }
+
+ //right side doesn't expose the info.
+ if (null == statisticsObj2 ){
+ logger.debug(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
+ caller));
+ result = 1;
+ return true;
+ }
+
+ //left side doesn't expose the info.
+ if (null == statisticsObj1 ){
+ logger.debug(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
+ caller));
+ result = -1;
+ return true;
+ }
+
+ // both not null
+ return false;
+ }
+
+ /**
+ * function defines the number of assigned flow comparator.
+ * @param weight weight of the comparator.
+ * */
+ private static FactorComparator<Executor> getNumberOfAssignedFlowComparator(int weight){
+ return FactorComparator.create(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ ExecutorInfo stat1 = o1.getExecutorInfo();
+ ExecutorInfo stat2 = o2.getExecutorInfo();
+
+ Integer result = 0;
+ if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
+ return result;
+ }
+ return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
+ }});
+ }
+
+ /**
+ * function defines the cpuUsage comparator.
+ * @param weight weight of the comparator.
+ * @return
+ * */
+ private static FactorComparator<Executor> getCpuUsageComparator(int weight){
+ return FactorComparator.create(CPUUSAGE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ ExecutorInfo stat1 = o1.getExecutorInfo();
+ ExecutorInfo stat2 = o2.getExecutorInfo();
+
+ int result = 0;
+ if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
+ return result;
+ }
+
+ // CPU usage , the lesser the value is, the better.
+ return ((Double)stat2.getCpuUsage()).compareTo(stat1.getCpuUsage());
+ }});
+ }
+
+
+ /**
+ * function defines the last dispatched time comparator.
+ * @param weight weight of the comparator.
+ * @return
+ * */
+ private static FactorComparator<Executor> getLstDispatchedTimeComparator(int weight){
+ return FactorComparator.create(LSTDISPATCHED_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ ExecutorInfo stat1 = o1.getExecutorInfo();
+ ExecutorInfo stat2 = o2.getExecutorInfo();
+
+ int result = 0;
+ if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
+ return result;
+ }
+ // Note: an earlier date time indicates higher weight.
+ return ((Long)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
+ }});
+ }
+
+
+ /**<pre>
+ * function defines the Memory comparator.
+ * Note: comparator firstly take the absolute value of the remaining memory, if both sides have the same value,
+ * it go further to check the percent of the remaining memory.
+ * </pre>
+ * @param weight weight of the comparator.
+
+ * @return
+ * */
+ private static FactorComparator<Executor> getMemoryComparator(int weight){
+ return FactorComparator.create(MEMORY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+ @Override
+ public int compare(Executor o1, Executor o2) {
+ ExecutorInfo stat1 = o1.getExecutorInfo();
+ ExecutorInfo stat2 = o2.getExecutorInfo();
+
+ int result = 0;
+ if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
+ return result;
+ }
+
+ if (stat1.getRemainingMemoryInMB() != stat2.getRemainingMemoryInMB()){
+ return stat1.getRemainingMemoryInMB() > stat2.getRemainingMemoryInMB() ? 1:-1;
+ }
+
+ return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
+ }});
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
new file mode 100644
index 0000000..a47bc16
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorInfo;
+
+/**
+ * De-normalized version of the candidateFilter, which also contains the implementation of the factor filters.
+ * */
+public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
+ private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;
+
+ /**
+ * Gets the name list of all available filters.
+ * @return the list of the names.
+ * */
+ public static Set<String> getAvailableFilterNames(){
+ return filterRepository.keySet();
+ }
+
+
+ // factor filter names.
+ private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
+ private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimumFreeMemory";
+ private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
+
+ /**<pre>
+ * static initializer of the class.
+ * We will build the filter repository here.
+ * when a new filter is added, please do remember to register it here.
+ * </pre>
+ * */
+ static {
+ filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
+ filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
+ filterRepository.put(MINIMUMFREEMEMORY_FILTER_NAME, getMinimumReservedMemoryFilter());
+ filterRepository.put(CPUSTATUS_FILTER_NAME, getCpuStatusFilter());
+ }
+
+ /**
+ * constructor of the ExecutorFilter.
+ * @param filterList the list of filter to be registered, the parameter must be a not-empty and valid list object.
+ * */
+ public ExecutorFilter(Collection<String> filterList) {
+ // shortcut if the filter list is invalid. A little bit ugly to have to throw in constructor.
+ if (null == filterList || filterList.size() == 0){
+ logger.error("failed to initialize executor filter as the passed filter list is invalid or empty.");
+ throw new IllegalArgumentException("filterList");
+ }
+
+ // register the filters according to the list.
+ for (String filterName : filterList){
+ if (filterRepository.containsKey(filterName)){
+ this.registerFactorFilter(filterRepository.get(filterName));
+ } else {
+ logger.error(String.format("failed to initialize executor filter "+
+ "as the filter implementation for requested factor '%s' doesn't exist.",
+ filterName));
+ throw new IllegalArgumentException("filterList");
+ }
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "ExecutorFilter";
+ }
+
+ /**<pre>
+ * function to register the static remaining flow size filter.
+ * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+ * Coming for the passed flow.
+ * Ideally this filter will make sure only the executor hasn't reached the Max allowed # of executing flows.
+ *</pre>
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
+ return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.debug(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
+ return false;
+ }
+
+ ExecutorInfo stats = filteringTarget.getExecutorInfo();
+ if (null == stats) {
+ logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
+ STATICREMAININGFLOWSIZE_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getRemainingFlowCapacity() > 0 ;
+ }
+ });
+ }
+
+ /**<pre>
+ * function to register the static Minimum Reserved Memory filter.
+ * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+ * Coming for the passed flow.
+ * This filter will filter out any executors that has the remaining memory below 6G
+ *</pre>
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getMinimumReservedMemoryFilter(){
+ return FactorFilter.create(MINIMUMFREEMEMORY_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+ private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.debug(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
+ return false;
+ }
+
+ ExecutorInfo stats = filteringTarget.getExecutorInfo();
+ if (null == stats) {
+ logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
+ MINIMUMFREEMEMORY_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getRemainingMemoryInMB() > MINIMUM_FREE_MEMORY ;
+ }
+ });
+ }
+
+
+ /**
+ * <pre>
+ * function to register the static Minimum Reserved Memory filter.
+ * NOTE : this is a static filter which means the filter will be filtering based on the system standard which
+ * is not Coming for the passed flow.
+ * This filter will filter out any executors that the current CPU usage exceed 95%
+ * </pre>
+ * */
+ private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
+ return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+ private static final int MAX_CPU_CURRENT_USAGE = 95;
+ public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+ if (null == filteringTarget){
+ logger.debug(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
+ return false;
+ }
+
+ ExecutorInfo stats = filteringTarget.getExecutorInfo();
+ if (null == stats) {
+ logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
+ MINIMUMFREEMEMORY_FILTER_NAME,
+ filteringTarget.toString()));
+ return false;
+ }
+ return stats.getCpuUsage() < MAX_CPU_CURRENT_USAGE ;
+ }
+ });
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
new file mode 100644
index 0000000..2405c28
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+
+/**<pre>
+ * Executor selector class implementation.
+ * NOTE: This class is a de-generalized version of the CandidateSelector, which provides a
+ * clean and convenient constructor to take in filter and comparator name list and build
+ * the instance from that.
+ *</pre>
+ * */
+public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow> {
+
+ /**
+ * Contractor of the class.
+ * @param filterList name list of the filters to be registered,
+ * filter feature will be disabled if a null value is passed.
+ * @param comparatorList name/weight pair list of the comparators to be registered ,
+ * again comparator feature is disabled if a null value is passed.
+ * */
+ public ExecutorSelector(Collection<String> filterList, Map<String,Integer> comparatorList) {
+ super(null == filterList || filterList.isEmpty() ? null : new ExecutorFilter(filterList),
+ null == comparatorList || comparatorList.isEmpty() ? null : new ExecutorComparator(comparatorList));
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
new file mode 100644
index 0000000..6d4d2e0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Comparator;
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor comparator .
+ *@param T: the type of the objects to be compared.
+ */
+public final class FactorComparator<T>{
+ private static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+ private String factorName;
+ private int weight;
+ private Comparator<T> comparator;
+
+ /** private constructor of the class. User will create the instance of the class by calling the static
+ * method provided below.
+ * @param factorName : the factor name .
+ * @param weight : the weight of the comparator.
+ * @param comparator : function to be provided by user on how the comparison should be made.
+ * */
+ private FactorComparator(String factorName, int weight, Comparator<T> comparator){
+ this.factorName = factorName;
+ this.weight = weight;
+ this.comparator = comparator;
+ }
+
+ /** static function to generate an instance of the class.
+ * refer to the constructor for the param definitions.
+ * */
+ public static <T> FactorComparator<T> create(String factorName, int weight, Comparator<T> comparator){
+
+ if (null == factorName || factorName.length() == 0 || weight < 0 || null == comparator){
+ logger.error("failed to create instance of FactorComparator, at least one of the input paramters are invalid");
+ return null;
+ }
+
+ return new FactorComparator<T>(factorName,weight,comparator);
+ }
+
+ // function to return the factor name.
+ public String getFactorName(){
+ return this.factorName;
+ }
+
+ // function to return the weight value.
+ public int getWeight(){
+ return this.weight;
+ }
+
+ // function to return the weight value.
+ public void updateWeight(int value){
+ this.weight = value;
+ }
+
+ // the actual compare function, which will leverage the user defined function.
+ public int compare(T object1, T object2){
+ return this.comparator.compare(object1, object2);
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
new file mode 100644
index 0000000..04b04b7
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor Filter .
+ *@param T: the type of the objects to be compared.
+ *@param V: the type of the object to be used for filtering.
+ */
+public final class FactorFilter<T,V>{
+ private static Logger logger = Logger.getLogger(FactorFilter.class);
+
+ private String factorName;
+ private Filter<T,V> filter;
+
+ /** private constructor of the class. User will create the instance of the class by calling the static
+ * method provided below.
+ * @param factorName : the factor name .
+ * @param filter : user defined function specifying how the filtering should be implemented.
+ * */
+ private FactorFilter(String factorName, Filter<T,V> filter){
+ this.factorName = factorName;
+ this.filter = filter;
+ }
+
+ /** static function to generate an instance of the class.
+ * refer to the constructor for the param definitions.
+ * */
+ public static <T,V> FactorFilter<T,V> create(String factorName, Filter<T,V> filter){
+
+ if (null == factorName || factorName.length() == 0 || null == filter){
+ logger.error("failed to create instance of FactorFilter, at least one of the input paramters are invalid");
+ return null;
+ }
+
+ return new FactorFilter<T,V>(factorName,filter);
+ }
+
+ // function to return the factor name.
+ public String getFactorName(){
+ return this.factorName;
+ }
+
+ // the actual check function, which will leverage the logic defined by user.
+ public boolean filterTarget(T filteringTarget, V referencingObject){
+ return this.filter.filterTarget(filteringTarget, referencingObject);
+ }
+
+ // interface of the filter.
+ public interface Filter<T,V>{
+
+ /**function to analyze the target item according to the reference object to decide whether the item should be filtered.
+ * @param filteringTarget object to be checked.
+ * @param referencingObject object which contains statistics based on which a decision is made whether
+ * the object being checked need to be filtered or not.
+ * @return true if the check passed, false if check failed, which means the item need to be filtered.
+ * */
+ public boolean filterTarget(T filteringTarget, V referencingObject);
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
new file mode 100644
index 0000000..a56b41a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+
+
+/**<pre>
+ * Definition of the selector interface.
+ * an implementation of the selector interface provides the functionality
+ * to return a candidate from the candidateList that suits best for the dispatchingObject.
+ * </pre>
+ * @param K : type of the candidate.
+ * @param V : type of the dispatching object.
+ */
+public interface Selector <K extends Comparable<K>,V> {
+
+ /** Function returns the next best suit candidate from the candidateList for the dispatching object.
+ * @param candidateList : List of the candidates to select from .
+ * @param dispatchingObject : the object to be dispatched .
+ * @return candidate from the candidate list that suits best for the dispatching object.
+ * */
+ public K getBest(Collection<K> candidateList, V dispatchingObject);
+
+ /** Function returns the name of the current Dispatcher
+ * @return name of the dispatcher.
+ * */
+ public String getName();
+}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index 08e5534..d459ae9 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -62,4 +62,35 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
public String getRunningFlows() {
return manager.getRunningFlowIds();
}
+
+ @Override
+ public boolean isQueueProcessorActive() {
+ return manager.isQueueProcessorThreadActive();
+ }
+
+ @Override
+ public String getQueuedFlows() {
+ return manager.getQueuedFlowIds();
+ }
+
+ @Override
+ public String getQueueProcessorThreadState() {
+ return manager.getQueueProcessorThreadState().toString();
+ }
+
+ @Override
+ public List<String> getAvailableExecutorComparatorNames() {
+ return new ArrayList<String>(manager.getAvailableExecutorComparatorNames());
+ }
+
+ @Override
+ public List<String> getAvailableExecutorFilterNames() {
+ return new ArrayList<String>(manager.getAvailableExecutorFilterNames());
+ }
+
+ @Override
+ public long getLastSuccessfulExecutorInfoRefresh() {
+ return manager.getLastSuccessfulExecutorInfoRefresh();
+ }
+
}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 9bc1175..69e401c 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -39,4 +39,23 @@ public interface JmxExecutorManagerMBean {
@DisplayName("OPERATION: getPrimaryExecutorHostPorts")
public List<String> getPrimaryExecutorHostPorts();
+
+ @DisplayName("OPERATION: isQueueProcessorActive")
+ public boolean isQueueProcessorActive();
+
+ @DisplayName("OPERATION: getQueuedFlows")
+ public String getQueuedFlows();
+
+ @DisplayName("OPERATION: getQueueProcessorThreadState")
+ public String getQueueProcessorThreadState();
+
+ @DisplayName("OPERATION: getAvailableExecutorComparatorNames")
+ List<String> getAvailableExecutorComparatorNames();
+
+ @DisplayName("OPERATION: getAvailableExecutorFilterNames")
+ List<String> getAvailableExecutorFilterNames();
+
+ @DisplayName("OPERATION: getLastSuccessfulExecutorInfoRefresh")
+ long getLastSuccessfulExecutorInfoRefresh();
+
}
diff --git a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
index 80eff74..fc159ec 100644
--- a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
+++ b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
@@ -25,9 +25,17 @@ import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
+import org.apache.commons.lang.StringUtils;
+
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorManagerException;
import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
import azkaban.utils.JSONUtils;
public class HttpRequestUtils {
@@ -113,6 +121,68 @@ public class HttpRequestUtils {
}
/**
+ * <pre>
+ * Remove following flow param if submitting user is not an Azkaban admin
+ * FLOW_PRIORITY
+ * USE_EXECUTOR
+ * @param userManager
+ * @param options
+ * @param user
+ * </pre>
+ */
+ public static void filterAdminOnlyFlowParams(UserManager userManager,
+ ExecutionOptions options, User user) throws ExecutorManagerException {
+ if (options == null || options.getFlowParameters() == null)
+ return;
+
+ Map<String, String> params = options.getFlowParameters();
+ // is azkaban Admin
+ if (!hasPermission(userManager, user, Type.ADMIN)) {
+ params.remove(ExecutionOptions.FLOW_PRIORITY);
+ params.remove(ExecutionOptions.USE_EXECUTOR);
+ } else {
+ validateIntegerParam(params, ExecutionOptions.FLOW_PRIORITY);
+ validateIntegerParam(params, ExecutionOptions.USE_EXECUTOR);
+ }
+ }
+
+ /**
+ * parse a string as number and throws exception if parsed value is not a
+ * valid integer
+ * @param params
+ * @param paramName
+ * @throws ExecutorManagerException if paramName is not a valid integer
+ */
+ public static boolean validateIntegerParam(Map<String, String> params,
+ String paramName) throws ExecutorManagerException {
+ if (params != null && params.containsKey(paramName)
+ && !StringUtils.isNumeric(params.get(paramName))) {
+ throw new ExecutorManagerException(paramName + " should be an integer");
+ }
+ return true;
+ }
+
+ /**
+ * returns true if user has access of type
+ *
+ * @param userManager
+ * @param user
+ * @param type
+ * @return
+ */
+ public static boolean hasPermission(UserManager userManager, User user,
+ Permission.Type type) {
+ for (String roleName : user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type)
+ || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Checks for the existance of the parameter in the request
*
* @param request
diff --git a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
new file mode 100644
index 0000000..ef0ef46
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpMessage;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.ParseException;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.log4j.Logger;
+
+/** class handles the communication between the application and
+ * a Restful API based web server.
+ * @param T : type of the returning response object.
+ * Note: the idea of this abstract class is to provide a wrapper for the logic around HTTP layer communication so
+ * development work can take this as a black box and focus on processing the result.
+ * With that said the abstract class will be provided as a template, which ideally can support different types
+ * of returning object (Dictionary, xmlDoc , text etc.)
+ * */
+public abstract class RestfulApiClient<T> {
+ protected static Logger logger = Logger.getLogger(RestfulApiClient.class);
+
+ /** Method to transform the response returned by the httpClient into the
+ * type specified.
+ * Note: Method need to handle case such as failed request.
+ * Also method is not supposed to pass the response object out
+ * via the returning value as the response will be closed after the
+ * execution steps out of the method context.
+ * @throws HttpResponseException
+ * @throws IOException
+ * @throws ParseException
+ * **/
+ protected abstract T parseResponse(HttpResponse response)
+ throws HttpResponseException, IOException;
+
+ /** function to perform a Get http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @return the response object type of which is specified by user.
+ * @throws IOException */
+ public T httpGet(URI uri, List<NameValuePair> headerEntries) throws IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpGet as the passed uri is null");
+ return null;
+ }
+
+ HttpGet get = new HttpGet(uri);
+ return this.sendAndReturn((HttpGet)completeRequest(get, headerEntries));
+ }
+
+ /** function to perform a Post http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @param postingBody the content to be posted , optional.
+ * @return the response object type of which is specified by user.
+ * @throws UnsupportedEncodingException, IOException */
+ public T httpPost(URI uri,
+ List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException, IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpPost as the passed uri is null.");
+ return null;
+ }
+
+ HttpPost post = new HttpPost(uri);
+ return this.sendAndReturn(completeRequest(post,headerEntries,postingBody));
+ }
+
+ /** function to perform a Delete http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @return the response object type of which is specified by user.
+ * @throws IOException */
+ public T httpDelete(URI uri, List<NameValuePair> headerEntries) throws IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpDelete as the passed uri is null.");
+ return null;
+ }
+
+ HttpDelete delete = new HttpDelete(uri);
+ return this.sendAndReturn((HttpDelete)completeRequest(delete, headerEntries));
+ }
+
+ /** function to perform a Put http request.
+ * @param uri the URI of the request.
+ * @param headerEntries extra entries to be added to request header.
+ * @param postingBody the content to be posted , optional.
+ * @return the response object type of which is specified by user.
+ * @throws UnsupportedEncodingException, IOException */
+ public T httpPut(URI uri, List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException, IOException{
+ // shortcut if the passed url is invalid.
+ if (null == uri){
+ logger.error(" unable to perform httpPut as the passed url is null or empty.");
+ return null;
+ }
+
+ HttpPut put = new HttpPut(uri);
+ return this.sendAndReturn(completeRequest(put, headerEntries, postingBody));
+ }
+
+ /** function to dispatch the request and pass back the response.
+ * */
+ protected T sendAndReturn(HttpUriRequest request) throws IOException{
+ CloseableHttpClient client = HttpClients.createDefault();
+ try {
+ return this.parseResponse(client.execute(request));
+ }finally{
+ client.close();
+ }
+ }
+
+ /** helper function to build a valid URI.
+ * @param host host name.
+ * @param port host port.
+ * @param path extra path after host.
+ * @param isHttp indicates if whether Http or HTTPS should be used.
+ * @param params extra query parameters.
+ * @return the URI built from the inputs.
+ * @throws IOException
+ * */
+ public static URI buildUri(String host, int port, String path,
+ boolean isHttp, Pair<String, String>... params) throws IOException{
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme(isHttp? "http" : "https").setHost(host).setPort(port);
+
+ if (null != path && path.length() > 0){
+ builder.setPath(path);
+ }
+
+ if (params != null) {
+ for (Pair<String, String> pair : params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ return uri;
+ }
+
+ /** helper function to build a valid URI.
+ * @param uri the URI to start with.
+ * @param params extra query parameters to append.
+ * @return the URI built from the inputs.
+ * @throws IOException
+ * */
+ public static URI BuildUri(URI uri, Pair<String, String>... params) throws IOException{
+ URIBuilder builder = new URIBuilder(uri);
+
+ if (params != null) {
+ for (Pair<String, String> pair : params) {
+ builder.setParameter(pair.getFirst(), pair.getSecond());
+ }
+ }
+
+ URI returningUri = null;
+ try {
+ returningUri = builder.build();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ return returningUri;
+ }
+
+ /** helper function to fill the request with header entries .
+ * */
+ private static HttpMessage completeRequest(HttpMessage request,
+ List<NameValuePair> headerEntries){
+ if (null == request){
+ logger.error("unable to complete request as the passed request object is null");
+ return request;
+ }
+
+ // dump all the header entries to the request.
+ if (null != headerEntries && headerEntries.size() > 0){
+ for (NameValuePair pair : headerEntries){
+ request.addHeader(pair.getName(), pair.getValue());
+ }
+ }
+ return request;
+ }
+
+ /** helper function to fill the request with header entries and posting body .
+ * */
+ private static HttpEntityEnclosingRequestBase completeRequest(HttpEntityEnclosingRequestBase request,
+ List<NameValuePair> headerEntries,
+ String postingBody) throws UnsupportedEncodingException{
+ if (null != completeRequest(request, headerEntries)){
+ // dump the post body UTF-8 will be used as the default encoding type.
+ if (null != postingBody && postingBody.length() > 0){
+ HttpEntity entity = new ByteArrayEntity(postingBody.getBytes("UTF-8"));
+ request.setHeader("Content-Length", Long.toString(entity.getContentLength()));
+ request.setEntity(entity);
+ }
+ }
+ return request;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index 7227b0b..138b80f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -111,6 +111,19 @@ public class Utils {
System.exit(exitCode);
}
+ /**
+ * Tests whether a port is valid or not
+ *
+ * @param port
+ * @return true, if port is valid
+ */
+ public static boolean isValidPort(int port) {
+ if (port >= 1 && port <= 65535) {
+ return true;
+ }
+ return false;
+ }
+
public static File createTempDir() {
return createTempDir(new File(System.getProperty("java.io.tmpdir")));
}
@@ -410,7 +423,7 @@ public class Utils {
}
/**
- * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000
+ * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000
* @return : long value of memory amount in kb
*/
public static long parseMemString(String strMemSize) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
new file mode 100644
index 0000000..ed6543f
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for ExecutableFlowPriorityComparator
+ * */
+
+public class ExecutableFlowPriorityComparatorTest {
+
+ /* Helper method to create an ExecutableFlow from serialized description */
+ private ExecutableFlow createExecutableFlow(String flowName, int priority,
+ long updateTime, int executionId) throws IOException {
+ ExecutableFlow execFlow =
+ TestUtils.createExecutableFlow("exectest1", flowName);
+
+ execFlow.setUpdateTime(updateTime);
+ execFlow.setExecutionId(executionId);
+ if (priority > 0) {
+ execFlow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
+ }
+ return execFlow;
+ }
+
+ /* priority queue order when all priorities are explicitly specified */
+ @Test
+ public void testExplicitlySpecifiedPriorities() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3, 3);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ }
+
+ /* priority queue order when some priorities are implicitly specified */
+ @Test
+ public void testMixedSpecifiedPriorities() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+
+ /*
+ * priority queue order when some priorities are equal, updatetime is used in
+ * this case
+ */
+ @Test
+ public void testEqualPriorities() throws IOException, InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+ ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow4, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+
+ /*
+ * priority queue order when some priorities and updatetime are equal,
+ * execution Id is used in this case
+ */
+ @Test
+ public void testEqualUpdateTimeAndPriority() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 2, 3);
+ ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow4, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
new file mode 100644
index 0000000..62d187b
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for executor manager
+ */
+public class ExecutorManagerTest {
+
+ /* Helper method to create a ExecutorManager Instance */
+ private ExecutorManager createMultiExecutorManagerInstance()
+ throws ExecutorManagerException {
+ return createMultiExecutorManagerInstance(new MockExecutorLoader());
+ }
+
+ /*
+ * Helper method to create a ExecutorManager Instance with the given
+ * ExecutorLoader
+ */
+ private ExecutorManager createMultiExecutorManagerInstance(
+ ExecutorLoader loader) throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
+
+ loader.addExecutor("localhost", 12345);
+ loader.addExecutor("localhost", 12346);
+ return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ }
+
+ /*
+ * Test create an executor manager instance without any executor local or
+ * remote
+ */
+ @Test(expected = ExecutorManagerException.class)
+ public void testNoExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ @SuppressWarnings("unused")
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ }
+
+ /*
+ * Test backward compatibility with just local executor
+ */
+ @Test
+ public void testLocalExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put("executor.port", 12345);
+
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors =
+ new HashSet(manager.getAllActiveExecutors());
+
+ Assert.assertEquals(activeExecutors.size(), 1);
+ Executor executor = activeExecutors.iterator().next();
+ Assert.assertEquals(executor.getHost(), "localhost");
+ Assert.assertEquals(executor.getPort(), 12345);
+ Assert.assertArrayEquals(activeExecutors.toArray(), loader
+ .fetchActiveExecutors().toArray());
+ }
+
+ /*
+ * Test executor manager initialization with multiple executors
+ */
+ @Test
+ public void testMultipleExecutorScenario() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+ Executor executor2 = loader.addExecutor("localhost", 12346);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors =
+ new HashSet(manager.getAllActiveExecutors());
+ Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
+ executor1, executor2 });
+ }
+
+ /*
+ * Test executor manager active executor reload
+ */
+ @Test
+ public void testSetupExecutorsSucess() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+ new Executor[] { executor1 });
+
+ // mark older executor as inactive
+ executor1.setActive(false);
+ loader.updateExecutor(executor1);
+ Executor executor2 = loader.addExecutor("localhost", 12346);
+ Executor executor3 = loader.addExecutor("localhost", 12347);
+ manager.setupExecutors();
+
+ Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+ new Executor[] { executor2, executor3 });
+ }
+
+ /*
+ * Test executor manager active executor reload and resulting in no active
+ * executors
+ */
+ @Test(expected = ExecutorManagerException.class)
+ public void testSetupExecutorsException() throws ExecutorManagerException {
+ Props props = new Props();
+ props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ ExecutorLoader loader = new MockExecutorLoader();
+ Executor executor1 = loader.addExecutor("localhost", 12345);
+
+ ExecutorManager manager =
+ new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+ Set<Executor> activeExecutors =
+ new HashSet(manager.getAllActiveExecutors());
+ Assert.assertArrayEquals(activeExecutors.toArray(),
+ new Executor[] { executor1 });
+
+ // mark older executor as inactive
+ executor1.setActive(false);
+ loader.updateExecutor(executor1);
+ manager.setupExecutors();
+ }
+
+ /* Test disabling queue process thread to pause dispatching */
+ @Test
+ public void testDisablingQueueProcessThread() throws ExecutorManagerException {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ manager.enableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+ manager.disableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+ }
+
+ /* Test renabling queue process thread to pause restart dispatching */
+ @Test
+ public void testEnablingQueueProcessThread() throws ExecutorManagerException {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+ manager.enableQueueProcessorThread();
+ Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+ }
+
+ /* Test submit a non-dispatched flow */
+ @Test
+ public void testQueuedFlows() throws ExecutorManagerException, IOException {
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow1.setExecutionId(1);
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ flow2.setExecutionId(2);
+
+ User testUser = TestUtils.getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ manager.submitExecutableFlow(flow2, testUser.getUserId());
+
+ List<ExecutableFlow> testFlows = new LinkedList<ExecutableFlow>();
+ testFlows.add(flow1);
+ testFlows.add(flow2);
+
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
+ loader.fetchQueuedFlows();
+ Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
+ // Verify things are correctly setup in db
+ for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
+ Assert.assertTrue(testFlows.contains(pair.getSecond()));
+ }
+
+ // Verify running flows using old definition of "running" flows i.e. a
+ // non-dispatched flow is also considered running
+ List<ExecutableFlow> managerActiveFlows = manager.getRunningFlows();
+ Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
+ && testFlows.containsAll(managerActiveFlows));
+
+ // Verify getQueuedFlowIds method
+ Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
+ }
+
+ /* Test submit duplicate flow when previous instance is not dispatched */
+ @Test(expected = ExecutorManagerException.class)
+ public void testDuplicateQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ ExecutorManager manager = createMultiExecutorManagerInstance();
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow1.getExecutionOptions().setConcurrentOption(
+ ExecutionOptions.CONCURRENT_OPTION_SKIP);
+
+ User testUser = TestUtils.getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+ }
+
+ /*
+ * Test killing a job in preparation stage at webserver side i.e. a
+ * non-dispatched flow
+ */
+ @Test
+ public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
+ ExecutorLoader loader = new MockExecutorLoader();
+ ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ User testUser = TestUtils.getTestUser();
+ manager.submitExecutableFlow(flow1, testUser.getUserId());
+
+ manager.cancelFlow(flow1, testUser.getUserId());
+ ExecutableFlow fetchedFlow =
+ loader.fetchExecutableFlow(flow1.getExecutionId());
+ Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+
+ Assert.assertFalse(manager.getRunningFlows().contains(flow1));
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index f453f0c..dd32302 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -21,8 +21,11 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
@@ -30,34 +33,37 @@ import javax.sql.DataSource;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
-
import org.joda.time.DateTime;
-
+import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import azkaban.database.DataSourceUtils;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.flow.Flow;
import azkaban.project.Project;
+import azkaban.user.User;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
public class JdbcExecutorLoaderTest {
private static boolean testDBExists;
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions";
// @TODO remove this and turn into local host.
- private static final String host = "cyu-ld.linkedin.biz";
+ private static final String host = "localhost";
private static final int port = 3306;
private static final String database = "azkaban2";
private static final String user = "azkaban";
private static final String password = "azkaban";
private static final int numConnections = 10;
- private File flowDir = new File("unit/executions/exectest1");
-
@BeforeClass
public static void setupDB() {
DataSource dataSource =
@@ -117,12 +123,31 @@ public class JdbcExecutorLoaderTest {
return;
}
- DbUtils.closeQuietly(connection);
+ try {
+ runner.query(connection, "SELECT COUNT(1) FROM executors",
+ countHandler);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
- clearDB();
+ try {
+ runner.query(connection, "SELECT COUNT(1) FROM executor_events",
+ countHandler);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ DbUtils.closeQuietly(connection);
}
- private static void clearDB() {
+ @After
+ public void clearDB() {
if (!testDBExists) {
return;
}
@@ -178,6 +203,24 @@ public class JdbcExecutorLoaderTest {
return;
}
+ try {
+ runner.update(connection, "DELETE FROM executors");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ try {
+ runner.update(connection, "DELETE FROM executor_events");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
DbUtils.closeQuietly(connection);
}
@@ -186,9 +229,8 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
-
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
@@ -217,7 +259,7 @@ public class JdbcExecutorLoaderTest {
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow("exec1");
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
@@ -256,7 +298,7 @@ public class JdbcExecutorLoaderTest {
ExecutableFlow flow = createExecutableFlow(10, "exec1");
flow.setExecutionId(10);
- File jobFile = new File(flowDir, "job10.job");
+ File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
Props props = new Props(null, jobFile);
props.put("test", "test2");
ExecutableNode oldNode = flow.getExecutableNode("job10");
@@ -293,6 +335,433 @@ public class JdbcExecutorLoaderTest {
}
+ /* Test exception when unassigning an missing execution */
+ @Test
+ public void testUnassignExecutorException() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ try {
+ loader.unassignExecutor(2);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test happy case when unassigning executor for a flow execution */
+ @Test
+ public void testUnassignExecutor() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ Assert.assertEquals(
+ loader.fetchExecutorByExecutionId(flow.getExecutionId()), executor);
+ loader.unassignExecutor(flow.getExecutionId());
+ Assert.assertEquals(
+ loader.fetchExecutorByExecutionId(flow.getExecutionId()), null);
+ }
+
+ /* Test exception when assigning a non-existent executor to a flow */
+ @Test
+ public void testAssignExecutorInvalidExecutor()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ try {
+ loader.assignExecutor(flow.getExecutionId(), 1);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test exception when assigning an executor to a non-existent flow execution */
+ @Test
+ public void testAssignExecutorInvalidExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ try {
+ loader.assignExecutor(2, executor.getId());
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test null return when an invalid execution flows */
+ @Test
+ public void testFetchMissingExecutorByExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Assert.assertEquals(loader.fetchExecutorByExecutionId(1), null);
+ }
+
+ /* Test null return when for a non-dispatched execution */
+ @Test
+ public void testFetchExecutorByQueuedExecution()
+ throws ExecutorManagerException, IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
+ null);
+ }
+
+ /* Test happy case when assigning and fetching an executor to a flow execution */
+ @Test
+ public void testAssignAndFetchExecutor() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ String host = "localhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
+ executor);
+ }
+
+ /* Test fetchQueuedFlows when there are no queued flows */
+ @Test
+ public void testFetchNoQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ loader.fetchQueuedFlows();
+
+ // no execution flows at all i.e. no running, completed or queued flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+
+ String host = "lcoalhost";
+ int port = 12345;
+ Executor executor = loader.addExecutor(host, port);
+
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ // only completed flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ loader.uploadExecutableFlow(flow);
+ loader.assignExecutor(executor.getId(), flow.getExecutionId());
+ ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
+ loader.addActiveExecutableReference(ref);
+ // only running and completed flows
+ Assert.assertTrue(queuedFlows.isEmpty());
+ }
+
+ /* Test fetchQueuedFlows happy case */
+ @Test
+ public void testFetchQueuedFlows() throws ExecutorManagerException,
+ IOException {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
+
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ loader.uploadExecutableFlow(flow);
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ loader.uploadExecutableFlow(flow);
+
+ ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
+ loader.addActiveExecutableReference(ref2);
+ ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
+ loader.addActiveExecutableReference(ref);
+
+ queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, flow));
+ queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref2, flow2));
+
+ // only running and completed flows
+ Assert.assertArrayEquals(loader.fetchQueuedFlows().toArray(),
+ queuedFlows.toArray());
+ }
+
+ /* Test all executors fetch from empty executors */
+ @Test
+ public void testFetchEmptyExecutors() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = loader.fetchAllExecutors();
+ Assert.assertEquals(executors.size(), 0);
+ }
+
+ /* Test active executors fetch from empty executors */
+ @Test
+ public void testFetchEmptyActiveExecutors() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = loader.fetchActiveExecutors();
+ Assert.assertEquals(executors.size(), 0);
+ }
+
+ /* Test missing executor fetch with search by executor id */
+ @Test
+ public void testFetchMissingExecutorId() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.fetchExecutor(0);
+ Assert.assertEquals(executor, null);
+ }
+
+ /* Test missing executor fetch with search by host:port */
+ @Test
+ public void testFetchMissingExecutorHostPort() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.fetchExecutor("localhost", 12345);
+ Assert.assertEquals(executor, null);
+ }
+
+ /* Test executor events fetch from with no logged executor */
+ @Test
+ public void testFetchEmptyExecutorEvents() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Executor executor = new Executor(1, "localhost", 12345, true);
+ List<ExecutorLogEvent> executorEvents =
+ loader.getExecutorEvents(executor, 5, 0);
+ Assert.assertEquals(executorEvents.size(), 0);
+ }
+
+ /* Test logging ExecutorEvents */
+ @Test
+ public void testExecutorEvents() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ int skip = 1;
+ User user = new User("testUser");
+ Executor executor = new Executor(1, "localhost", 12345, true);
+ String message = "My message ";
+ EventType[] events =
+ { EventType.CREATED, EventType.HOST_UPDATE, EventType.INACTIVATION };
+
+ for (EventType event : events) {
+ loader.postExecutorEvent(executor, event, user.getUserId(),
+ message + event.getNumVal());
+ }
+
+ List<ExecutorLogEvent> eventLogs =
+ loader.getExecutorEvents(executor, 10, skip);
+ Assert.assertTrue(eventLogs.size() == 2);
+
+ for (int index = 0; index < eventLogs.size(); ++index) {
+ ExecutorLogEvent eventLog = eventLogs.get(index);
+ Assert.assertEquals(eventLog.getExecutorId(), executor.getId());
+ Assert.assertEquals(eventLog.getUser(), user.getUserId());
+ Assert.assertEquals(eventLog.getType(), events[index + skip]);
+ Assert.assertEquals(eventLog.getMessage(),
+ message + events[index + skip].getNumVal());
+ }
+ }
+
+ /* Test to add duplicate executors */
+ @Test
+ public void testDuplicateAddExecutor() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ try {
+ String host = "localhost";
+ int port = 12345;
+ loader.addExecutor(host, port);
+ loader.addExecutor(host, port);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test to try update a non-existent executor */
+ @Test
+ public void testMissingExecutorUpdate() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ try {
+ Executor executor = new Executor(1, "localhost", 1234, true);
+ loader.updateExecutor(executor);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ clearDB();
+ }
+
+ /* Test add & fetch by Id Executors */
+ @Test
+ public void testSingleExecutorFetchById() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = addTestExecutors(loader);
+ for (Executor executor : executors) {
+ Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+ Assert.assertEquals(executor, fetchedExecutor);
+ }
+ }
+
+ /* Test fetch all executors */
+ @Test
+ public void testFetchAllExecutors() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = addTestExecutors(loader);
+
+ executors.get(0).setActive(false);
+ loader.updateExecutor(executors.get(0));
+
+ List<Executor> fetchedExecutors = loader.fetchAllExecutors();
+ Assert.assertEquals(executors.size(), fetchedExecutors.size());
+
+ Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+ }
+
+ /* Test fetch only active executors */
+ @Test
+ public void testFetchActiveExecutors() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = addTestExecutors(loader);
+
+ executors.get(0).setActive(false);
+ loader.updateExecutor(executors.get(0));
+
+ List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
+ Assert.assertEquals(executors.size(), fetchedExecutors.size() + 1);
+ executors.remove(0);
+
+ Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+ }
+
+ /* Test add & fetch by host:port Executors */
+ @Test
+ public void testSingleExecutorFetchHostPort() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = addTestExecutors(loader);
+ for (Executor executor : executors) {
+ Executor fetchedExecutor =
+ loader.fetchExecutor(executor.getHost(), executor.getPort());
+ Assert.assertEquals(executor, fetchedExecutor);
+ }
+ }
+
+ /* Helper method used in methods testing jdbc interface for executors table */
+ private List<Executor> addTestExecutors(ExecutorLoader loader)
+ throws ExecutorManagerException {
+ List<Executor> executors = new ArrayList<Executor>();
+ executors.add(loader.addExecutor("localhost1", 12345));
+ executors.add(loader.addExecutor("localhost2", 12346));
+ executors.add(loader.addExecutor("localhost1", 12347));
+ return executors;
+ }
+
+ /* Test Executor Inactivation */
+ @Test
+ public void testExecutorInactivation() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.addExecutor("localhost1", 12345);
+ Assert.assertTrue(executor.isActive());
+
+ executor.setActive(false);
+ loader.updateExecutor(executor);
+
+ Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+
+ Assert.assertEquals(executor.getHost(), fetchedExecutor.getHost());
+ Assert.assertEquals(executor.getId(), fetchedExecutor.getId());
+ Assert.assertEquals(executor.getPort(), fetchedExecutor.getPort());
+ Assert.assertFalse(fetchedExecutor.isActive());
+ }
+
+ /* Test Executor reactivation */
+ @Test
+ public void testExecutorActivation() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.addExecutor("localhost1", 12345);
+ Assert.assertTrue(executor.isActive());
+
+ executor.setActive(false);
+ loader.updateExecutor(executor);
+ Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+ Assert.assertFalse(fetchedExecutor.isActive());
+
+ executor.setActive(true);
+ loader.updateExecutor(executor);
+ fetchedExecutor = loader.fetchExecutor(executor.getId());
+
+ Assert.assertEquals(executor, fetchedExecutor);
+ }
+
@Test
public void testActiveReference() throws Exception {
if (!isTestSetup()) {
@@ -300,19 +769,20 @@ public class JdbcExecutorLoaderTest {
}
ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = createExecutableFlow("exec1");
+ ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
+ Executor executor = new Executor(2, "test", 1, true);
ExecutionReference ref1 =
- new ExecutionReference(flow1.getExecutionId(), "test", 1);
+ new ExecutionReference(flow1.getExecutionId(), executor);
loader.addActiveExecutableReference(ref1);
- ExecutableFlow flow2 = createExecutableFlow("exec1");
+ ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow2);
ExecutionReference ref2 =
- new ExecutionReference(flow2.getExecutionId(), "test", 1);
+ new ExecutionReference(flow2.getExecutionId(), executor);
loader.addActiveExecutableReference(ref2);
- ExecutableFlow flow3 = createExecutableFlow("exec1");
+ ExecutableFlow flow3 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow3);
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
@@ -356,7 +826,7 @@ public class JdbcExecutorLoaderTest {
@Ignore @Test
public void testSmallUploadLog() throws ExecutorManagerException {
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
File[] smalllog =
{ new File(logDir, "log1.log"), new File(logDir, "log2.log"),
new File(logDir, "log3.log") };
@@ -381,7 +851,7 @@ public class JdbcExecutorLoaderTest {
@Ignore @Test
public void testLargeUploadLog() throws ExecutorManagerException {
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
// Multiple of 255 for Henry the Eigth
File[] largelog =
@@ -427,7 +897,7 @@ public class JdbcExecutorLoaderTest {
ExecutorLoader loader = createLoader();
- File logDir = new File("unit/executions/logtest");
+ File logDir = new File(UNIT_BASE_DIR + "logtest");
// Multiple of 255 for Henry the Eigth
File[] largelog =
@@ -451,37 +921,10 @@ public class JdbcExecutorLoaderTest {
}
private ExecutableFlow createExecutableFlow(int executionId, String flowName)
- throws IOException {
- File jsonFlowFile = new File(flowDir, flowName + ".flow");
- @SuppressWarnings("unchecked")
- HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- Flow flow = Flow.flowFromObject(flowObj);
- Project project = new Project(1, "flow");
- HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
- flowMap.put(flow.getId(), flow);
- project.setFlows(flowMap);
- ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+ throws IOException {
+ ExecutableFlow execFlow =
+ TestUtils.createExecutableFlow("exectest1", flowName);
execFlow.setExecutionId(executionId);
-
- return execFlow;
- }
-
- private ExecutableFlow createExecutableFlow(String flowName)
- throws IOException {
- File jsonFlowFile = new File(flowDir, flowName + ".flow");
- @SuppressWarnings("unchecked")
- HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- Flow flow = Flow.flowFromObject(flowObj);
- Project project = new Project(1, "flow");
- HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
- flowMap.put(flow.getId(), flow);
- project.setFlows(flowMap);
- ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-
return execFlow;
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index b9ad178..0db2de5 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -17,16 +17,21 @@
package azkaban.executor;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
public class MockExecutorLoader implements ExecutorLoader {
+ HashMap<Integer, Integer> executionExecutorMapping =
+ new HashMap<Integer, Integer>();
HashMap<Integer, ExecutableFlow> flows =
new HashMap<Integer, ExecutableFlow>();
HashMap<String, ExecutableNode> nodes = new HashMap<String, ExecutableNode>();
@@ -36,6 +41,10 @@ public class MockExecutorLoader implements ExecutorLoader {
HashMap<String, Integer> jobUpdateCount = new HashMap<String, Integer>();
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
new HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ List<Executor> executors = new ArrayList<Executor>();
+ int executorIdCounter = 0;
+ Map<Integer, ArrayList<ExecutorLogEvent>> executorEvents =
+ new HashMap<Integer, ArrayList<ExecutorLogEvent>>();
@Override
public void uploadExecutableFlow(ExecutableFlow flow)
@@ -249,4 +258,119 @@ public class MockExecutorLoader implements ExecutorLoader {
}
+ @Override
+ public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+ List<Executor> activeExecutors = new ArrayList<Executor>();
+ for (Executor executor : executors) {
+ if (executor.isActive()) {
+ activeExecutors.add(executor);
+ }
+ }
+ return activeExecutors;
+ }
+
+ @Override
+ public Executor fetchExecutor(String host, int port)
+ throws ExecutorManagerException {
+ for (Executor executor : executors) {
+ if (executor.getHost().equals(host) && executor.getPort() == port) {
+ return executor;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+ for (Executor executor : executors) {
+ if (executor.getId() == executorId) {
+ return executor;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Executor addExecutor(String host, int port)
+ throws ExecutorManagerException {
+ Executor executor = null;
+ if (fetchExecutor(host, port) == null) {
+ executorIdCounter++;
+ executor = new Executor(executorIdCounter, host, port, true);
+ executors.add(executor);
+ }
+ return executor;
+ }
+
+ @Override
+ public void postExecutorEvent(Executor executor, EventType type, String user,
+ String message) throws ExecutorManagerException {
+ ExecutorLogEvent event =
+ new ExecutorLogEvent(executor.getId(), user, new Date(), type, message);
+
+ if (!executorEvents.containsKey(executor.getId())) {
+ executorEvents.put(executor.getId(), new ArrayList<ExecutorLogEvent>());
+ }
+
+ executorEvents.get(executor.getId()).add(event);
+ }
+
+ @Override
+ public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+ int skip) throws ExecutorManagerException {
+ if (!executorEvents.containsKey(executor.getId())) {
+ List<ExecutorLogEvent> events = executorEvents.get(executor.getId());
+ return events.subList(skip, Math.min(num + skip - 1, events.size() - 1));
+ }
+ return null;
+ }
+
+ @Override
+ public void updateExecutor(Executor executor) throws ExecutorManagerException {
+ Executor oldExecutor = fetchExecutor(executor.getId());
+ executors.remove(oldExecutor);
+ executors.add(executor);
+ }
+
+ @Override
+ public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+ return executors;
+ }
+
+ @Override
+ public void assignExecutor(int executorId, int execId)
+ throws ExecutorManagerException {
+ ExecutionReference ref = refs.get(execId);
+ ref.setExecutor(fetchExecutor(executorId));
+ executionExecutorMapping.put(execId, executorId);
+ }
+
+ @Override
+ public Executor fetchExecutorByExecutionId(int execId) throws ExecutorManagerException {
+ if (executionExecutorMapping.containsKey(execId)) {
+ return fetchExecutor(executionExecutorMapping.get(execId));
+ } else {
+ throw new ExecutorManagerException(
+ "Failed to find executor with execution : " + execId);
+ }
+ }
+
+ @Override
+ public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+ throws ExecutorManagerException {
+ List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+ for (int execId : refs.keySet()) {
+ if (!executionExecutorMapping.containsKey(execId)) {
+ queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(refs
+ .get(execId), flows.get(execId)));
+ }
+ }
+ return queuedFlows;
+ }
+
+ @Override
+ public void unassignExecutor(int executionId) throws ExecutorManagerException {
+ executionExecutorMapping.remove(executionId);
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
new file mode 100644
index 0000000..94bbd7e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -0,0 +1,208 @@
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+
+public class QueuedExecutionsTest {
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions/exectest1/";
+
+ private File getFlowDir(String flow) {
+ return new File(UNIT_BASE_DIR + flow + ".flow");
+ }
+
+ /*
+ * Helper method to create an (ExecutionReference, ExecutableFlow) from
+ * serialized description
+ */
+ private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
+ String flowName, int execId) throws IOException {
+ File jsonFlowFile = getFlowDir(flowName);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj =
+ (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+ execFlow.setExecutionId(execId);
+ ExecutionReference ref = new ExecutionReference(execId);
+ return new Pair<ExecutionReference, ExecutableFlow>(ref, execFlow);
+ }
+
+ public List<Pair<ExecutionReference, ExecutableFlow>> getDummyData()
+ throws IOException {
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList =
+ new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+ dataList.add(createExecutablePair("exec1", 1));
+ dataList.add(createExecutablePair("exec2", 2));
+ return dataList;
+ }
+
+ /* Test enqueue method happy case */
+ @Test
+ public void testEnqueueHappyCase() throws IOException,
+ ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ queue.enqueue(pair.getSecond(), pair.getFirst());
+ }
+
+ Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+ Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+ }
+
+ /* Test enqueue duplicate execution ids */
+ @Test(expected = ExecutorManagerException.class)
+ public void testEnqueueDuplicateExecution() throws IOException,
+ ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair1 =
+ createExecutablePair("exec1", 1);
+ QueuedExecutions queue = new QueuedExecutions(5);
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ }
+
+ /* Test enqueue more than capacity */
+ @Test(expected = ExecutorManagerException.class)
+ public void testEnqueueOverflow() throws IOException,
+ ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair1 =
+ createExecutablePair("exec1", 1);
+ QueuedExecutions queue = new QueuedExecutions(1);
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ queue.enqueue(pair1.getSecond(), pair1.getFirst());
+ }
+
+ /* Test EnqueueAll method */
+ @Test
+ public void testEnqueueAll() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+ Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+ }
+
+ /* Test size method */
+ @Test
+ public void testSize() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ }
+
+ /* Test dequeue method */
+ @Test
+ public void testDequeue() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ queue.dequeue(dataList.get(0).getFirst().getExecId());
+ Assert.assertEquals(queue.size(), 1);
+ Assert.assertTrue(queue.getAllEntries().contains(dataList.get(1)));
+ }
+
+ /* Test clear method */
+ @Test
+ public void testClear() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ queue.clear();
+ Assert.assertEquals(queue.size(), 0);
+ }
+
+ /* Test isEmpty method */
+ @Test
+ public void testIsEmpty() throws IOException, ExecutorManagerException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ Assert.assertTrue(queue.isEmpty());
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.size(), 2);
+ queue.clear();
+ Assert.assertTrue(queue.isEmpty());
+ }
+
+ /* Test fetchHead method */
+ @Test
+ public void testFetchHead() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(5);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ Assert.assertTrue(queue.isEmpty());
+ queue.enqueueAll(dataList);
+ Assert.assertEquals(queue.fetchHead(), dataList.get(0));
+ Assert.assertEquals(queue.fetchHead(), dataList.get(1));
+ }
+
+ /* Test isFull method */
+ @Test
+ public void testIsFull() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ Assert.assertTrue(queue.isFull());
+ }
+
+ /* Test hasExecution method */
+ @Test
+ public void testHasExecution() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertTrue(queue.hasExecution(pair.getFirst().getExecId()));
+ }
+ Assert.assertFalse(queue.hasExecution(5));
+ Assert.assertFalse(queue.hasExecution(7));
+ Assert.assertFalse(queue.hasExecution(15));
+ }
+
+ /* Test getFlow method */
+ @Test
+ public void testGetFlow() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertEquals(pair.getSecond(),
+ queue.getFlow(pair.getFirst().getExecId()));
+ }
+ }
+
+ /* Test getReferences method */
+ @Test
+ public void testGetReferences() throws IOException, ExecutorManagerException,
+ InterruptedException {
+ QueuedExecutions queue = new QueuedExecutions(2);
+ List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+ queue.enqueueAll(dataList);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+ Assert.assertEquals(pair.getFirst(),
+ queue.getReference(pair.getFirst().getExecId()));
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
new file mode 100644
index 0000000..e54b182
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -0,0 +1,738 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.BasicConfigurator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.executor.selector.*;
+import azkaban.utils.JSONUtils;
+
+public class SelectorTest {
+ // mock executor object.
+ protected class MockExecutorObject implements Comparable <MockExecutorObject>{
+ public String name;
+ public int port;
+ public double percentOfRemainingMemory;
+ public int amountOfRemainingMemory;
+ public int priority;
+ public Date lastAssigned;
+ public double percentOfRemainingFlowcapacity;
+ public int remainingTmp;
+
+ public MockExecutorObject(String name,
+ int port,
+ double percentOfRemainingMemory,
+ int amountOfRemainingMemory,
+ int priority,
+ Date lastAssigned,
+ double percentOfRemainingFlowcapacity,
+ int remainingTmp)
+ {
+ this.name = name;
+ this.port = port;
+ this.percentOfRemainingMemory = percentOfRemainingMemory;
+ this.amountOfRemainingMemory =amountOfRemainingMemory;
+ this.priority = priority;
+ this.lastAssigned = lastAssigned;
+ this.percentOfRemainingFlowcapacity = percentOfRemainingFlowcapacity;
+ this.remainingTmp = remainingTmp;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.name;
+ }
+
+ @Override
+ public int compareTo(MockExecutorObject o) {
+ return null == o ? 1 : this.hashCode() - o.hashCode();
+ }
+ }
+
+ // Mock flow object.
+ protected class MockFlowObject{
+ public String name;
+ public int requiredRemainingMemory;
+ public int requiredTotalMemory;
+ public int requiredRemainingTmpSpace;
+ public int priority;
+
+ public MockFlowObject(String name,
+ int requiredTotalMemory,
+ int requiredRemainingMemory,
+ int requiredRemainingTmpSpace,
+ int priority)
+ {
+ this.name = name;
+ this.requiredTotalMemory = requiredTotalMemory;
+ this.requiredRemainingMemory = requiredRemainingMemory;
+ this.requiredRemainingTmpSpace = requiredRemainingTmpSpace;
+ this.priority = priority;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.name;
+ }
+ }
+
+ // mock Filter class.
+ protected class MockFilter
+ extends CandidateFilter<MockExecutorObject,MockFlowObject>{
+
+ @Override
+ public String getName() {
+ return "Mockfilter";
+ }
+
+ public MockFilter(){
+ }
+
+ // function to register the remainingMemory filter.
+ // for test purpose the registration is put in a separated method, in production the work should be done
+ // in the constructor.
+ public void registerFilterforTotalMemory(){
+ this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ // Box has infinite memory.:)
+ if (itemToCheck.percentOfRemainingMemory == 0) {
+ return true;
+ }
+
+ // calculate the memory and return.
+ return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 >
+ sourceObject.requiredTotalMemory;
+ }}));
+ }
+
+ public void registerFilterforRemainingMemory(){
+ this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+ return itemToCheck.amountOfRemainingMemory > sourceObject.requiredRemainingMemory;
+ }}));
+ }
+
+ public void registerFilterforPriority(){
+ this.registerFactorFilter(FactorFilter.create("requiredProprity",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ // priority value, the bigger the lower.
+ return itemToCheck.priority >= sourceObject.priority;
+ }}));
+ }
+
+ public void registerFilterforRemainingTmpSpace(){
+ this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
+ new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+ public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+ // REAL LOGIC COMES HERE -
+ if (null == itemToCheck || null == sourceObject){
+ return false;
+ }
+
+ return itemToCheck.remainingTmp > sourceObject.requiredRemainingTmpSpace;
+ }}));
+ }
+
+ }
+
+ // mock comparator class.
+ protected class MockComparator
+ extends CandidateComparator<MockExecutorObject>{
+
+ @Override
+ public String getName() {
+ return "MockComparator";
+ }
+
+ @Override
+ protected boolean tieBreak(MockExecutorObject object1, MockExecutorObject object2){
+ if (null == object2) return true;
+ if (null == object1) return false;
+ return object1.name.compareTo(object2.name) >= 0;
+ }
+
+ public MockComparator(){
+ }
+
+ public void registerComparerForMemory(int weight){
+ this.registerFactorComparator(FactorComparator.create("Memory", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check remaining amount of memory.
+ result = o1.amountOfRemainingMemory - o2.amountOfRemainingMemory;
+ if (result != 0){
+ return result > 0 ? 1 : -1;
+ }
+
+ // check remaining % .
+ result = (int)(o1.percentOfRemainingMemory - o2.percentOfRemainingMemory);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+
+ public void registerComparerForRemainingSpace(int weight){
+ this.registerFactorComparator(FactorComparator.create("RemainingTmp", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check remaining % .
+ result = (int)(o1.remainingTmp - o2.remainingTmp);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+
+ public void registerComparerForPriority(int weight){
+ this.registerFactorComparator(FactorComparator.create("Priority", weight, new Comparator<MockExecutorObject>(){
+ public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+ int result = 0 ;
+
+ // check priority, bigger the better.
+ result = (int)(o1.priority - o2.priority);
+ return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+ } }));
+ }
+ }
+
+ // test samples.
+ protected ArrayList<MockExecutorObject> executorList = new ArrayList<MockExecutorObject>();
+
+ @BeforeClass public static void onlyOnce() {
+ BasicConfigurator.configure();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ executorList.clear();
+ executorList.add(new MockExecutorObject("Executor1",8080,50.0,2048,5,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor2",8080,50.0,2048,4,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor3",8080,40.0,2048,1,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor4",8080,50.0,2048,4,new Date(), 20, 6400));
+ executorList.add(new MockExecutorObject("Executor5",8080,50.0,1024,5,new Date(), 90, 6400));
+ executorList.add(new MockExecutorObject("Executor6",8080,50.0,1024,5,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor7",8080,50.0,1024,5,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor8",8080,50.0,2048,1,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor9",8080,50.0,2050,5,new Date(), 90, 4200));
+ executorList.add(new MockExecutorObject("Executor10",8080,00.0,1024,1,new Date(), 90, 3200));
+ executorList.add(new MockExecutorObject("Executor11",8080,20.0,2096,3,new Date(), 90, 2400));
+ executorList.add(new MockExecutorObject("Executor12",8080,90.0,2050,5,new Date(), 60, 2500));
+
+
+ // make sure each time the order is different.
+ Collections.shuffle(this.executorList);
+ }
+
+ private MockExecutorObject getExecutorByName(String name){
+ MockExecutorObject returnVal = null;
+ for (MockExecutorObject item : this.executorList){
+ if (item.name.equals(name)){
+ returnVal = item;
+ break;
+ }
+ }
+ return returnVal;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testExecutorFilter() throws Exception {
+
+ // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+ MockFilter mFilter = new MockFilter();
+ mFilter.registerFilterforRemainingMemory();
+
+ // expect true.
+ boolean result = mFilter.filterTarget(this.getExecutorByName("Executor1"), dispatchingObj);
+ Assert.assertTrue(result);
+
+ //expect true.
+ result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+ /*
+ 1 [main] INFO azkaban.executor.Selector.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - Final checking result : true
+ */
+ Assert.assertTrue(result);
+
+ // add the priority filter.
+ mFilter.registerFilterforPriority();
+ result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+ // expect false, for priority.
+ /*
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - start checking 'Executor3' with factor filter for 'Mockfilter'
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredProprity] filter result : false
+ 2 [main] INFO azkaban.executor.Selector.CandidateFilter - Final checking result : false
+ */
+ Assert.assertFalse(result);
+
+ // add the remaining space filter.
+ mFilter.registerFilterforRemainingTmpSpace();
+
+ // expect pass.
+ result = mFilter.filterTarget(this.getExecutorByName("Executor2"), dispatchingObj);
+ /*
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - start checking 'Executor2' with factor filter for 'Mockfilter'
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : true
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredProprity] filter result : true
+ 3 [main] INFO azkaban.executor.Selector.CandidateFilter - Final checking result : true
+ */
+ Assert.assertTrue(result);
+
+ // expect false, remaining tmp, priority will also fail but the logic shortcuts when the Tmp size check Fails.
+ result = mFilter.filterTarget(this.getExecutorByName("Executor8"), dispatchingObj);
+ /*
+ 4 [main] INFO azkaban.executor.Selector.CandidateFilter - start checking 'Executor8' with factor filter for 'Mockfilter'
+ 4 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingMemory] filter result : true
+ 4 [main] INFO azkaban.executor.Selector.CandidateFilter - [Factor: requiredRemainingTmpSpace] filter result : false
+ 4 [main] INFO azkaban.executor.Selector.CandidateFilter - Final checking result : false
+ */
+ Assert.assertFalse(result);
+
+ }
+
+ @Test
+ public void testExecutorFilterWithNullInputs() throws Exception {
+ MockFilter filter = new MockFilter();
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+ boolean result = false;
+ try {
+ result = filter.filterTarget(this.getExecutorByName("Executor1"), null);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null value is passed to the filter.");
+ }
+ // note : the FactorFilter logic will decide whether true or false should be returned when null value
+ // is passed, for the Mock class it returns false.
+ Assert.assertFalse(result);
+
+ try {
+ result = filter.filterTarget(null, null);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null value is passed to the filter.");
+ }
+ // note : the FactorFilter logic will decide whether true or false should be returned when null value
+ // is passed, for the Mock class it returns false.
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void testExecutorComparer() throws Exception {
+ MockComparator comparator = new MockComparator();
+ comparator.registerComparerForMemory(5);
+
+ MockExecutorObject nextExecutor = Collections.max(this.executorList, comparator);
+
+ // expect the first item to be selected, memory wise it is the max.
+ Assert.assertEquals(this.getExecutorByName("Executor11"),nextExecutor);
+
+ // add the priority factor.
+ // expect again the #9 item to be selected.
+ comparator.registerComparerForPriority(6);
+ nextExecutor = Collections.max(this.executorList, comparator);
+ Assert.assertEquals(this.getExecutorByName("Executor12"),nextExecutor);
+
+ // add the remaining space factor.
+ // expect the #12 item to be returned.
+ comparator.registerComparerForRemainingSpace(3);
+ nextExecutor = Collections.max(this.executorList, comparator);
+ Assert.assertEquals(this.getExecutorByName("Executor12"),nextExecutor);
+ }
+
+ @Test
+ public void testExecutorComparerResisterComparerWInvalidWeight() throws Exception {
+ MockComparator comparator = new MockComparator();
+ comparator.registerComparerForMemory(0);
+ }
+
+ @Test
+ public void testSelector() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(5);
+ comparator.registerComparerForRemainingSpace(3);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+ // expected selection = #12
+ MockExecutorObject nextExecutor = morkSelector.getBest(this.executorList, dispatchingObj);
+ Assert.assertEquals(this.getExecutorByName("Executor1"),nextExecutor);
+
+ // remaining memory 11500, total memory 3095, remainingTmpSpace 14200, priority 2.
+ dispatchingObj = new MockFlowObject("flow1",3096, 1500,14200,2);
+ // all candidates should be filtered by the remaining memory.
+ nextExecutor = morkSelector.getBest(this.executorList, dispatchingObj);
+ Assert.assertEquals(null,nextExecutor);
+ }
+
+ @Test
+ public void testSelectorsignleCandidate() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> signleExecutorList = new ArrayList<MockExecutorObject>();
+ MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+ signleExecutorList.add(signleExecutor);
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+ MockExecutorObject executor = morkSelector.getBest(signleExecutorList, dispatchingObj);
+ // expected to see null result, as the only executor is filtered out .
+ Assert.assertTrue(null == executor);
+
+ // adjust the priority to let the executor pass the filter.
+ dispatchingObj.priority = 3;
+ executor = morkSelector.getBest(signleExecutorList, dispatchingObj);
+ Assert.assertEquals(signleExecutor, executor);
+ }
+
+ @Test
+ public void testSelectorListWithItemsThatAreReferenceEqual() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(signleExecutor);
+ list.add(signleExecutor);
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = morkSelector.getBest(list, dispatchingObj);
+ Assert.assertTrue(signleExecutor == executor);
+ }
+
+ @Test
+ public void testSelectorListWithItemsThatAreEqualInValue() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ // note - as the tieBreaker set in the MockComparator uses the name value of the executor to do the
+ // final diff therefore we need to set the name differently to make a meaningful test, in real
+ // scenario we may want to use something else (say hash code) to be the bottom line for the tieBreaker
+ // to make a final decision, the purpose of the test here is to prove that for two candidates with
+ // exact value (in the case of test, all values except for the name) the decision result is stable.
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+ MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(executor1);
+ list.add(executor2);
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = morkSelector.getBest(list, dispatchingObj);
+ Assert.assertTrue(executor2 == executor);
+
+ // shuffle and test again.
+ list.remove(0);
+ list.add(executor1);
+ executor = morkSelector.getBest(list, dispatchingObj);
+ Assert.assertTrue(executor2 == executor);
+ }
+
+ @Test
+ public void testSelectorEmptyList() throws Exception {
+ MockFilter filter = new MockFilter();
+ MockComparator comparator = new MockComparator();
+
+ filter.registerFilterforPriority();
+ filter.registerFilterforRemainingMemory();
+ filter.registerFilterforRemainingTmpSpace();
+ filter.registerFilterforTotalMemory();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+
+ MockExecutorObject executor = null;
+
+ try {
+ executor = morkSelector.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an empty list is passed to the Selector.");
+ }
+
+ // expected to see null result.
+ Assert.assertTrue(null == executor);
+
+ try {
+ executor = morkSelector.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when null is passed to the Selector as the candidate list.");
+ }
+
+ // expected to see null result, as the only executor is filtered out .
+ Assert.assertTrue(null == executor);
+
+ }
+
+ @Test
+ public void testSelectorListWithNullValue() throws Exception {
+ MockComparator comparator = new MockComparator();
+
+ comparator.registerComparerForMemory(3);
+ comparator.registerComparerForPriority(4);
+ comparator.registerComparerForRemainingSpace(1);
+
+ CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+ new CandidateSelector<MockExecutorObject,MockFlowObject>(null,comparator);
+
+ ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+ MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+ MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+ list.add(executor1);
+ list.add(executor2);
+ list.add(null);
+
+ MockFlowObject dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+ MockExecutorObject executor = null;
+ try {
+ executor = morkSelector.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an List contains null value.");
+ }
+ Assert.assertTrue(executor2 == executor);
+
+ // try to compare null vs null, no exception is expected.
+ list.clear();
+ list.add(null);
+ list.add(null);
+ try {
+ executor = morkSelector.getBest(list, dispatchingObj);
+ } catch (Exception ex){
+ Assert.fail("no exception should be thrown when an List contains multiple null values.");
+ }
+ Assert.assertTrue(null == executor);
+
+ }
+
+ @Test
+ public void testCreatingExectorfilterObject() throws Exception{
+ List<String> validList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+ try {
+ new ExecutorFilter(validList);
+ }catch (Exception ex){
+ Assert.fail("creating ExecutorFilter with valid list throws exception . ex -" + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreatingExectorfilterObjectWInvalidList() throws Exception{
+ List<String> invalidList = new ArrayList<String>();
+ invalidList.add("notExistingFilter");
+ Exception result = null;
+ try {
+ new ExecutorFilter(invalidList);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObject() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, 1);
+ }
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ Assert.fail("creating ExecutorComparator with valid list throws exception . ex -" + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObjectWInvalidName() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ comparatorMap.put("invalidName", 0);
+ Exception result = null;
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testCreatingExectorComparatorObjectWInvalidWeight() throws Exception{
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, -1);
+ }
+ Exception result = null;
+ try {
+ new ExecutorComparator(comparatorMap);
+ }catch (Exception ex){
+ if (ex instanceof IllegalArgumentException)
+ result = ex;
+ }
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testCreatingExecutorSelectorWithEmptyFilterComparatorList() throws Exception{
+ List<Executor> executorList = new ArrayList<Executor>();
+ executorList.add(new Executor(1, "host1", 80, true));
+ executorList.add(new Executor(2, "host2", 80, true));
+ executorList.add(new Executor(3, "host3", 80, true));
+
+ executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+ executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90, 0));
+ executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90, 0));
+
+ ExecutableFlow flow = new ExecutableFlow();
+
+ ExecutorSelector selector = new ExecutorSelector(null , null);
+ Executor executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(2), executor);
+ }
+
+
+ @Test
+ public void testExecutorSelectorE2E() throws Exception{
+ List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+ Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+ List<Executor> executorList = new ArrayList<Executor>();
+ executorList.add(new Executor(1, "host1", 80, true));
+ executorList.add(new Executor(2, "host2", 80, true));
+ executorList.add(new Executor(3, "host3", 80, true));
+
+ executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+ executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90, 0));
+ executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90, 0));
+
+ ExecutableFlow flow = new ExecutableFlow();
+
+ for (String name : ExecutorComparator.getAvailableComparatorNames()){
+ comparatorMap.put(name, 1);
+ }
+ ExecutorSelector selector = new ExecutorSelector(filterList,comparatorMap);
+ Executor executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(0), executor);
+
+ // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
+ // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
+ executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 4095, 50, System.currentTimeMillis(), 90, 1));
+ executor = selector.getBest(executorList, flow);
+ Assert.assertEquals(executorList.get(2), executor);
+ }
+
+ @Test
+ public void testExecutorInfoJsonParser() throws Exception{
+ ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 10);
+ String json = JSONUtils.toJSON(exeInfo);
+ ExecutorInfo exeInfo2 = ExecutorInfo.fromJSONString(json);
+ Assert.assertTrue(exeInfo.equals(exeInfo2));
+ }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
new file mode 100644
index 0000000..24f6ef7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.server;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for HttpRequestUtils
+ */
+public final class HttpRequestUtilsTest {
+ /* Helper method to get a test flow and add required properties */
+ public static ExecutableFlow createExecutableFlow() throws IOException {
+ ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ flow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.FLOW_PRIORITY, "1");
+ flow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.USE_EXECUTOR, "2");
+ return flow;
+ }
+
+ /* Test that flow properties are removed for non-admin user */
+ @Test
+ public void TestFilterNonAdminOnlyFlowParams() throws IOException,
+ ExecutorManagerException, UserManagerException {
+ ExecutableFlow flow = createExecutableFlow();
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User user = manager.getUser("testUser", "testUser");
+
+ HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+ flow.getExecutionOptions(), user);
+
+ Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY));
+ Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.USE_EXECUTOR));
+ }
+
+ /* Test that flow properties are retained for admin user */
+ @Test
+ public void TestFilterAdminOnlyFlowParams() throws IOException,
+ ExecutorManagerException, UserManagerException {
+ ExecutableFlow flow = createExecutableFlow();
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User user = manager.getUser("testAdmin", "testAdmin");
+
+ HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+ flow.getExecutionOptions(), user);
+
+ Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY));
+ Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+ .containsKey(ExecutionOptions.USE_EXECUTOR));
+ }
+
+ /* Test exception, if param is a valid integer */
+ @Test
+ public void testvalidIntegerParam() throws ExecutorManagerException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("param1", "123");
+ HttpRequestUtils.validateIntegerParam(params, "param1");
+ }
+
+ /* Test exception, if param is not a valid integer */
+ @Test(expected = ExecutorManagerException.class)
+ public void testInvalidIntegerParam() throws ExecutorManagerException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("param1", "1dff2");
+ HttpRequestUtils.validateIntegerParam(params, "param1");
+ }
+
+ /* Verify permission for admin user */
+ @Test
+ public void testHasAdminPermission() throws UserManagerException {
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User adminUser = manager.getUser("testAdmin", "testAdmin");
+ Assert.assertTrue(HttpRequestUtils.hasPermission(manager, adminUser,
+ Type.ADMIN));
+ }
+
+ /* verify permission for non-admin user */
+ @Test
+ public void testHasOrdinaryPermission() throws UserManagerException {
+ UserManager manager = TestUtils.createTestXmlUserManager();
+ User testUser = manager.getUser("testUser", "testUser");
+ Assert.assertFalse(HttpRequestUtils.hasPermission(manager, testUser,
+ Type.ADMIN));
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
index c01d3f8..55dd5d9 100644
--- a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
@@ -84,9 +84,9 @@ public class CacheTest {
@Test
public void testTimeToLiveExpiry() {
- CacheManager.setUpdateFrequency(200);
CacheManager manager = CacheManager.getInstance();
Cache cache = manager.createCache();
+ CacheManager.setUpdateFrequency(200);
cache.setUpdateFrequencyMs(200);
cache.setEjectionPolicy(EjectionPolicy.FIFO);
@@ -122,9 +122,9 @@ public class CacheTest {
@Test
public void testIdleExpireExpiry() {
- CacheManager.setUpdateFrequency(250);
CacheManager manager = CacheManager.getInstance();
Cache cache = manager.createCache();
+ CacheManager.setUpdateFrequency(250);
cache.setUpdateFrequencyMs(250);
cache.setEjectionPolicy(EjectionPolicy.FIFO);
diff --git a/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
new file mode 100644
index 0000000..85b2666
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/RestfulApiClientTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
+import org.apache.http.NameValuePair;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.message.BasicStatusLine;
+import org.apache.http.util.EntityUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class RestfulApiClientTest {
+
+ protected class MockRestfulApiClient extends RestfulApiClient<String> {
+ private int status = HttpStatus.SC_OK;
+
+ @Override
+ protected String parseResponse(HttpResponse response) throws IOException {
+ final StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() >= 300) {
+ throw new HttpResponseException(statusLine.getStatusCode(),
+ statusLine.getReasonPhrase());
+ }
+ final HttpEntity entity = response.getEntity();
+ return entity == null ? null : EntityUtils.toString(entity);
+ }
+
+ public void setReturnStatus(int newStatus){
+ this.status = newStatus;
+ }
+
+ public void resetReturnStatus(){
+ this.status = HttpStatus.SC_OK;
+ }
+
+ @Override
+ protected String sendAndReturn(HttpUriRequest request) throws IOException{
+ HttpResponseFactory factory = new DefaultHttpResponseFactory();
+
+ HttpResponse response = factory.newHttpResponse(
+ new BasicStatusLine(HttpVersion.HTTP_1_1, this.status, null),null);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(String.format("%s = %s;", "METHOD", request.getMethod()));
+ sb.append(String.format("%s = %s;", "URI", request.getURI()));
+
+ if (request.getAllHeaders().length > 0){
+ sb.append("HEADER_EXISTS");
+ }
+
+ for (Header h : request.getAllHeaders()){
+ sb.append(String.format("%s = %s;", h.getName(), h.getValue()));
+ }
+
+ if (request instanceof HttpEntityEnclosingRequestBase){
+ HttpEntity entity = ((HttpEntityEnclosingRequestBase)request).getEntity();
+ if (entity != null){
+ sb.append("BODY_EXISTS");
+ sb.append(String.format("%s = %s;", "BODY", EntityUtils.toString(entity)));
+ }
+ }
+
+ response.setEntity(new StringEntity(sb.toString()));
+ return parseResponse(response);
+ }
+
+ }
+
+ @Test
+ public void testHttpGet() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String result = mockClient.httpGet(uri, null);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = GET"));
+ }
+
+ @Test
+ public void testHttpGetWithHeaderItems() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String result = mockClient.httpGet(uri, headerItems);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = GET"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ }
+
+ @Test
+ public void testHttpPost() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPost(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = POST"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+ }
+
+ @Test
+ public void testHttpPostWOBody() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String result = mockClient.httpPost(uri, null,null);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = POST"));
+ Assert.assertFalse(result.contains("BODY_EXISTS"));
+ Assert.assertFalse(result.contains("HEADER_EXISTS"));
+ }
+
+ @Test
+ public void testHttpPut() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = PUT"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ Assert.assertTrue(result.contains(String.format("%s = %s;", "BODY", content)));
+ }
+
+ @Test
+ public void testContentLength() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, null,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+ }
+
+ @Test
+ public void testContentLengthOverride() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("Content-Length","0"));
+
+ String content = "123456789";
+
+ String result = mockClient.httpPut(uri, headerItems,content);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertEquals(result.lastIndexOf("Content-Length"),result.indexOf("Content-Length"));
+ Assert.assertTrue(result.contains("Content-Length = " + Integer.toString(content.length())));
+ }
+
+ @Test
+ public void testHttpDelete() throws Exception {
+ MockRestfulApiClient mockClient = new MockRestfulApiClient();
+ @SuppressWarnings("unchecked")
+ URI uri = MockRestfulApiClient.buildUri("test.com", 80, "test", true,
+ new Pair <String,String>("Entry1","Value1"));
+
+ ArrayList<NameValuePair> headerItems = new ArrayList<NameValuePair>();
+ headerItems.add(new BasicNameValuePair("h1","v1"));
+ headerItems.add(new BasicNameValuePair("h2","v2"));
+
+ String result = mockClient.httpDelete(uri, headerItems);
+ Assert.assertTrue(result!= null && result.contains(uri.toString()));
+ Assert.assertTrue(result.contains("METHOD = DELETE"));
+ Assert.assertTrue(result.contains("h1 = v1"));
+ Assert.assertTrue(result.contains("h2 = v2"));
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
new file mode 100644
index 0000000..68b10ee
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.XmlUserManager;
+
+/**
+ * Commonly used utils method for unit/integration tests
+ */
+public class TestUtils {
+ /* Base resource direcotyr for unit tests */
+ private static final String UNIT_RESOURCE_DIR =
+ "../azkaban-test/src/test/resources";
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_EXECUTION_DIR =
+ UNIT_RESOURCE_DIR + "/executions";
+
+ public static File getFlowDir(String projectName, String flow) {
+ return new File(String.format("%s/%s/%s.flow", UNIT_EXECUTION_DIR, projectName,
+ flow));
+ }
+
+ public static User getTestUser() {
+ return new User("testUser");
+ }
+
+ /* Helper method to create an ExecutableFlow from serialized description */
+ public static ExecutableFlow createExecutableFlow(String projectName,
+ String flowName) throws IOException {
+ File jsonFlowFile = getFlowDir(projectName, flowName);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj =
+ (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+ return execFlow;
+ }
+
+ /* Helper method to create an XmlUserManager from XML_FILE_PARAM file */
+ public static UserManager createTestXmlUserManager() {
+ Props props = new Props();
+ props.put(XmlUserManager.XML_FILE_PARAM, UNIT_RESOURCE_DIR
+ + "/azkaban-users.xml");
+ UserManager manager = new XmlUserManager(props);
+ return manager;
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
new file mode 100644
index 0000000..4ea0930
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for azkaban.utils.Utils
+ */
+public class UtilsTest {
+
+ /* Test negative port case */
+ @Test
+ public void testNegativePort() {
+ Assert.assertFalse(Utils.isValidPort(-1));
+ Assert.assertFalse(Utils.isValidPort(-10));
+ }
+
+ /* Test zero port case */
+ @Test
+ public void testZeroPort() {
+ Assert.assertFalse(Utils.isValidPort(0));
+ }
+
+ /* Test port beyond limit */
+ @Test
+ public void testOverflowPort() {
+ Assert.assertFalse(Utils.isValidPort(70000));
+ Assert.assertFalse(Utils.isValidPort(65536));
+ }
+
+ /* Test happy isValidPort case*/
+ @Test
+ public void testValidPort() {
+ Assert.assertTrue(Utils.isValidPort(1023));
+ Assert.assertTrue(Utils.isValidPort(10000));
+ Assert.assertTrue(Utils.isValidPort(3030));
+ Assert.assertTrue(Utils.isValidPort(1045));
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 6e012ab..47fbfad 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
@@ -131,6 +132,7 @@ public class AzkabanExecutorServer {
root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+ root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
@@ -175,7 +177,7 @@ public class AzkabanExecutorServer {
/**
* Configure Metric Reporting as per azkaban.properties settings
- *
+ *
* @throws MetricException
*/
private void configureMetricReports() throws MetricException {
@@ -224,13 +226,13 @@ public class AzkabanExecutorServer {
/**
* Load a custom class, which is provided by a configuration
* CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
- *
+ *
* This method will try to instantiate an instance of this custom class and
* with given properties as the argument in the constructor.
- *
+ *
* Basically the custom class must have a constructor that takes an argument
* with type Properties.
- *
+ *
* @param props
*/
private void loadCustomJMXAttributeProcessor(Props props) {
@@ -500,4 +502,18 @@ public class AzkabanExecutorServer {
return null;
}
}
+
+ /**
+ * Returns host:port combination for currently running executor
+ * @return
+ */
+ public String getExecutorHostPort() {
+ String host = "unkownHost";
+ try {
+ host = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (Exception e) {
+ logger.error("Failed to fetch LocalHostName");
+ }
+ return host + ":" + props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+ }
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index da98b99..7eedee4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -271,6 +271,7 @@ public class FlowRunner extends EventHandler implements Runnable {
this.watcher.setLogger(logger);
}
+ logger.info("Assigned executor : " + AzkabanExecutorServer.getApp().getExecutorHostPort());
logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
+ projectId + " version:" + version);
if (pipelineExecId != null) {
@@ -840,7 +841,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Configure Azkaban metrics tracking for a new jobRunner instance
- *
+ *
* @param jobRunner
*/
private void configureJobLevelMetrics(JobRunner jobRunner) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 78de829..01ab373 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -144,6 +145,9 @@ public class FlowRunnerManager implements EventListener,
private Object executionDirDeletionSync = new Object();
+ // date time of the the last flow submitted.
+ private long lastFlowSubmittedDate = 0;
+
public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
ProjectLoader projectLoader, ClassLoader parentClassLoader)
throws IOException {
@@ -256,6 +260,13 @@ public class FlowRunnerManager implements EventListener,
return allProjects;
}
+ public long getLastFlowSubmittedTime(){
+ // Note: this is not thread safe and may result in providing dirty data.
+ // we will provide this data as is for now and will revisit if there
+ // is a string justification for change.
+ return lastFlowSubmittedDate;
+ }
+
public Props getGlobalProps() {
return globalProps;
}
@@ -511,6 +522,8 @@ public class FlowRunnerManager implements EventListener,
Future<?> future = executorService.submit(runner);
// keep track of this future
submittedFlows.put(future, runner.getExecutionId());
+ // update the last submitted time.
+ this.lastFlowSubmittedDate = System.currentTimeMillis();
} catch (RejectedExecutionException re) {
throw new ExecutorManagerException(
"Azkaban server can't execute any more flows. "
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
new file mode 100644
index 0000000..e07eac5
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutorInfo;
+import azkaban.utils.JSONUtils;
+
+
+public class ServerStatisticsServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+ private static final int cacheTimeInMilliseconds = 1000;
+ private static final Logger logger = Logger.getLogger(ServerStatisticsServlet.class);
+ private static final String noCacheParamName = "nocache";
+ private static final boolean exists_Bash = new File("/bin/bash").exists();
+ private static final boolean exists_Cat = new File("/bin/cat").exists();
+ private static final boolean exists_Grep = new File("/bin/grep").exists();
+ private static final boolean exists_Meminfo = new File("/proc/meminfo").exists();
+ private static final boolean exists_LoadAvg = new File("/proc/loadavg").exists();
+
+ protected static long lastRefreshedTime = 0;
+ protected static ExecutorInfo cachedstats = null;
+
+ /**
+ * Handle all get request to Statistics Servlet {@inheritDoc}
+ *
+ * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
+ * javax.servlet.http.HttpServletResponse)
+ */
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+
+ boolean noCache = null != req && Boolean.valueOf(req.getParameter(noCacheParamName));
+
+ if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds) {
+ this.populateStatistics(noCache);
+ }
+
+ JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
+ }
+
+ /**
+ * fill the result set with the percent of the remaining system memory on the server.
+ * @param stats reference to the result container which contains all the results, this specific method
+ * will only work work on the property "remainingMemory" and "remainingMemoryPercent".
+ *
+ * NOTE:
+ * a double value will be used to present the remaining memory,
+ * a returning value of '55.6' means 55.6%
+ */
+ protected void fillRemainingMemoryPercent(ExecutorInfo stats) {
+ if (exists_Bash && exists_Cat && exists_Grep && exists_Meminfo) {
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c",
+ "/bin/cat /proc/meminfo | grep -E \"^MemTotal:|^MemFree:|^Buffers:|^Cached:|^SwapCached:\"");
+ try {
+ ArrayList<String> output = new ArrayList<String>();
+ Process process = processBuilder.start();
+ process.waitFor();
+ InputStream inputStream = process.getInputStream();
+ try {
+ java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(inputStream));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ output.add(line);
+ }
+ } finally {
+ inputStream.close();
+ }
+
+ long totalMemory = 0;
+ long totalFreeMemory = 0;
+ Long parsedResult = (long) 0;
+
+ // process the output from bash call.
+ // we expect the result from the bash call to be something like following -
+ // MemTotal: 65894264 kB
+ // MemFree: 57753844 kB
+ // Buffers: 305552 kB
+ // Cached: 3802432 kB
+ // SwapCached: 0 kB
+ // Note : total free memory = freeMemory + cached + buffers + swapCached
+ // TODO : think about merging the logic in systemMemoryInfo as the logic is similar
+ if (output.size() == 5) {
+ for (String result : output) {
+ // find the total memory and value the variable.
+ parsedResult = extractMemoryInfo("MemTotal", result);
+ if (null != parsedResult) {
+ totalMemory = parsedResult;
+ continue;
+ }
+
+ // find the free memory.
+ parsedResult = extractMemoryInfo("MemFree", result);
+ if (null != parsedResult) {
+ totalFreeMemory += parsedResult;
+ continue;
+ }
+
+ // find the Buffers.
+ parsedResult = extractMemoryInfo("Buffers", result);
+ if (null != parsedResult) {
+ totalFreeMemory += parsedResult;
+ continue;
+ }
+
+ // find the Cached.
+ parsedResult = extractMemoryInfo("SwapCached", result);
+ if (null != parsedResult) {
+ totalFreeMemory += parsedResult;
+ continue;
+ }
+
+ // find the Cached.
+ parsedResult = extractMemoryInfo("Cached", result);
+ if (null != parsedResult) {
+ totalFreeMemory += parsedResult;
+ continue;
+ }
+ }
+ } else {
+ logger.error("failed to get total/free memory info as the bash call returned invalid result."
+ + String.format(" Output from the bash call - %s ", output.toString()));
+ }
+
+ // the number got from the proc file is in KBs we want to see the number in MBs so we are dividing it by 1024.
+ stats.setRemainingMemoryInMB(totalFreeMemory / 1024);
+ stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double) totalFreeMemory / (double) totalMemory) * 100);
+ } catch (Exception ex) {
+ logger.error("failed fetch system memory info "
+ + "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+ }
+ } else {
+ logger.error("failed fetch system memory info, one or more files from the following list are missing - "
+ + "'/bin/bash'," + "'/bin/cat'," + "'/proc/loadavg'");
+ }
+ }
+
+ private Long extractMemoryInfo(String field, String result) {
+ Long returnResult = null;
+ if (null != result && null != field && result.matches(String.format("^%s:.*", field))
+ && result.split("\\s+").length > 2) {
+ try {
+ returnResult = Long.parseLong(result.split("\\s+")[1]);
+ logger.debug(field + ":" + returnResult);
+ } catch (NumberFormatException e) {
+ returnResult = 0l;
+ logger.error(String.format("yielding 0 for %s as output is invalid - %s", field, result));
+ }
+ }
+ return returnResult;
+ }
+
+ /**
+ * call the data providers to fill the returning data container for statistics data.
+ * This function refreshes the static cached copy of data in case if necessary.
+ * */
+ protected synchronized void populateStatistics(boolean noCache) {
+ //check again before starting the work.
+ if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds) {
+ final ExecutorInfo stats = new ExecutorInfo();
+
+ fillRemainingMemoryPercent(stats);
+ fillRemainingFlowCapacityAndLastDispatchedTime(stats);
+ fillCpuUsage(stats);
+
+ cachedstats = stats;
+ lastRefreshedTime = System.currentTimeMillis();
+ }
+ }
+
+ /**
+ * fill the result set with the remaining flow capacity .
+ * @param stats reference to the result container which contains all the results, this specific method
+ * will only work on the property "remainingFlowCapacity".
+ */
+ protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats) {
+
+ AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
+ if (server != null) {
+ FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
+ int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
+ stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
+ stats.setNumberOfAssignedFlows(assignedFlows);
+ stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+ } else {
+ logger.error("failed to get data for remaining flow capacity or LastDispatchedTime"
+ + " as the AzkabanExecutorServer has yet been initialized.");
+ }
+ }
+
+ /**<pre>
+ * fill the result set with the CPU usage .
+ * Note : As the 'Top' bash call doesn't yield accurate result for the system load,
+ * the implementation has been changed to load from the "proc/loadavg" which keeps
+ * the moving average of the system load, we are pulling the average for the recent 1 min.
+ *</pre>
+ * @param stats reference to the result container which contains all the results, this specific method
+ * will only work on the property "cpuUsage".
+ */
+ protected void fillCpuUsage(ExecutorInfo stats) {
+ if (exists_Bash && exists_Cat && exists_LoadAvg) {
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/loadavg");
+ try {
+ ArrayList<String> output = new ArrayList<String>();
+ Process process = processBuilder.start();
+ process.waitFor();
+ InputStream inputStream = process.getInputStream();
+ try {
+ java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(inputStream));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ output.add(line);
+ }
+ } finally {
+ inputStream.close();
+ }
+
+ // process the output from bash call.
+ if (output.size() > 0) {
+ String[] splitedresult = output.get(0).split("\\s+");
+ double cpuUsage = 0.0;
+
+ try {
+ cpuUsage = Double.parseDouble(splitedresult[0]);
+ } catch (NumberFormatException e) {
+ logger.error("yielding 0.0 for CPU usage as output is invalid -" + output.get(0));
+ }
+ logger.info("System load : " + cpuUsage);
+ stats.setCpuUpsage(cpuUsage);
+ }
+ } catch (Exception ex) {
+ logger.error("failed fetch system load info "
+ + "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+ }
+ } else {
+ logger.error("failed fetch system load info, one or more files from the following list are missing - "
+ + "'/bin/bash'," + "'/bin/cat'," + "'/proc/loadavg'");
+ }
+ }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index e66a586..1d9af6d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -72,7 +72,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Handle all get request to Stats Servlet {@inheritDoc}
- *
+ *
* @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
* javax.servlet.http.HttpServletResponse)
*/
@@ -176,7 +176,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Get metric snapshots for a metric and date specification
- *
+ *
* @throws ServletException
*/
private void handleGetMetricHistory(HttpServletRequest req,
@@ -252,7 +252,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
/**
* Update tracking interval for a given metrics
- *
+ *
* @throws ServletException
*/
private void handleChangeMetricInterval(HttpServletRequest req,
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
new file mode 100644
index 0000000..15078b4
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -0,0 +1,81 @@
+package azkaban.execapp;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import azkaban.executor.ExecutorInfo;
+
+@Ignore
+public class StatisticsServletTest {
+ private class MockStatisticsServlet extends ServerStatisticsServlet {
+ /** */
+ private static final long serialVersionUID = 1L;
+
+ public ExecutorInfo getStastics() {
+ return cachedstats;
+ }
+
+ public long getUpdatedTime() {
+ return lastRefreshedTime;
+ }
+
+ public void callPopulateStatistics() {
+ this.populateStatistics(false);
+ }
+
+ public void callFillCpuUsage(ExecutorInfo stats) {
+ this.fillCpuUsage(stats);
+ }
+
+ public void callFillRemainingMemoryPercent(ExecutorInfo stats) {
+ this.fillRemainingMemoryPercent(stats);
+ }
+ }
+ private MockStatisticsServlet statServlet = new MockStatisticsServlet();
+
+ @Test
+ public void testFillMemory() {
+ ExecutorInfo stats = new ExecutorInfo();
+ this.statServlet.callFillRemainingMemoryPercent(stats);
+ // assume any machine that runs this test should
+ // have bash and top available and at least got some remaining memory.
+ Assert.assertTrue(stats.getRemainingMemoryInMB() > 0);
+ Assert.assertTrue(stats.getRemainingMemoryPercent() > 0);
+ }
+
+ @Test
+ public void testFillCpu() {
+ ExecutorInfo stats = new ExecutorInfo();
+ this.statServlet.callFillCpuUsage(stats);
+ Assert.assertTrue(stats.getCpuUsage() > 0);
+ }
+
+ @Test
+ public void testPopulateStatistics() {
+ this.statServlet.callPopulateStatistics();
+ Assert.assertNotNull(this.statServlet.getStastics());
+ Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryInMB() > 0);
+ Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryPercent() > 0);
+ Assert.assertTrue(this.statServlet.getStastics().getCpuUsage() > 0);
+ }
+
+ @Test
+ public void testPopulateStatisticsCache() {
+ this.statServlet.callPopulateStatistics();
+ final long updatedTime = this.statServlet.getUpdatedTime();
+ while (System.currentTimeMillis() - updatedTime < 1000) {
+ this.statServlet.callPopulateStatistics();
+ Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+
+ // make sure cache expires after timeout.
+ this.statServlet.callPopulateStatistics();
+ Assert.assertNotEquals(updatedTime, this.statServlet.getUpdatedTime());
+ }
+}
diff --git a/azkaban-sql/src/sql/create.executor_events.sql b/azkaban-sql/src/sql/create.executor_events.sql
new file mode 100644
index 0000000..1ff4799
--- /dev/null
+++ b/azkaban-sql/src/sql/create.executor_events.sql
@@ -0,0 +1,9 @@
+CREATE TABLE executor_events (
+ executor_id INT NOT NULL,
+ event_type TINYINT NOT NULL,
+ event_time DATETIME NOT NULL,
+ username VARCHAR(64),
+ message VARCHAR(512)
+);
+
+CREATE INDEX executor_log ON executor_events(executor_id, event_time);
\ No newline at end of file
azkaban-sql/src/sql/create.executors.sql 10(+10 -0)
diff --git a/azkaban-sql/src/sql/create.executors.sql b/azkaban-sql/src/sql/create.executors.sql
new file mode 100644
index 0000000..8e59ce3
--- /dev/null
+++ b/azkaban-sql/src/sql/create.executors.sql
@@ -0,0 +1,10 @@
+CREATE TABLE executors (
+ id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
+ host VARCHAR(64) NOT NULL,
+ port INT NOT NULL,
+ active BOOLEAN DEFAULT true,
+ UNIQUE (host, port),
+ UNIQUE INDEX executor_id (id)
+);
+
+CREATE INDEX executor_connection ON executors(host, port);
diff --git a/azkaban-sql/src/sql/database.properties b/azkaban-sql/src/sql/database.properties
index b68be28..b6802bc 100644
--- a/azkaban-sql/src/sql/database.properties
+++ b/azkaban-sql/src/sql/database.properties
@@ -1 +1 @@
-version=
+version=3.0
diff --git a/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql b/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql
new file mode 100644
index 0000000..dcb4ec5
--- /dev/null
+++ b/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql
@@ -0,0 +1,2 @@
+ALTER TABLE active_executing_flows DROP COLUMN host;
+ALTER TABLE active_executing_flows DROP COLUMN port;
\ No newline at end of file
diff --git a/azkaban-sql/src/sql/update.execution_flows.3.0.sql b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
new file mode 100644
index 0000000..2935810
--- /dev/null
+++ b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
@@ -0,0 +1,2 @@
+ALTER TABLE execution_flows ADD COLUMN executor_id INT DEFAULT NULL;
+CREATE INDEX executor_id ON execution_flows(executor_id);
\ No newline at end of file
diff --git a/azkaban-test/src/test/resources/azkaban-users.xml b/azkaban-test/src/test/resources/azkaban-users.xml
new file mode 100644
index 0000000..55941a7
--- /dev/null
+++ b/azkaban-test/src/test/resources/azkaban-users.xml
@@ -0,0 +1,5 @@
+<azkaban-users>
+ <user username="testAdmin" password="testAdmin" roles="admin" groups="azkaban" />
+ <user username="testUser" password="testUser" />
+ <role name="admin" permissions="ADMIN" />
+</azkaban-users>
azkaban-webserver/.gitignore 1(+1 -0)
diff --git a/azkaban-webserver/.gitignore b/azkaban-webserver/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/azkaban-webserver/.gitignore
@@ -0,0 +1 @@
+/bin
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 132a9f1..ef588f7 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -29,11 +29,13 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringEscapeUtils;
+import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
@@ -47,8 +49,11 @@ import azkaban.server.HttpRequestUtils;
import azkaban.server.session.Session;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
+import azkaban.user.Role;
import azkaban.user.User;
+import azkaban.user.UserManager;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.plugin.PluginRegistry;
import azkaban.webapp.plugin.ViewerPlugin;
@@ -59,11 +64,13 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
private ExecutorVelocityHelper velocityHelper;
+ private UserManager userManager;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ userManager = server.getUserManager();
projectManager = server.getProjectManager();
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
@@ -129,6 +136,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
}
}
+ } else if (ajaxName.equals("reloadExecutors")) {
+ ajaxReloadExecutors(req, resp, ret, session.getUser());
+ } else if (ajaxName.equals("enableQueueProcessor")) {
+ ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), true);
+ } else if (ajaxName.equals("disableQueueProcessor")) {
+ ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), false);
} else if (ajaxName.equals("getRunning")) {
String projectName = getParam(req, "project");
String flowName = getParam(req, "flow");
@@ -152,6 +165,63 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+ /**
+ * <pre>
+ * Enables queueProcessor if @param status is true
+ * disables queueProcessor if @param status is false.
+ * </pre>
+ */
+ private void ajaxUpdateQueueProcessor(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> returnMap, User user,
+ boolean enableQueue) {
+ boolean wasSuccess = false;
+ if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+ try {
+ if (enableQueue) {
+ executorManager.enableQueueProcessorThread();
+ } else {
+ executorManager.disableQueueProcessorThread();
+ }
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_SUCCESS);
+ wasSuccess = true;
+ } catch (ExecutorManagerException e) {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
+ }
+ } else {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Only Admins are allowed to update queue processor");
+ }
+ if (!wasSuccess) {
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_ERROR);
+ }
+ }
+
+ /* Reloads executors from DB and azkaban.properties via executorManager */
+ private void ajaxReloadExecutors(HttpServletRequest req,
+ HttpServletResponse resp, HashMap<String, Object> returnMap, User user) {
+ boolean wasSuccess = false;
+ if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+ try {
+ executorManager.setupExecutors();
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_SUCCESS);
+ wasSuccess = true;
+ } catch (ExecutorManagerException e) {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Failed to refresh the executors " + e.getMessage());
+ }
+ } else {
+ returnMap.put(ConnectorParams.RESPONSE_ERROR,
+ "Only Admins are allowed to refresh the executors");
+ }
+ if (!wasSuccess) {
+ returnMap.put(ConnectorParams.STATUS_PARAM,
+ ConnectorParams.RESPONSE_ERROR);
+ }
+ }
+
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
Session session) throws ServletException, IOException {
@@ -225,7 +295,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
newPage(req, resp, session,
"azkaban/webapp/servlet/velocity/executionspage.vm");
- List<ExecutableFlow> runningFlows = executorManager.getRunningFlows();
+ List<Pair<ExecutableFlow, Executor>> runningFlows =
+ executorManager.getActiveFlowsWithExecutor();
page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
List<ExecutableFlow> finishedFlows =
@@ -809,10 +880,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
options.setMailCreator(flow.getMailCreator());
try {
+ HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
String message =
executorManager.submitExecutableFlow(exflow, user.getUserId());
ret.put("message", message);
- } catch (ExecutorManagerException e) {
+ } catch (Exception e) {
e.printStackTrace();
ret.put("error",
"Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index f688820..c6445c0 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -62,6 +62,7 @@ import azkaban.sla.SlaOption;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
+import azkaban.user.UserManager;
import azkaban.utils.JSONUtils;
import azkaban.utils.SplitterOutputStream;
import azkaban.utils.Utils;
@@ -73,11 +74,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
private ProjectManager projectManager;
private ScheduleManager scheduleManager;
+ private UserManager userManager;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ userManager = server.getUserManager();
projectManager = server.getProjectManager();
scheduleManager = server.getScheduleManager();
}
@@ -656,6 +659,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ExecutionOptions flowOptions = null;
try {
flowOptions = HttpRequestUtils.parseFlowOptions(req);
+ HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
} catch (Exception e) {
ret.put("error", e.getMessage());
}
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 6d14839..b2b3487 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -17,6 +17,7 @@
package azkaban.webapp.servlet;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -29,7 +30,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import azkaban.executor.ConnectorParams;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
import azkaban.server.session.Session;
import azkaban.user.Permission;
import azkaban.user.Role;
@@ -67,55 +70,105 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
throws ServletException, IOException {
HashMap<String, Object> ret = new HashMap<String, Object>();
+ int executorId = getIntParam(req, ConnectorParams.EXECUTOR_ID_PARAM);
String actionName = getParam(req, ConnectorParams.ACTION_PARAM);
if (actionName.equals(ConnectorParams.STATS_GET_METRICHISTORY)) {
- handleGetMetricHistory(req, ret, session.getUser());
+ handleGetMetricHistory(executorId, req, ret, session.getUser());
+ } else if (actionName.equals(ConnectorParams.STATS_GET_ALLMETRICSNAME)) {
+ handleGetAllMetricName(executorId, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_REPORTINGINTERVAL)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_CLEANINGINTERVAL)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_MAXREPORTERPOINTS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_ENABLEMETRICS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
} else if (actionName.equals(ConnectorParams.STATS_SET_DISABLEMETRICS)) {
- handleChangeConfigurationRequest(ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
+ handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
}
writeJSON(resp, ret);
}
/**
+ * Get all metrics tracked by the given executor
+ *
+ * @param executorId
+ * @param req
+ * @param ret
+ */
+ private void handleGetAllMetricName(int executorId, HttpServletRequest req,
+ HashMap<String, Object> ret) throws IOException {
+ Map<String, Object> result;
+ try {
+ result =
+ execManager.callExecutorStats(executorId,
+ ConnectorParams.STATS_GET_ALLMETRICSNAME,
+ (Pair<String, String>[]) null);
+
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put("error", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put("metricList", result.get("data"));
+ }
+ } catch (ExecutorManagerException e) {
+ ret.put("error", "Failed to fetch metric names for executor : "
+ + executorId);
+ }
+ }
+
+ /**
* Generic method to facilitate actionName action using Azkaban exec server
+ * @param executorId
* @param actionName Name of the action
+ * @throws ExecutorManagerException
*/
- private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
+ private void handleChangeConfigurationRequest(int executorId, String actionName, HttpServletRequest req, HashMap<String, Object> ret)
throws ServletException, IOException {
- Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
- if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
- } else {
- ret.put(ConnectorParams.STATUS_PARAM, result.get(ConnectorParams.STATUS_PARAM));
+ try {
+ Map<String, Object> result =
+ execManager
+ .callExecutorStats(executorId, actionName, getAllParams(req));
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put(ConnectorParams.RESPONSE_ERROR,
+ result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put(ConnectorParams.STATUS_PARAM,
+ result.get(ConnectorParams.STATUS_PARAM));
+ }
+ } catch (ExecutorManagerException ex) {
+ ret.put("error", "Failed to change config change");
}
}
/**
* Get metric snapshots for a metric and date specification
+ * @param executorId
* @throws ServletException
+ * @throws ExecutorManagerException
*/
- private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
- throws IOException, ServletException {
- Map<String, Object> result =
- execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
- if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
- } else {
- ret.put("data", result.get("data"));
+ private void handleGetMetricHistory(int executorId, HttpServletRequest req,
+ HashMap<String, Object> ret, User user) throws IOException,
+ ServletException {
+ try {
+ Map<String, Object> result =
+ execManager.callExecutorStats(executorId,
+ ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+ if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+ ret.put(ConnectorParams.RESPONSE_ERROR,
+ result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ } else {
+ ret.put("data", result.get("data"));
+ }
+ } catch (ExecutorManagerException ex) {
+ ret.put("error", "Failed to fetch metric history");
}
}
/**
+ * @throws ExecutorManagerException
*
*/
private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
@@ -128,14 +181,20 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
}
try {
+ Collection<Executor> executors = execManager.getAllActiveExecutors();
+ page.add("executorList", executors);
+
Map<String, Object> result =
- execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
+ execManager.callExecutorStats(executors.iterator().next().getId(),
+ ConnectorParams.STATS_GET_ALLMETRICSNAME,
+ (Pair<String, String>[]) null);
if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
- page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+ page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR)
+ .toString());
} else {
page.add("metricList", result.get("data"));
}
- } catch (IOException e) {
+ } catch (Exception e) {
page.add("errorMsg", "Failed to get a response from Azkaban exec server");
}
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
index 49d1085..9c2a6fb 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -68,6 +68,7 @@
<tr>
<th>#</th>
<th class="execid">Execution Id</th>
+ <th >Executor Id</th>
<th>Flow</th>
<th>Project</th>
<th class="user">User</th>
@@ -80,25 +81,33 @@
</tr>
</thead>
<tbody>
-#if ($runningFlows)
+
+#if ( !$null.isNull(${runningFlows}))
#foreach ($flow in $runningFlows)
<tr>
<td class="tb-name">
$velocityCount
</td>
<td class="tb-name">
- <a href="${context}/executor?execid=${flow.executionId}">${flow.executionId}</a>
+ <a href="${context}/executor?execid=${flow.getFirst().executionId}">${flow.getFirst().executionId}</a>
</td>
- <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})&flow=${flow.flowId}">${flow.flowId}</a></td>
<td>
- <a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
+ #if (${flow.getSecond()})
+ ${flow.getSecond().getId()}
+ #else
+ -
+ #end
</td>
- <td>${flow.submitUser}</td>
- <td>${flow.proxyUsers}</td>
- <td>$utils.formatDate(${flow.startTime})</td>
- <td>$utils.formatDate(${flow.endTime})</td>
- <td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
- <td><div class="status ${flow.status}">$utils.formatStatus(${flow.status})</div></td>
+ <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})&flow=${flow.getFirst().flowId}">${flow.getFirst().flowId}</a></td>
+ <td>
+ <a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})">$vmutils.getProjectName(${flow.getFirst().projectId})</a>
+ </td>
+ <td>${flow.getFirst().submitUser}</td>
+ <td>${flow.getFirst().proxyUsers}</td>
+ <td>$utils.formatDate(${flow.getFirst().startTime})</td>
+ <td>$utils.formatDate(${flow.getFirst().endTime})</td>
+ <td>$utils.formatDuration(${flow.getFirst().startTime}, ${flow.getFirst().endTime})</td>
+ <td><div class="status ${flow.getFirst().status}">$utils.formatStatus(${flow.getFirst().status})</div></td>
<td></td>
</tr>
#end
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index 4596ac2..27be342 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -29,6 +29,25 @@
var currentTime = ${currentTime};
var timezone = "${timezone}";
+ function refreshMetricList() {
+ var requestURL = '/stats';
+ var requestData = {
+ 'action': 'getAllMetricNames',
+ 'executorId': $('#executorName').val()
+ };
+ var successHandler = function(responseData) {
+ if(responseData.error != null) {
+ $('#reportedMetric').html(responseData.error);
+ } else {
+ $('#metricName').empty();
+ for(var index = 0; index < responseData.metricList.length; index++) {
+ $('#metricName').append($('<option value="' + responseData.metricList[index] +'">' + responseData.metricList[index] + '</option>'));
+ }
+ }
+ };
+ $.get(requestURL, requestData, successHandler, 'json');
+ }
+
function refreshMetricChart() {
var requestURL = '/stats';
var requestData = {
@@ -36,7 +55,8 @@
'from': new Date($('#datetimebegin').val()).toUTCString(),
'to' : new Date($('#datetimeend').val()).toUTCString(),
'metricName': $('#metricName').val(),
- 'useStats': $("#useStats").is(':checked')
+ 'useStats': $("#useStats").is(':checked'),
+ 'executorId': $('#executorName').val()
};
var successHandler = function(responseData) {
if(responseData.error != null) {
@@ -67,6 +87,7 @@
$('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
});
$('#retrieve').click(refreshMetricChart);
+ $('#executorName').click(refreshMetricList);
});
</script>
@@ -84,8 +105,16 @@
<div class="header-title" style="width: 17%;">
<h1><a href="${context}/stats">Statistics</a></h1>
</div>
- <div class="header-control" style="width: 900px; padding-top: 5px;">
+ <div class="header-control" style="width: 1300px; padding-top: 5px;">
<form id="metric-form" method="get">
+ <label for="executorLabel" >Executor</label>
+ #if (!$executorList.isEmpty())
+ <select id="executorName" name="executorName" style="width:200px">
+ #foreach ($executor in $executorList)
+ <option value="${executor.getId()}" style="width:200px">${executor.getHost()}:${executor.getPort()}</option>
+ #end
+ </select>
+ #end
<label for="metricLabel" >Metric</label>
#if (!$metricList.isEmpty())
<select id="metricName" name="metricName" style="width:200px">
@@ -119,4 +148,4 @@
<!-- /container-full -->
#end
</body>
- <html>
\ No newline at end of file
+ <html>
diff --git a/azkaban-webserver/src/web/js/azkaban/view/exflow.js b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
index dbb8ae4..026a946 100644
--- a/azkaban-webserver/src/web/js/azkaban/view/exflow.js
+++ b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
@@ -178,8 +178,11 @@ azkaban.FlowTabView = Backbone.View.extend({
$("#retrybtn").hide();
if (data.status == "SUCCEEDED") {
- $("#executebtn").show();
+ $("#executebtn").show();
}
+ else if (data.status == "PREPARING") {
+ $("#cancelbtn").show();
+ }
else if (data.status == "FAILED") {
$("#executebtn").show();
}