diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 3014a6c..4bff2c8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -16,37 +16,41 @@
package azkaban.executor;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
+import azkaban.user.User;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
import com.codahale.metrics.MetricRegistry;
import java.io.IOException;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-
import org.junit.Assert;
-import org.junit.Test;
import org.junit.Ignore;
-
-import azkaban.user.User;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
-import azkaban.utils.TestUtils;
-import static org.mockito.Mockito.*;
+import org.junit.Test;
/**
* Test class for executor manager
*/
public class ExecutorManagerTest {
+
+ private final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
private ExecutorManager manager;
private ExecutorLoader loader;
private Props props;
private User user;
- private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
private ExecutableFlow flow1;
private ExecutableFlow flow2;
@@ -61,15 +65,15 @@ public class ExecutorManagerTest {
* ExecutorLoader
*/
private ExecutorManager createMultiExecutorManagerInstance(
- ExecutorLoader loader) throws ExecutorManagerException {
- Props props = new Props();
+ final ExecutorLoader loader) throws ExecutorManagerException {
+ final Props props = new Props();
props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
loader.addExecutor("localhost", 12345);
loader.addExecutor("localhost", 12346);
return new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricRegistry()));
+ new CommonMetrics(new MetricsManager(new MetricRegistry())));
}
/*
@@ -78,13 +82,12 @@ public class ExecutorManagerTest {
*/
@Test(expected = ExecutorManagerException.class)
public void testNoExecutorScenario() throws ExecutorManagerException {
- Props props = new Props();
+ final Props props = new Props();
props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- ExecutorLoader loader = new MockExecutorLoader();
- @SuppressWarnings("unused")
- ExecutorManager manager =
+ final ExecutorLoader loader = new MockExecutorLoader();
+ @SuppressWarnings("unused") final ExecutorManager manager =
new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricRegistry()));
+ new CommonMetrics(new MetricsManager(new MetricRegistry())));
}
/*
@@ -92,18 +95,18 @@ public class ExecutorManagerTest {
*/
@Test
public void testLocalExecutorScenario() throws ExecutorManagerException {
- Props props = new Props();
+ final Props props = new Props();
props.put("executor.port", 12345);
- ExecutorLoader loader = new MockExecutorLoader();
- ExecutorManager manager =
+ final ExecutorLoader loader = new MockExecutorLoader();
+ final ExecutorManager manager =
new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricRegistry()));
- Set<Executor> activeExecutors =
+ new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertEquals(activeExecutors.size(), 1);
- Executor executor = activeExecutors.iterator().next();
+ final Executor executor = activeExecutors.iterator().next();
Assert.assertEquals(executor.getHost(), "localhost");
Assert.assertEquals(executor.getPort(), 12345);
Assert.assertArrayEquals(activeExecutors.toArray(), loader
@@ -115,16 +118,16 @@ public class ExecutorManagerTest {
*/
@Test
public void testMultipleExecutorScenario() throws ExecutorManagerException {
- Props props = new Props();
+ final Props props = new Props();
props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- ExecutorLoader loader = new MockExecutorLoader();
- Executor executor1 = loader.addExecutor("localhost", 12345);
- Executor executor2 = loader.addExecutor("localhost", 12346);
+ final ExecutorLoader loader = new MockExecutorLoader();
+ final Executor executor1 = loader.addExecutor("localhost", 12345);
+ final Executor executor2 = loader.addExecutor("localhost", 12346);
- ExecutorManager manager =
+ final ExecutorManager manager =
new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricRegistry()));
- Set<Executor> activeExecutors =
+ new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
executor1, executor2 });
@@ -135,22 +138,22 @@ public class ExecutorManagerTest {
*/
@Test
public void testSetupExecutorsSucess() throws ExecutorManagerException {
- Props props = new Props();
+ final Props props = new Props();
props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- ExecutorLoader loader = new MockExecutorLoader();
- Executor executor1 = loader.addExecutor("localhost", 12345);
+ final ExecutorLoader loader = new MockExecutorLoader();
+ final Executor executor1 = loader.addExecutor("localhost", 12345);
- ExecutorManager manager =
+ final ExecutorManager manager =
new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricRegistry()));
+ new CommonMetrics(new MetricsManager(new MetricRegistry())));
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
new Executor[] { executor1 });
// mark older executor as inactive
executor1.setActive(false);
loader.updateExecutor(executor1);
- Executor executor2 = loader.addExecutor("localhost", 12346);
- Executor executor3 = loader.addExecutor("localhost", 12347);
+ final Executor executor2 = loader.addExecutor("localhost", 12346);
+ final Executor executor3 = loader.addExecutor("localhost", 12347);
manager.setupExecutors();
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
@@ -163,15 +166,15 @@ public class ExecutorManagerTest {
*/
@Test(expected = ExecutorManagerException.class)
public void testSetupExecutorsException() throws ExecutorManagerException {
- Props props = new Props();
+ final Props props = new Props();
props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
- ExecutorLoader loader = new MockExecutorLoader();
- Executor executor1 = loader.addExecutor("localhost", 12345);
+ final ExecutorLoader loader = new MockExecutorLoader();
+ final Executor executor1 = loader.addExecutor("localhost", 12345);
- ExecutorManager manager =
+ final ExecutorManager manager =
new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricRegistry()));
- Set<Executor> activeExecutors =
+ new CommonMetrics(new MetricsManager(new MetricRegistry())));
+ final Set<Executor> activeExecutors =
new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(),
new Executor[] { executor1 });
@@ -185,7 +188,7 @@ public class ExecutorManagerTest {
/* Test disabling queue process thread to pause dispatching */
@Test
public void testDisablingQueueProcessThread() throws ExecutorManagerException {
- ExecutorManager manager = createMultiExecutorManagerInstance();
+ final ExecutorManager manager = createMultiExecutorManagerInstance();
manager.enableQueueProcessorThread();
Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
manager.disableQueueProcessorThread();
@@ -195,7 +198,7 @@ public class ExecutorManagerTest {
/* Test renabling queue process thread to pause restart dispatching */
@Test
public void testEnablingQueueProcessThread() throws ExecutorManagerException {
- ExecutorManager manager = createMultiExecutorManagerInstance();
+ final ExecutorManager manager = createMultiExecutorManagerInstance();
Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
manager.enableQueueProcessorThread();
Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
@@ -204,30 +207,30 @@ public class ExecutorManagerTest {
/* Test submit a non-dispatched flow */
@Test
public void testQueuedFlows() throws ExecutorManagerException, IOException {
- ExecutorLoader loader = new MockExecutorLoader();
- ExecutorManager manager = createMultiExecutorManagerInstance(loader);
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = new MockExecutorLoader();
+ final ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
flow1.setExecutionId(1);
- ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
flow2.setExecutionId(2);
- User testUser = TestUtils.getTestUser();
+ final User testUser = TestUtils.getTestUser();
manager.submitExecutableFlow(flow1, testUser.getUserId());
manager.submitExecutableFlow(flow2, testUser.getUserId());
- List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
+ final List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
- List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
+ final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
loader.fetchQueuedFlows();
Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
// Verify things are correctly setup in db
- for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
+ for (final Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
Assert.assertTrue(testFlows.contains(pair.getSecond().getExecutionId()));
}
// Verify running flows using old definition of "running" flows i.e. a
// non-dispatched flow is also considered running
- List<Integer> managerActiveFlows = manager.getRunningFlows()
+ final List<Integer> managerActiveFlows = manager.getRunningFlows()
.stream().map(ExecutableFlow::getExecutionId).collect(Collectors.toList());
Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
&& testFlows.containsAll(managerActiveFlows));
@@ -240,12 +243,12 @@ public class ExecutorManagerTest {
@Test(expected = ExecutorManagerException.class)
public void testDuplicateQueuedFlows() throws ExecutorManagerException,
IOException {
- ExecutorManager manager = createMultiExecutorManagerInstance();
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorManager manager = createMultiExecutorManagerInstance();
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
flow1.getExecutionOptions().setConcurrentOption(
ExecutionOptions.CONCURRENT_OPTION_SKIP);
- User testUser = TestUtils.getTestUser();
+ final User testUser = TestUtils.getTestUser();
manager.submitExecutableFlow(flow1, testUser.getUserId());
manager.submitExecutableFlow(flow1, testUser.getUserId());
}
@@ -256,14 +259,14 @@ public class ExecutorManagerTest {
*/
@Test
public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
- ExecutorLoader loader = new MockExecutorLoader();
- ExecutorManager manager = createMultiExecutorManagerInstance(loader);
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
- User testUser = TestUtils.getTestUser();
+ final ExecutorLoader loader = new MockExecutorLoader();
+ final ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final User testUser = TestUtils.getTestUser();
manager.submitExecutableFlow(flow1, testUser.getUserId());
manager.cancelFlow(flow1, testUser.getUserId());
- ExecutableFlow fetchedFlow =
+ final ExecutableFlow fetchedFlow =
loader.fetchExecutableFlow(flow1.getExecutionId());
Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
@@ -277,17 +280,17 @@ public class ExecutorManagerTest {
@Test
public void testSubmitFlows() throws ExecutorManagerException, IOException {
testSetUpForRunningFlows();
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
- manager.submitExecutableFlow(flow1, user.getUserId());
- verify(loader).uploadExecutableFlow(flow1);
- verify(loader).addActiveExecutableReference(any());
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+ verify(this.loader).uploadExecutableFlow(flow1);
+ verify(this.loader).addActiveExecutableReference(any());
}
@Ignore @Test
public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
testSetUpForRunningFlows();
- List<ExecutableFlow> flows = manager.getRunningFlows();
- for(Pair<ExecutionReference, ExecutableFlow> pair : activeFlows.values()) {
+ final List<ExecutableFlow> flows = this.manager.getRunningFlows();
+ for (final Pair<ExecutionReference, ExecutableFlow> pair : this.activeFlows.values()) {
Assert.assertTrue(flows.contains(pair.getSecond()));
}
}
@@ -295,28 +298,30 @@ public class ExecutorManagerTest {
@Ignore @Test
public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
testSetUpForRunningFlows();
- List<Integer> executions = manager.getRunningFlows(flow1.getProjectId(), flow1.getFlowId());
- Assert.assertTrue(executions.contains(flow1.getExecutionId()));
- Assert.assertTrue(manager.isFlowRunning(flow1.getProjectId(), flow1.getFlowId()));
+ final List<Integer> executions = this.manager.getRunningFlows(this.flow1.getProjectId(),
+ this.flow1.getFlowId());
+ Assert.assertTrue(executions.contains(this.flow1.getExecutionId()));
+ Assert
+ .assertTrue(this.manager.isFlowRunning(this.flow1.getProjectId(), this.flow1.getFlowId()));
}
@Ignore @Test
public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
testSetUpForRunningFlows();
- List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
- manager.getActiveFlowsWithExecutor();
- Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(flow1,
- manager.fetchExecutor(flow1.getExecutionId()))));
- Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(flow2,
- manager.fetchExecutor(flow2.getExecutionId()))));
+ final List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
+ this.manager.getActiveFlowsWithExecutor();
+ Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(this.flow1,
+ this.manager.fetchExecutor(this.flow1.getExecutionId()))));
+ Assert.assertTrue(activeFlowsWithExecutor.contains(new Pair<>(this.flow2,
+ this.manager.fetchExecutor(this.flow2.getExecutionId()))));
}
@Test
public void testFetchAllActiveExecutorServerHosts() throws ExecutorManagerException, IOException {
testSetUpForRunningFlows();
- Set<String> activeExecutorServerHosts = manager.getAllActiveExecutorServerHosts();
- Executor executor1 = manager.fetchExecutor(flow1.getExecutionId());
- Executor executor2 = manager.fetchExecutor(flow2.getExecutionId());
+ final Set<String> activeExecutorServerHosts = this.manager.getAllActiveExecutorServerHosts();
+ final Executor executor1 = this.manager.fetchExecutor(this.flow1.getExecutionId());
+ final Executor executor2 = this.manager.fetchExecutor(this.flow2.getExecutionId());
Assert.assertTrue(activeExecutorServerHosts.contains(executor1.getHost() + ":" + executor1.getPort()));
Assert.assertTrue(activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
}
@@ -326,34 +331,34 @@ public class ExecutorManagerTest {
*/
private void testSetUpForRunningFlows()
throws ExecutorManagerException, IOException {
- loader = mock(ExecutorLoader.class);
- user = TestUtils.getTestUser();
- props = new Props();
- props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+ this.loader = mock(ExecutorLoader.class);
+ this.user = TestUtils.getTestUser();
+ this.props = new Props();
+ this.props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
//To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
//so that flows will be dispatched to executors.
- props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
+ this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");
- List<Executor> executors = new ArrayList<>();
- Executor executor1 = new Executor(1, "localhost", 12345, true);
- Executor executor2 = new Executor(2, "localhost", 12346, true);
+ final List<Executor> executors = new ArrayList<>();
+ final Executor executor1 = new Executor(1, "localhost", 12345, true);
+ final Executor executor2 = new Executor(2, "localhost", 12346, true);
executors.add(executor1);
executors.add(executor2);
- when(loader.fetchActiveExecutors()).thenReturn(executors);
- manager = new ExecutorManager(props, loader, new AlerterHolder(props),
- new CommonMetrics(new MetricRegistry()));
-
- flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
- flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
- flow1.setExecutionId(1);
- flow2.setExecutionId(2);
- ExecutionReference ref1 =
- new ExecutionReference(flow1.getExecutionId(), executor1);
- ExecutionReference ref2 =
- new ExecutionReference(flow2.getExecutionId(), executor2);
- activeFlows.put(flow1.getExecutionId(), new Pair<>(ref1, flow1));
- activeFlows.put(flow2.getExecutionId(), new Pair<>(ref2, flow2));
- when(loader.fetchActiveFlows()).thenReturn(activeFlows);
+ when(this.loader.fetchActiveExecutors()).thenReturn(executors);
+ this.manager = new ExecutorManager(this.props, this.loader, new AlerterHolder(this.props),
+ new CommonMetrics(new MetricsManager(new MetricRegistry())));
+
+ this.flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ this.flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ this.flow1.setExecutionId(1);
+ this.flow2.setExecutionId(2);
+ final ExecutionReference ref1 =
+ new ExecutionReference(this.flow1.getExecutionId(), executor1);
+ final ExecutionReference ref2 =
+ new ExecutionReference(this.flow2.getExecutionId(), executor2);
+ this.activeFlows.put(this.flow1.getExecutionId(), new Pair<>(ref1, this.flow1));
+ this.activeFlows.put(this.flow2.getExecutionId(), new Pair<>(ref2, this.flow2));
+ when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index e9874a1..3b3f1ca 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -16,7 +16,15 @@
package azkaban.executor;
+import azkaban.database.DataSourceUtils;
+import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
+import azkaban.user.User;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
import com.codahale.metrics.MetricRegistry;
import java.io.File;
import java.io.IOException;
@@ -26,12 +34,9 @@ import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-
import javax.sql.DataSource;
-
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
@@ -43,16 +48,7 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import azkaban.database.DataSourceUtils;
-import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.user.User;
-import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
-import azkaban.utils.TestUtils;
-
public class JdbcExecutorLoaderTest {
- private static boolean testDBExists;
/* Directory with serialized description of test flows */
private static final String UNIT_BASE_DIR =
"../test/src/test/resources/azkaban/test/executions";
@@ -65,10 +61,11 @@ public class JdbcExecutorLoaderTest {
private static final int numConnections = 10;
private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(1);
private static final Duration FLOW_FINISHED_TIME = Duration.ofMinutes(2);
+ private static boolean testDBExists;
@BeforeClass
public static void setupDB() {
- DataSource dataSource =
+ final DataSource dataSource =
DataSourceUtils.getMySQLDataSource(host, port, database, user,
password, numConnections);
testDBExists = true;
@@ -76,19 +73,19 @@ public class JdbcExecutorLoaderTest {
Connection connection = null;
try {
connection = dataSource.getConnection();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
return;
}
- CountHandler countHandler = new CountHandler();
- QueryRunner runner = new QueryRunner();
+ final CountHandler countHandler = new CountHandler();
+ final QueryRunner runner = new QueryRunner();
try {
runner.query(connection, "SELECT COUNT(1) FROM active_executing_flows",
countHandler);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -98,7 +95,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.query(connection, "SELECT COUNT(1) FROM execution_flows",
countHandler);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -108,7 +105,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.query(connection, "SELECT COUNT(1) FROM execution_jobs",
countHandler);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -118,7 +115,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.query(connection, "SELECT COUNT(1) FROM execution_logs",
countHandler);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -128,7 +125,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.query(connection, "SELECT COUNT(1) FROM executors",
countHandler);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -138,7 +135,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.query(connection, "SELECT COUNT(1) FROM executor_events",
countHandler);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -154,24 +151,24 @@ public class JdbcExecutorLoaderTest {
return;
}
- DataSource dataSource =
+ final DataSource dataSource =
DataSourceUtils.getMySQLDataSource(host, port, database, user,
password, numConnections);
Connection connection = null;
try {
connection = dataSource.getConnection();
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
return;
}
- QueryRunner runner = new QueryRunner();
+ final QueryRunner runner = new QueryRunner();
try {
runner.update(connection, "DELETE FROM active_executing_flows");
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -180,7 +177,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.update(connection, "DELETE FROM execution_flows");
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -189,7 +186,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.update(connection, "DELETE FROM execution_jobs");
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -198,7 +195,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.update(connection, "DELETE FROM execution_logs");
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -207,7 +204,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.update(connection, "DELETE FROM executors");
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -216,7 +213,7 @@ public class JdbcExecutorLoaderTest {
try {
runner.update(connection, "DELETE FROM executor_events");
- } catch (SQLException e) {
+ } catch (final SQLException e) {
e.printStackTrace();
testDBExists = false;
DbUtils.closeQuietly(connection);
@@ -231,12 +228,12 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
- ExecutableFlow fetchFlow =
+ final ExecutableFlow fetchFlow =
loader.fetchExecutableFlow(flow.getExecutionId());
// Shouldn't be the same object.
@@ -250,8 +247,8 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow.getVersion(), fetchFlow.getVersion());
Assert.assertEquals(flow.getExecutionOptions().getFailureAction(),
fetchFlow.getExecutionOptions().getFailureAction());
- Assert.assertEquals(new HashSet<String>(flow.getEndNodes()),
- new HashSet<String>(fetchFlow.getEndNodes()));
+ Assert.assertEquals(new HashSet<>(flow.getEndNodes()),
+ new HashSet<>(fetchFlow.getEndNodes()));
}
@Test
@@ -260,18 +257,18 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
- ExecutableFlow fetchFlow2 =
+ final ExecutableFlow fetchFlow2 =
loader.fetchExecutableFlow(flow.getExecutionId());
fetchFlow2.setEndTime(System.currentTimeMillis());
fetchFlow2.setStatus(Status.SUCCEEDED);
loader.updateExecutableFlow(fetchFlow2);
- ExecutableFlow fetchFlow =
+ final ExecutableFlow fetchFlow =
loader.fetchExecutableFlow(flow.getExecutionId());
// Shouldn't be the same object.
@@ -286,8 +283,8 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals(flow.getVersion(), fetchFlow.getVersion());
Assert.assertEquals(flow.getExecutionOptions().getFailureAction(),
fetchFlow.getExecutionOptions().getFailureAction());
- Assert.assertEquals(new HashSet<String>(flow.getEndNodes()),
- new HashSet<String>(fetchFlow.getEndNodes()));
+ Assert.assertEquals(new HashSet<>(flow.getEndNodes()),
+ new HashSet<>(fetchFlow.getEndNodes()));
}
@Test
@@ -296,18 +293,18 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow = createExecutableFlow(10, "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow = createExecutableFlow(10, "exec1");
flow.setExecutionId(10);
- File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
- Props props = new Props(null, jobFile);
+ final File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
+ final Props props = new Props(null, jobFile);
props.put("test", "test2");
- ExecutableNode oldNode = flow.getExecutableNode("job10");
+ final ExecutableNode oldNode = flow.getExecutableNode("job10");
oldNode.setStartTime(System.currentTimeMillis());
loader.uploadExecutableNode(oldNode, props);
- ExecutableJobInfo info = loader.fetchJobInfo(10, "job10", 0);
+ final ExecutableJobInfo info = loader.fetchJobInfo(10, "job10", 0);
Assert.assertEquals(flow.getExecutionId(), info.getExecId());
Assert.assertEquals(flow.getProjectId(), info.getProjectId());
Assert.assertEquals(flow.getVersion(), info.getVersion());
@@ -320,15 +317,15 @@ public class JdbcExecutorLoaderTest {
info.getEndTime());
// Fetch props
- Props outputProps = new Props();
+ final Props outputProps = new Props();
outputProps.put("hello", "output");
oldNode.setOutputProps(outputProps);
oldNode.setEndTime(System.currentTimeMillis());
loader.updateExecutableNode(oldNode);
- Props fInputProps = loader.fetchExecutionJobInputProps(10, "job10");
- Props fOutputProps = loader.fetchExecutionJobOutputProps(10, "job10");
- Pair<Props, Props> inOutProps = loader.fetchExecutionJobProps(10, "job10");
+ final Props fInputProps = loader.fetchExecutionJobInputProps(10, "job10");
+ final Props fOutputProps = loader.fetchExecutionJobOutputProps(10, "job10");
+ final Pair<Props, Props> inOutProps = loader.fetchExecutionJobProps(10, "job10");
Assert.assertEquals(fInputProps.get("test"), "test2");
Assert.assertEquals(fOutputProps.get("hello"), "output");
@@ -344,11 +341,11 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
+ final ExecutorLoader loader = createLoader();
try {
loader.unassignExecutor(2);
Assert.fail("Expecting exception, but didn't get one");
- } catch (ExecutorManagerException ex) {
+ } catch (final ExecutorManagerException ex) {
System.out.println("Test true");
}
}
@@ -360,11 +357,11 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- String host = "localhost";
- int port = 12345;
- Executor executor = loader.addExecutor(host, port);
- ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final String host = "localhost";
+ final int port = 12345;
+ final Executor executor = loader.addExecutor(host, port);
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
loader.assignExecutor(executor.getId(), flow.getExecutionId());
Assert.assertEquals(
@@ -381,13 +378,13 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
try {
loader.assignExecutor(flow.getExecutionId(), 1);
Assert.fail("Expecting exception, but didn't get one");
- } catch (ExecutorManagerException ex) {
+ } catch (final ExecutorManagerException ex) {
System.out.println("Test true");
}
}
@@ -399,14 +396,14 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- String host = "localhost";
- int port = 12345;
- Executor executor = loader.addExecutor(host, port);
+ final ExecutorLoader loader = createLoader();
+ final String host = "localhost";
+ final int port = 12345;
+ final Executor executor = loader.addExecutor(host, port);
try {
loader.assignExecutor(2, executor.getId());
Assert.fail("Expecting exception, but didn't get one");
- } catch (ExecutorManagerException ex) {
+ } catch (final ExecutorManagerException ex) {
System.out.println("Test true");
}
}
@@ -418,7 +415,7 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
+ final ExecutorLoader loader = createLoader();
Assert.assertEquals(loader.fetchExecutorByExecutionId(1), null);
}
@@ -429,8 +426,8 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
null);
@@ -443,11 +440,11 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- String host = "localhost";
- int port = 12345;
- Executor executor = loader.addExecutor(host, port);
- ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final String host = "localhost";
+ final int port = 12345;
+ final Executor executor = loader.addExecutor(host, port);
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
loader.assignExecutor(executor.getId(), flow.getExecutionId());
Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
@@ -462,25 +459,25 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ final ExecutorLoader loader = createLoader();
+ final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
loader.fetchQueuedFlows();
// no execution flows at all i.e. no running, completed or queued flows
Assert.assertTrue(queuedFlows.isEmpty());
- String host = "lcoalhost";
- int port = 12345;
- Executor executor = loader.addExecutor(host, port);
+ final String host = "lcoalhost";
+ final int port = 12345;
+ final Executor executor = loader.addExecutor(host, port);
// When a flow is assigned an executor, it is no longer in queued state
- ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
loader.assignExecutor(executor.getId(), flow.getExecutionId());
Assert.assertTrue(queuedFlows.isEmpty());
// When flow status is finished, it is no longer in queued state
- ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
loader.uploadExecutableFlow(flow2);
flow2.setStatus(Status.SUCCEEDED);
loader.updateExecutableFlow(flow2);
@@ -495,17 +492,18 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
+ final ExecutorLoader loader = createLoader();
- ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow);
- ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
loader.uploadExecutableFlow(flow2);
- List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = loader.fetchQueuedFlows();
+ final List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = loader
+ .fetchQueuedFlows();
Assert.assertEquals(2, fetchedQueuedFlows.size());
- Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
- Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
+ final Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
+ final Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
Assert.assertEquals(flow.getExecutionId(), fetchedFlow1.getSecond().getExecutionId());
Assert.assertEquals(flow.getFlowId(), fetchedFlow1.getSecond().getFlowId());
@@ -521,8 +519,8 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- List<Executor> executors = loader.fetchAllExecutors();
+ final ExecutorLoader loader = createLoader();
+ final List<Executor> executors = loader.fetchAllExecutors();
Assert.assertEquals(executors.size(), 0);
}
@@ -532,8 +530,8 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- List<Executor> executors = loader.fetchActiveExecutors();
+ final ExecutorLoader loader = createLoader();
+ final List<Executor> executors = loader.fetchActiveExecutors();
Assert.assertEquals(executors.size(), 0);
}
@@ -543,8 +541,8 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- Executor executor = loader.fetchExecutor(0);
+ final ExecutorLoader loader = createLoader();
+ final Executor executor = loader.fetchExecutor(0);
Assert.assertEquals(executor, null);
}
@@ -554,8 +552,8 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- Executor executor = loader.fetchExecutor("localhost", 12345);
+ final ExecutorLoader loader = createLoader();
+ final Executor executor = loader.fetchExecutor("localhost", 12345);
Assert.assertEquals(executor, null);
}
@@ -565,9 +563,9 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- Executor executor = new Executor(1, "localhost", 12345, true);
- List<ExecutorLogEvent> executorEvents =
+ final ExecutorLoader loader = createLoader();
+ final Executor executor = new Executor(1, "localhost", 12345, true);
+ final List<ExecutorLogEvent> executorEvents =
loader.getExecutorEvents(executor, 5, 0);
Assert.assertEquals(executorEvents.size(), 0);
}
@@ -578,25 +576,25 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- int skip = 1;
- User user = new User("testUser");
- Executor executor = new Executor(1, "localhost", 12345, true);
- String message = "My message ";
- EventType[] events =
+ final ExecutorLoader loader = createLoader();
+ final int skip = 1;
+ final User user = new User("testUser");
+ final Executor executor = new Executor(1, "localhost", 12345, true);
+ final String message = "My message ";
+ final EventType[] events =
{ EventType.CREATED, EventType.HOST_UPDATE, EventType.INACTIVATION };
- for (EventType event : events) {
+ for (final EventType event : events) {
loader.postExecutorEvent(executor, event, user.getUserId(),
message + event.getNumVal());
}
- List<ExecutorLogEvent> eventLogs =
+ final List<ExecutorLogEvent> eventLogs =
loader.getExecutorEvents(executor, 10, skip);
Assert.assertTrue(eventLogs.size() == 2);
for (int index = 0; index < eventLogs.size(); ++index) {
- ExecutorLogEvent eventLog = eventLogs.get(index);
+ final ExecutorLogEvent eventLog = eventLogs.get(index);
Assert.assertEquals(eventLog.getExecutorId(), executor.getId());
Assert.assertEquals(eventLog.getUser(), user.getUserId());
Assert.assertEquals(eventLog.getType(), events[index + skip]);
@@ -611,14 +609,14 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
+ final ExecutorLoader loader = createLoader();
try {
- String host = "localhost";
- int port = 12345;
+ final String host = "localhost";
+ final int port = 12345;
loader.addExecutor(host, port);
loader.addExecutor(host, port);
Assert.fail("Expecting exception, but didn't get one");
- } catch (ExecutorManagerException ex) {
+ } catch (final ExecutorManagerException ex) {
System.out.println("Test true");
}
}
@@ -629,12 +627,12 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
+ final ExecutorLoader loader = createLoader();
try {
- Executor executor = new Executor(1, "localhost", 1234, true);
+ final Executor executor = new Executor(1, "localhost", 1234, true);
loader.updateExecutor(executor);
Assert.fail("Expecting exception, but didn't get one");
- } catch (ExecutorManagerException ex) {
+ } catch (final ExecutorManagerException ex) {
System.out.println("Test true");
}
clearDB();
@@ -646,10 +644,10 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- List<Executor> executors = addTestExecutors(loader);
- for (Executor executor : executors) {
- Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+ final ExecutorLoader loader = createLoader();
+ final List<Executor> executors = addTestExecutors(loader);
+ for (final Executor executor : executors) {
+ final Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
Assert.assertEquals(executor, fetchedExecutor);
}
}
@@ -660,13 +658,13 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- List<Executor> executors = addTestExecutors(loader);
+ final ExecutorLoader loader = createLoader();
+ final List<Executor> executors = addTestExecutors(loader);
executors.get(0).setActive(false);
loader.updateExecutor(executors.get(0));
- List<Executor> fetchedExecutors = loader.fetchAllExecutors();
+ final List<Executor> fetchedExecutors = loader.fetchAllExecutors();
Assert.assertEquals(executors.size(), fetchedExecutors.size());
Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
@@ -678,13 +676,13 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- List<Executor> executors = addTestExecutors(loader);
+ final ExecutorLoader loader = createLoader();
+ final List<Executor> executors = addTestExecutors(loader);
executors.get(0).setActive(false);
loader.updateExecutor(executors.get(0));
- List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
+ final List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
Assert.assertEquals(executors.size(), fetchedExecutors.size() + 1);
executors.remove(0);
@@ -697,19 +695,19 @@ public class JdbcExecutorLoaderTest {
if (!isTestSetup()) {
return;
}
- ExecutorLoader loader = createLoader();
- List<Executor> executors = addTestExecutors(loader);
- for (Executor executor : executors) {
- Executor fetchedExecutor =
+ final ExecutorLoader loader = createLoader();
+ final List<Executor> executors = addTestExecutors(loader);
+ for (final Executor executor : executors) {
+ final Executor fetchedExecutor =
loader.fetchExecutor(executor.getHost(), executor.getPort());
Assert.assertEquals(executor, fetchedExecutor);
}
}
/* Helper method used in methods testing jdbc interface for executors table */
- private List<Executor> addTestExecutors(ExecutorLoader loader)
+ private List<Executor> addTestExecutors(final ExecutorLoader loader)
throws ExecutorManagerException {
- List<Executor> executors = new ArrayList<Executor>();
+ final List<Executor> executors = new ArrayList<>();
executors.add(loader.addExecutor("localhost1", 12345));
executors.add(loader.addExecutor("localhost2", 12346));
executors.add(loader.addExecutor("localhost1", 12347));
@@ -723,14 +721,14 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- Executor executor = loader.addExecutor("localhost1", 12345);
+ final ExecutorLoader loader = createLoader();
+ final Executor executor = loader.addExecutor("localhost1", 12345);
Assert.assertTrue(executor.isActive());
executor.setActive(false);
loader.updateExecutor(executor);
- Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+ final Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
Assert.assertEquals(executor.getHost(), fetchedExecutor.getHost());
Assert.assertEquals(executor.getId(), fetchedExecutor.getId());
@@ -745,11 +743,11 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- Executor executor = loader.addExecutor("localhost1", 12345);
+ final ExecutorLoader loader = createLoader();
+ final Executor executor = loader.addExecutor("localhost1", 12345);
Assert.assertNotNull(executor);
loader.removeExecutor("localhost1", 12345);
- Executor fetchedExecutor = loader.fetchExecutor("localhost1", 12345);
+ final Executor fetchedExecutor = loader.fetchExecutor("localhost1", 12345);
Assert.assertNull(fetchedExecutor);
}
@@ -760,8 +758,8 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- Executor executor = loader.addExecutor("localhost1", 12345);
+ final ExecutorLoader loader = createLoader();
+ final Executor executor = loader.addExecutor("localhost1", 12345);
Assert.assertTrue(executor.isActive());
executor.setActive(false);
@@ -783,23 +781,23 @@ public class JdbcExecutorLoaderTest {
}
// Upload flow1, executor assigned
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
- Executor executor = loader.addExecutor("test", 1);
+ final Executor executor = loader.addExecutor("test", 1);
loader.assignExecutor(executor.getId(), flow1.getExecutionId());
// Upload flow2, executor not assigned
- ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+ final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
loader.uploadExecutableFlow(flow2);
- Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
loader.fetchActiveFlows();
Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
Assert.assertFalse(activeFlows1.containsKey(flow2.getExecutionId()));
- ExecutableFlow flow1Result =
+ final ExecutableFlow flow1Result =
activeFlows1.get(flow1.getExecutionId()).getSecond();
Assert.assertNotNull(flow1Result);
Assert.assertTrue(flow1 != flow1Result);
@@ -820,11 +818,11 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
// Flow status is PREPARING when uploaded, should be in active flows
loader.uploadExecutableFlow(flow1);
- Executor executor = loader.addExecutor("test", 1);
+ final Executor executor = loader.addExecutor("test", 1);
loader.assignExecutor(executor.getId(), flow1.getExecutionId());
Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
@@ -854,16 +852,16 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
- Executor executor = loader.addExecutor("test", 1);
+ final Executor executor = loader.addExecutor("test", 1);
loader.assignExecutor(executor.getId(), flow1.getExecutionId());
- ExecutionReference ref1 =
+ final ExecutionReference ref1 =
new ExecutionReference(flow1.getExecutionId(), executor);
loader.addActiveExecutableReference(ref1);
- Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
loader.fetchActiveFlows();
Assert.assertTrue(activeFlows1.containsKey(flow1.getExecutionId()));
@@ -878,17 +876,17 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
- Executor executor = loader.addExecutor("test", 1);
+ final Executor executor = loader.addExecutor("test", 1);
loader.assignExecutor(executor.getId(), flow1.getExecutionId());
- Pair<ExecutionReference, ExecutableFlow> activeFlow1 =
+ final Pair<ExecutionReference, ExecutableFlow> activeFlow1 =
loader.fetchActiveFlowByExecId(flow1.getExecutionId());
- ExecutionReference execRef1 = activeFlow1.getFirst();
- ExecutableFlow execFlow1 = activeFlow1.getSecond();
+ final ExecutionReference execRef1 = activeFlow1.getFirst();
+ final ExecutableFlow execFlow1 = activeFlow1.getSecond();
Assert.assertNotNull(execRef1);
Assert.assertNotNull(execFlow1);
Assert.assertEquals(flow1.getExecutionId(), execFlow1.getExecutionId());
@@ -903,14 +901,14 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
flow1.setStatus(Status.SUCCEEDED);
flow1.setEndTime(DateTimeUtils.currentTimeMillis());
loader.updateExecutableFlow(flow1);
//Flow just finished. Fetch recently finished flows immediately. Should get it.
- List<ExecutableFlow> flows = loader.fetchRecentlyFinishedFlows(
+ final List<ExecutableFlow> flows = loader.fetchRecentlyFinishedFlows(
RECENTLY_FINISHED_LIFETIME);
Assert.assertEquals(1, flows.size());
Assert.assertEquals(flow1.getExecutionId(), flows.get(0).getExecutionId());
@@ -925,8 +923,8 @@ public class JdbcExecutorLoaderTest {
return;
}
- ExecutorLoader loader = createLoader();
- ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+ final ExecutorLoader loader = createLoader();
+ final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
loader.uploadExecutableFlow(flow1);
flow1.setStatus(Status.SUCCEEDED);
flow1.setEndTime(DateTimeUtils.currentTimeMillis());
@@ -937,29 +935,29 @@ public class JdbcExecutorLoaderTest {
flow1.setEndTime(DateTimeUtils.currentTimeMillis());
loader.updateExecutableFlow(flow1);
//Fetch recently finished flows within 1 min. Should be empty.
- List<ExecutableFlow> flows = loader
+ final List<ExecutableFlow> flows = loader
.fetchRecentlyFinishedFlows(RECENTLY_FINISHED_LIFETIME);
Assert.assertTrue(flows.isEmpty());
}
@Ignore @Test
public void testSmallUploadLog() throws ExecutorManagerException {
- File logDir = new File(UNIT_BASE_DIR + "logtest");
- File[] smalllog =
+ final File logDir = new File(UNIT_BASE_DIR + "logtest");
+ final File[] smalllog =
{ new File(logDir, "log1.log"), new File(logDir, "log2.log"),
new File(logDir, "log3.log") };
- ExecutorLoader loader = createLoader();
+ final ExecutorLoader loader = createLoader();
loader.uploadLogFile(1, "smallFiles", 0, smalllog);
- LogData data = loader.fetchLogs(1, "smallFiles", 0, 0, 50000);
+ final LogData data = loader.fetchLogs(1, "smallFiles", 0, 0, 50000);
Assert.assertNotNull(data);
Assert.assertEquals("Logs length is " + data.getLength(), data.getLength(),
53);
System.out.println(data.toString());
- LogData data2 = loader.fetchLogs(1, "smallFiles", 0, 10, 20);
+ final LogData data2 = loader.fetchLogs(1, "smallFiles", 0, 10, 20);
System.out.println(data2.toString());
Assert.assertNotNull(data2);
Assert.assertEquals("Logs length is " + data2.getLength(),
@@ -969,40 +967,40 @@ public class JdbcExecutorLoaderTest {
@Ignore @Test
public void testLargeUploadLog() throws ExecutorManagerException {
- File logDir = new File(UNIT_BASE_DIR + "logtest");
+ final File logDir = new File(UNIT_BASE_DIR + "logtest");
// Multiple of 255 for Henry the Eigth
- File[] largelog =
+ final File[] largelog =
{ new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
new File(logDir, "largeLog3.log") };
- ExecutorLoader loader = createLoader();
+ final ExecutorLoader loader = createLoader();
loader.uploadLogFile(1, "largeFiles", 0, largelog);
- LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 0, 64000);
+ final LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 0, 64000);
Assert.assertNotNull(logsResult);
Assert.assertEquals("Logs length is " + logsResult.getLength(),
logsResult.getLength(), 64000);
- LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 0, 1000, 64000);
+ final LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 0, 1000, 64000);
Assert.assertNotNull(logsResult2);
Assert.assertEquals("Logs length is " + logsResult2.getLength(),
logsResult2.getLength(), 64000);
- LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 0, 330000, 400000);
+ final LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 0, 330000, 400000);
Assert.assertNotNull(logsResult3);
Assert.assertEquals("Logs length is " + logsResult3.getLength(),
logsResult3.getLength(), 5493);
- LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 0, 340000, 400000);
+ final LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 0, 340000, 400000);
Assert.assertNull(logsResult4);
- LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 0, 153600, 204800);
+ final LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 0, 153600, 204800);
Assert.assertNotNull(logsResult5);
Assert.assertEquals("Logs length is " + logsResult5.getLength(),
logsResult5.getLength(), 181893);
- LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 0, 150000, 250000);
+ final LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 0, 150000, 250000);
Assert.assertNotNull(logsResult6);
Assert.assertEquals("Logs length is " + logsResult6.getLength(),
logsResult6.getLength(), 185493);
@@ -1013,24 +1011,24 @@ public class JdbcExecutorLoaderTest {
public void testRemoveExecutionLogsByTime() throws ExecutorManagerException,
IOException, InterruptedException {
- ExecutorLoader loader = createLoader();
+ final ExecutorLoader loader = createLoader();
- File logDir = new File(UNIT_BASE_DIR + "logtest");
+ final File logDir = new File(UNIT_BASE_DIR + "logtest");
// Multiple of 255 for Henry the Eigth
- File[] largelog =
+ final File[] largelog =
{ new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
new File(logDir, "largeLog3.log") };
- DateTime time1 = DateTime.now();
+ final DateTime time1 = DateTime.now();
loader.uploadLogFile(1, "oldlog", 0, largelog);
// sleep for 5 seconds
Thread.currentThread().sleep(5000);
loader.uploadLogFile(2, "newlog", 0, largelog);
- DateTime time2 = time1.plusMillis(2500);
+ final DateTime time2 = time1.plusMillis(2500);
- int count = loader.removeExecutionLogsByTime(time2.getMillis());
+ final int count = loader.removeExecutionLogsByTime(time2.getMillis());
System.out.print("Removed " + count + " records");
LogData logs = loader.fetchLogs(1, "oldlog", 0, 0, 22222);
Assert.assertTrue(logs == null);
@@ -1038,16 +1036,16 @@ public class JdbcExecutorLoaderTest {
Assert.assertFalse(logs == null);
}
- private ExecutableFlow createExecutableFlow(int executionId, String flowName)
+ private ExecutableFlow createExecutableFlow(final int executionId, final String flowName)
throws IOException {
- ExecutableFlow execFlow =
+ final ExecutableFlow execFlow =
TestUtils.createExecutableFlow("exectest1", flowName);
execFlow.setExecutionId(executionId);
return execFlow;
}
private ExecutorLoader createLoader() {
- Props props = new Props();
+ final Props props = new Props();
props.put("database.type", "mysql");
props.put("mysql.host", host);
@@ -1057,7 +1055,8 @@ public class JdbcExecutorLoaderTest {
props.put("mysql.password", password);
props.put("mysql.numconnections", numConnections);
- return new JdbcExecutorLoader(props, new CommonMetrics(new MetricRegistry()));
+ return new JdbcExecutorLoader(props,
+ new CommonMetrics(new MetricsManager(new MetricRegistry())));
}
private boolean isTestSetup() {
@@ -1072,7 +1071,7 @@ public class JdbcExecutorLoaderTest {
public static class CountHandler implements ResultSetHandler<Integer> {
@Override
- public Integer handle(ResultSet rs) throws SQLException {
+ public Integer handle(final ResultSet rs) throws SQLException {
int val = 0;
while (rs.next()) {
val++;