azkaban-aplcache
Changes
azkaban-sql/src/sql/create.executors.sql 10(+10 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
new file mode 100644
index 0000000..31070bf
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.utils.Utils;
+
+/**
+ * Class to represent an AzkabanExecutorServer details for ExecutorManager
+ *
+ * @author gaggarwa
+ */
+public class Executor {
+ private final int id;
+ private final String host;
+ private final int port;
+ private boolean isActive;
+
+ // TODO: ExecutorStats to be added
+
+ /**
+ * <pre>
+ * Construct an Executor Object
+ * Note: port should be a within unsigned 2 byte
+ * integer range
+ * </pre>
+ *
+ * @param executor_id
+ * @param executor_host
+ * @param executor_port
+ */
+ public Executor(int id, String host, int port, boolean isActive) {
+ if (!Utils.isValidPort(port)) {
+ throw new IllegalArgumentException(String.format(
+ "Invalid port number %d for host %s, executor_id %d", port, host, id));
+ }
+
+ this.id = id;
+ this.host = host;
+ this.port = port;
+ this.isActive = isActive;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (isActive ? 1231 : 1237);
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + id;
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof Executor))
+ return false;
+ Executor other = (Executor) obj;
+ if (isActive != other.isActive)
+ return false;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ if (id != other.id)
+ return false;
+ if (port != other.port)
+ return false;
+ return true;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setActive(boolean isActive) {
+ this.isActive = isActive;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 6dc0e11..1bfa910 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -20,6 +20,7 @@ import java.io.File;
import java.util.List;
import java.util.Map;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -47,6 +48,123 @@ public interface ExecutorLoader {
String flowContains, String userNameContains, int status, long startData,
long endData, int skip, int num) throws ExecutorManagerException;
+ /**
+ * <pre>
+ * Fetch all executors from executors table
+ * Note:-
+ * 1 throws an Exception in case of a SQL issue
+ * 2 returns an empty list in case of no executor
+ * </pre>
+ *
+ * @return List<Executor>
+ * @throws ExecutorManagerException
+ */
+ public List<Executor> fetchAllExecutors() throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch all executors from executors table with active = true
+ * Note:-
+ * 1 throws an Exception in case of a SQL issue
+ * 2 returns an empty list in case of no active executor
+ * </pre>
+ *
+ * @return List<Executor>
+ * @throws ExecutorManagerException
+ */
+ public List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch executor from executors with a given (host, port)
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found
+ * with the given (host,port)
+ * </pre>
+ *
+ * @return Executor
+ * @throws ExecutorManagerException
+ */
+ public Executor fetchExecutor(String host, int port)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Fetch executor from executors with a given executorId
+ * Note:
+ * 1. throws an Exception in case of a SQL issue
+ * 2. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @return Executor
+ * @throws ExecutorManagerException
+ */
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * create an executor and insert in executors table.
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. throws an Exception if a executor with (host, port) already exist
+ * 3. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @return Executor
+ * @throws ExecutorManagerException
+ */
+ public Executor addExecutor(String host, int port)
+ throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * create an executor and insert in executors table.
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. throws an Exception if there is no executor with the given id
+ * 3. return null when no executor is found with the given executorId
+ * </pre>
+ *
+ * @param executorId
+ * @throws ExecutorManagerException
+ */
+ public void updateExecutor(Executor executor) throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * Log an event in executor_event audit table Note:- throws an Exception in
+ * case of a SQL issue
+ * Note: throws an Exception in case of a SQL issue
+ * </pre>
+ *
+ * @param executor
+ * @param type
+ * @param user
+ * @param message
+ * @return isSuccess
+ */
+ public void postExecutorEvent(Executor executor, EventType type, String user,
+ String message) throws ExecutorManagerException;
+
+ /**
+ * <pre>
+ * This method is to fetch events recorded in executor audit table, inserted
+ * by postExecutorEvents with a given executor, starting from skip
+ * Note:-
+ * 1. throws an Exception in case of a SQL issue
+ * 2. Returns an empty list in case of no events
+ * </pre>
+ *
+ * @param executor
+ * @param num
+ * @param skip
+ * @return List<ExecutorLogEvent>
+ * @throws ExecutorManagerException
+ */
+ List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+ int offset) throws ExecutorManagerException;
+
public void addActiveExecutableReference(ExecutionReference ref)
throws ExecutorManagerException;
@@ -105,5 +223,4 @@ public interface ExecutorLoader {
public int removeExecutionLogsByTime(long millis)
throws ExecutorManagerException;
-
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
new file mode 100644
index 0000000..5598a0d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Date;
+
+/**
+ * Class to represent events on Azkaban executors
+ *
+ * @author gaggarwa
+ */
+public class ExecutorLogEvent {
+ /**
+ * Log event type messages. Do not change the numeric representation of each
+ * enum. Only represent from 0 to 255 different codes.
+ */
+ public static enum EventType {
+ ERROR(128), HOST_UPDATE(1), PORT_UPDATE(2), ACTIVATION(3), INACTIVATION(4),
+ CREATED(5);
+
+ private int numVal;
+
+ EventType(int numVal) {
+ this.numVal = numVal;
+ }
+
+ public int getNumVal() {
+ return numVal;
+ }
+
+ public static EventType fromInteger(int x)
+ throws IllegalArgumentException {
+ switch (x) {
+ case 1:
+ return HOST_UPDATE;
+ case 2:
+ return PORT_UPDATE;
+ case 3:
+ return ACTIVATION;
+ case 4:
+ return INACTIVATION;
+ case 5:
+ return CREATED;
+ case 128:
+ return ERROR;
+ default:
+ throw new IllegalArgumentException(String.format(
+ "inalid status code %d", x));
+ }
+ }
+ }
+
+ private final int executorId;
+ private final String user;
+ private final Date time;
+ private final EventType type;
+ private final String message;
+
+ public ExecutorLogEvent(int executorId, String user, Date time,
+ EventType type, String message) {
+ this.executorId = executorId;
+ this.user = user;
+ this.time = time;
+ this.type = type;
+ this.message = message;
+ }
+
+ public int getExecutorId() {
+ return executorId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public Date getTime() {
+ return time;
+ }
+
+ public EventType getType() {
+ return type;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 8d230ba..2a1cf26 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -21,12 +21,14 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.lang.annotation.Inherited;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,6 +42,7 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.database.AbstractJdbcLoader;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.GZIPUtils;
@@ -773,6 +776,206 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return connection;
}
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
+ */
+ @Override
+ public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
+ return executors;
+ } catch (Exception e) {
+ throw new ExecutorManagerException("Error fetching executors", e);
+ }
+ }
+
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
+ */
+ @Override
+ public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
+ executorHandler);
+ return executors;
+ } catch (Exception e) {
+ throw new ExecutorManagerException("Error fetching active executors", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchExecutor(java.lang.String, int)
+ */
+ @Override
+ public Executor fetchExecutor(String host, int port)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
+ executorHandler, host, port);
+ if (executors.isEmpty()) {
+ return null;
+ } else {
+ return executors.get(0);
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException(String.format(
+ "Error fetching executor %s:%d", host, port), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#fetchExecutor(int)
+ */
+ @Override
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+ FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+ try {
+ List<Executor> executors =
+ runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
+ executorHandler, executorId);
+ if (executors.isEmpty()) {
+ return null;
+ } else {
+ return executors.get(0);
+ }
+ } catch (Exception e) {
+ throw new ExecutorManagerException(String.format(
+ "Error fetching executor with id: %d", executorId), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#updateExecutor(int)
+ */
+ @Override
+ public void updateExecutor(Executor executor) throws ExecutorManagerException {
+ final String UPDATE =
+ "UPDATE executors SET host=?, port=?, active=? where id=?";
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ int rows =
+ runner.update(UPDATE, executor.getHost(), executor.getPort(),
+ executor.isActive(), executor.getId());
+ if (rows == 0) {
+ throw new ExecutorManagerException("No executor with id :"
+ + executor.getId());
+ }
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Error inactivating executor "
+ + executor.getId(), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#addExecutor(java.lang.String, int)
+ */
+ @Override
+ public Executor addExecutor(String host, int port)
+ throws ExecutorManagerException {
+ // verify, if executor already exists
+ Executor executor = fetchExecutor(host, port);
+ if (executor != null) {
+ throw new ExecutorManagerException(String.format(
+ "Executor %s:%d already exist", host, port));
+ }
+ // add new executor
+ addExecutorHelper(host, port);
+ // fetch newly added executor
+ executor = fetchExecutor(host, port);
+
+ return executor;
+ }
+
+ private void addExecutorHelper(String host, int port)
+ throws ExecutorManagerException {
+ final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
+ QueryRunner runner = createQueryRunner();
+ try {
+ runner.update(INSERT, host, port);
+ } catch (SQLException e) {
+ throw new ExecutorManagerException(String.format("Error adding %s:%d ",
+ host, port), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#postExecutorEvent(azkaban.executor.Executor,
+ * azkaban.executor.ExecutorLogEvent.EventType, java.lang.String,
+ * java.lang.String)
+ */
+ @Override
+ public void postExecutorEvent(Executor executor, EventType type, String user,
+ String message) throws ExecutorManagerException{
+ QueryRunner runner = createQueryRunner();
+
+ final String INSERT_PROJECT_EVENTS =
+ "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+ Date updateDate = new Date();
+ try {
+ runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
+ updateDate, user, message);
+ } catch (SQLException e) {
+ throw new ExecutorManagerException("Failed to post executor event", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorLoader#getExecutorEvents(azkaban.executor.Executor,
+ * int, int)
+ */
+ @Override
+ public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+ int offset) throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+
+ ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
+ List<ExecutorLogEvent> events = null;
+ try {
+ events =
+ runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
+ logHandler, executor.getId(), num, offset);
+ } catch (SQLException e) {
+ throw new ExecutorManagerException(
+ "Failed to fetch events for executor id : " + executor.getId(), e);
+ }
+
+ return events;
+ }
+
private static class LastInsertID implements ResultSetHandler<Long> {
private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
@@ -1134,4 +1337,71 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return updateNum;
}
+
+ /**
+ * JDBC ResultSetHandler to fetch records from executors table
+ */
+ private static class FetchExecutorHandler implements
+ ResultSetHandler<List<Executor>> {
+ private static String FETCH_ALL_EXECUTORS =
+ "SELECT id, host, port, active FROM executors";
+ private static String FETCH_ACTIVE_EXECUTORS =
+ "SELECT id, host, port, active FROM executors where active=true";
+ private static String FETCH_EXECUTOR_BY_ID =
+ "SELECT id, host, port, active FROM executors where id=?";
+ private static String FETCH_EXECUTOR_BY_HOST_PORT =
+ "SELECT id, host, port, active FROM executors where host=? AND port=?";
+
+ @Override
+ public List<Executor> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<Executor> emptyList();
+ }
+
+ List<Executor> executors = new ArrayList<Executor>();
+ do {
+ int id = rs.getInt(1);
+ String host = rs.getString(2);
+ int port = rs.getInt(3);
+ boolean active = rs.getBoolean(4);
+ Executor executor = new Executor(id, host, port, active);
+ executors.add(executor);
+ } while (rs.next());
+
+ return executors;
+ }
+ }
+
+ /**
+ * JDBC ResultSetHandler to fetch records from executor_events table
+ */
+ private static class ExecutorLogsResultHandler implements
+ ResultSetHandler<List<ExecutorLogEvent>> {
+ private static String SELECT_EXECUTOR_EVENTS_ORDER =
+ "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
+ + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
+
+ @Override
+ public List<ExecutorLogEvent> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<ExecutorLogEvent> emptyList();
+ }
+
+ ArrayList<ExecutorLogEvent> events = new ArrayList<ExecutorLogEvent>();
+ do {
+ int executorId = rs.getInt(1);
+ int eventType = rs.getInt(2);
+ Date eventTime = rs.getDate(3);
+ String username = rs.getString(4);
+ String message = rs.getString(5);
+
+ ExecutorLogEvent event =
+ new ExecutorLogEvent(executorId, username, eventTime,
+ EventType.fromInteger(eventType), message);
+ events.add(event);
+ } while (rs.next());
+
+ return events;
+ }
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 134602b..e8e2dac 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -172,10 +172,10 @@ public class JavaProcessJob extends ProcessJob {
if (xmx > sizeMaxXmx) {
throw new Exception(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
- getId(), maxXms));
+ getId(), maxXmx));
}
}
return new Pair<Long, Long>(xms, xmx);
}
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index 7227b0b..138b80f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -111,6 +111,19 @@ public class Utils {
System.exit(exitCode);
}
+ /**
+ * Tests whether a port is valid or not
+ *
+ * @param port
+ * @return true, if port is valid
+ */
+ public static boolean isValidPort(int port) {
+ if (port >= 1 && port <= 65535) {
+ return true;
+ }
+ return false;
+ }
+
public static File createTempDir() {
return createTempDir(new File(System.getProperty("java.io.tmpdir")));
}
@@ -410,7 +423,7 @@ public class Utils {
}
/**
- * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000
+ * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000
* @return : long value of memory amount in kb
*/
public static long parseMemString(String strMemSize) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index f453f0c..71be50f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -21,8 +21,10 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
@@ -30,17 +32,18 @@ import javax.sql.DataSource;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
-
import org.joda.time.DateTime;
-
+import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import azkaban.database.DataSourceUtils;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.flow.Flow;
import azkaban.project.Project;
+import azkaban.user.User;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
@@ -117,12 +120,31 @@ public class JdbcExecutorLoaderTest {
return;
}
- DbUtils.closeQuietly(connection);
+ try {
+ runner.query(connection, "SELECT COUNT(1) FROM executors",
+ countHandler);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
- clearDB();
+ try {
+ runner.query(connection, "SELECT COUNT(1) FROM executor_events",
+ countHandler);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ DbUtils.closeQuietly(connection);
}
- private static void clearDB() {
+ @After
+ public void clearDB() {
if (!testDBExists) {
return;
}
@@ -178,6 +200,24 @@ public class JdbcExecutorLoaderTest {
return;
}
+ try {
+ runner.update(connection, "DELETE FROM executors");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ try {
+ runner.update(connection, "DELETE FROM executor_events");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
DbUtils.closeQuietly(connection);
}
@@ -186,6 +226,7 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
+ Assert.assertEquals(1, 0);
ExecutorLoader loader = createLoader();
ExecutableFlow flow = createExecutableFlow("exec1");
@@ -293,6 +334,252 @@ public class JdbcExecutorLoaderTest {
}
+
+ /* Test all executors fetch from empty executors */
+ @Test
+ public void testFetchEmptyExecutors() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = loader.fetchAllExecutors();
+ Assert.assertEquals(executors.size(), 0);
+ }
+
+ /* Test active executors fetch from empty executors */
+ @Test
+ public void testFetchEmptyActiveExecutors() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = loader.fetchActiveExecutors();
+ Assert.assertEquals(executors.size(), 0);
+ }
+
+ /* Test missing executor fetch with search by executor id */
+ @Test
+ public void testFetchMissingExecutorId() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.fetchExecutor(0);
+ Assert.assertEquals(executor, null);
+ }
+
+ /* Test missing executor fetch with search by host:port */
+ @Test
+ public void testFetchMissingExecutorHostPort() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.fetchExecutor("localhost", 12345);
+ Assert.assertEquals(executor, null);
+ }
+
+ /* Test executor events fetch from with no logged executor */
+ @Test
+ public void testFetchEmptyExecutorEvents() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ Executor executor = new Executor(1, "localhost", 12345, true);
+ List<ExecutorLogEvent> executorEvents =
+ loader.getExecutorEvents(executor, 5, 0);
+ Assert.assertEquals(executorEvents.size(), 0);
+ }
+
+ /* Test logging ExecutorEvents */
+ @Test
+ public void testExecutorEvents() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ int skip = 1;
+ User user = new User("testUser");
+ Executor executor = new Executor(1, "localhost", 12345, true);
+ String message = "My message ";
+ EventType[] events =
+ { EventType.CREATED, EventType.HOST_UPDATE, EventType.INACTIVATION };
+
+ for (EventType event : events) {
+ loader.postExecutorEvent(executor, event, user.getUserId(),
+ message + event.getNumVal());
+ }
+
+ List<ExecutorLogEvent> eventLogs =
+ loader.getExecutorEvents(executor, 10, skip);
+ Assert.assertTrue(eventLogs.size() == 2);
+
+ for (int index = 0; index < eventLogs.size(); ++index) {
+ ExecutorLogEvent eventLog = eventLogs.get(index);
+ Assert.assertEquals(eventLog.getExecutorId(), executor.getId());
+ Assert.assertEquals(eventLog.getUser(), user.getUserId());
+ Assert.assertEquals(eventLog.getType(), events[index + skip]);
+ Assert.assertEquals(eventLog.getMessage(),
+ message + events[index + skip].getNumVal());
+ }
+ }
+
+ /* Test to add duplicate executors */
+ @Test
+ public void testDuplicateAddExecutor() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ try {
+ String host = "localhost";
+ int port = 123456;
+ loader.addExecutor(host, port);
+ loader.addExecutor(host, port);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test to try update a non-existent executor */
+ @Test
+ public void testMissingExecutorUpdate() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ try {
+ Executor executor = new Executor(1, "localhost", 1234, true);
+ loader.updateExecutor(executor);
+ Assert.fail("Expecting exception, but didn't get one");
+ } catch (ExecutorManagerException ex) {
+ System.out.println("Test true");
+ }
+ }
+
+ /* Test add & fetch by Id Executors */
+ @Test
+ public void testSingleExecutorFetchById() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = addTestExecutors(loader);
+ for (Executor executor : executors) {
+ Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+ Assert.assertEquals(executor, fetchedExecutor);
+ }
+ }
+
+ /* Test fetch all executors */
+ @Test
+ public void testFetchAllExecutors() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = addTestExecutors(loader);
+
+ executors.get(0).setActive(false);
+ loader.updateExecutor(executors.get(0));
+
+ List<Executor> fetchedExecutors = loader.fetchAllExecutors();
+ Assert.assertEquals(executors.size(), fetchedExecutors.size());
+
+ Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+ }
+
+ /* Test fetch only active executors */
+ @Test
+ public void testFetchActiveExecutors() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = addTestExecutors(loader);
+
+ executors.get(0).setActive(false);
+ loader.updateExecutor(executors.get(0));
+
+ List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
+ Assert.assertEquals(executors.size(), fetchedExecutors.size() + 1);
+ executors.remove(0);
+
+ Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+ }
+
+ /* Test add & fetch by host:port Executors */
+ @Test
+ public void testSingleExecutorFetchHostPort() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+ ExecutorLoader loader = createLoader();
+ List<Executor> executors = addTestExecutors(loader);
+ for (Executor executor : executors) {
+ Executor fetchedExecutor =
+ loader.fetchExecutor(executor.getHost(), executor.getPort());
+ Assert.assertEquals(executor, fetchedExecutor);
+ }
+ }
+
+ /* Helper method used in methods testing jdbc interface for executors table */
+ private List<Executor> addTestExecutors(ExecutorLoader loader)
+ throws ExecutorManagerException {
+ List<Executor> executors = new ArrayList<Executor>();
+ executors.add(loader.addExecutor("localhost1", 12345));
+ executors.add(loader.addExecutor("localhost2", 12346));
+ executors.add(loader.addExecutor("localhost1", 12347));
+ return executors;
+ }
+
+ /* Test Executor Inactivation */
+ @Test
+ public void testExecutorInactivation() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.addExecutor("localhost1", 12345);
+ Assert.assertTrue(executor.isActive());
+
+ executor.setActive(false);
+ loader.updateExecutor(executor);
+
+ Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+
+ Assert.assertEquals(executor.getHost(), fetchedExecutor.getHost());
+ Assert.assertEquals(executor.getId(), fetchedExecutor.getId());
+ Assert.assertEquals(executor.getPort(), fetchedExecutor.getPort());
+ Assert.assertFalse(fetchedExecutor.isActive());
+ }
+
+ /* Test Executor reactivation */
+ @Test
+ public void testExecutorActivation() throws Exception {
+ if (!isTestSetup()) {
+ return;
+ }
+
+ ExecutorLoader loader = createLoader();
+ Executor executor = loader.addExecutor("localhost1", 12345);
+ Assert.assertTrue(executor.isActive());
+
+ executor.setActive(false);
+ loader.updateExecutor(executor);
+ Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+ Assert.assertFalse(fetchedExecutor.isActive());
+
+ executor.setActive(true);
+ loader.updateExecutor(executor);
+ fetchedExecutor = loader.fetchExecutor(executor.getId());
+
+ Assert.assertEquals(executor, fetchedExecutor);
+ }
+
@Test
public void testActiveReference() throws Exception {
if (!isTestSetup()) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index b9ad178..f2ffee8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -17,10 +17,13 @@
package azkaban.executor;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -36,6 +39,10 @@ public class MockExecutorLoader implements ExecutorLoader {
HashMap<String, Integer> jobUpdateCount = new HashMap<String, Integer>();
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
new HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+ List<Executor> executors = new ArrayList<Executor>();
+ int executorIdCounter = 0;
+ Map<Integer, ArrayList<ExecutorLogEvent>> executorEvents =
+ new HashMap<Integer, ArrayList<ExecutorLogEvent>>();
@Override
public void uploadExecutableFlow(ExecutableFlow flow)
@@ -249,4 +256,82 @@ public class MockExecutorLoader implements ExecutorLoader {
}
+ @Override
+ public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+ List<Executor> activeExecutors = new ArrayList<Executor>();
+ for (Executor executor : executors) {
+ if (executor.isActive()) {
+ activeExecutors.add(executor);
+ }
+ }
+ return activeExecutors;
+ }
+
+ @Override
+ public Executor fetchExecutor(String host, int port)
+ throws ExecutorManagerException {
+ for (Executor executor : executors) {
+ if (executor.getHost().equals(host) && executor.getPort() == port) {
+ return executor;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+ for (Executor executor : executors) {
+ if (executor.getId() == executorId) {
+ return executor;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Executor addExecutor(String host, int port)
+ throws ExecutorManagerException {
+ if (fetchExecutor(host, port) != null) {
+
+ }
+ executorIdCounter++;
+ Executor executor = new Executor(executorIdCounter, host, port, true);
+ return executor;
+ }
+
+ @Override
+ public void postExecutorEvent(Executor executor, EventType type, String user,
+ String message) throws ExecutorManagerException {
+ ExecutorLogEvent event =
+ new ExecutorLogEvent(executor.getId(), user, new Date(), type, message);
+
+ if (!executorEvents.containsKey(executor.getId())) {
+ executorEvents.put(executor.getId(), new ArrayList<ExecutorLogEvent>());
+ }
+
+ executorEvents.get(executor.getId()).add(event);
+ }
+
+ @Override
+ public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+ int skip) throws ExecutorManagerException {
+ if (!executorEvents.containsKey(executor.getId())) {
+ List<ExecutorLogEvent> events = executorEvents.get(executor.getId());
+ return events.subList(skip, Math.min(num + skip - 1, events.size() - 1));
+ }
+ return null;
+ }
+
+ @Override
+ public void updateExecutor(Executor executor) throws ExecutorManagerException {
+ Executor oldExecutor = fetchExecutor(executor.getId());
+ executors.remove(oldExecutor);
+ executors.add(executor);
+ }
+
+ @Override
+ public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+ return executors;
+ }
+
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
new file mode 100644
index 0000000..4ea0930
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for azkaban.utils.Utils
+ */
+public class UtilsTest {
+
+ /* Test negative port case */
+ @Test
+ public void testNegativePort() {
+ Assert.assertFalse(Utils.isValidPort(-1));
+ Assert.assertFalse(Utils.isValidPort(-10));
+ }
+
+ /* Test zero port case */
+ @Test
+ public void testZeroPort() {
+ Assert.assertFalse(Utils.isValidPort(0));
+ }
+
+ /* Test port beyond limit */
+ @Test
+ public void testOverflowPort() {
+ Assert.assertFalse(Utils.isValidPort(70000));
+ Assert.assertFalse(Utils.isValidPort(65536));
+ }
+
+ /* Test happy isValidPort case*/
+ @Test
+ public void testValidPort() {
+ Assert.assertTrue(Utils.isValidPort(1023));
+ Assert.assertTrue(Utils.isValidPort(10000));
+ Assert.assertTrue(Utils.isValidPort(3030));
+ Assert.assertTrue(Utils.isValidPort(1045));
+ }
+}
diff --git a/azkaban-sql/src/sql/create.executor_events.sql b/azkaban-sql/src/sql/create.executor_events.sql
new file mode 100644
index 0000000..1ff4799
--- /dev/null
+++ b/azkaban-sql/src/sql/create.executor_events.sql
@@ -0,0 +1,9 @@
+CREATE TABLE executor_events (
+ executor_id INT NOT NULL,
+ event_type TINYINT NOT NULL,
+ event_time DATETIME NOT NULL,
+ username VARCHAR(64),
+ message VARCHAR(512)
+);
+
+CREATE INDEX executor_log ON executor_events(executor_id, event_time);
\ No newline at end of file
azkaban-sql/src/sql/create.executors.sql 10(+10 -0)
diff --git a/azkaban-sql/src/sql/create.executors.sql b/azkaban-sql/src/sql/create.executors.sql
new file mode 100644
index 0000000..8e59ce3
--- /dev/null
+++ b/azkaban-sql/src/sql/create.executors.sql
@@ -0,0 +1,10 @@
+CREATE TABLE executors (
+ id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
+ host VARCHAR(64) NOT NULL,
+ port INT NOT NULL,
+ active BOOLEAN DEFAULT true,
+ UNIQUE (host, port),
+ UNIQUE INDEX executor_id (id)
+);
+
+CREATE INDEX executor_connection ON executors(host, port);