diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 71fb6b9..1397bb2 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -26,7 +26,6 @@ import azkaban.db.DatabaseOperatorImpl;
import azkaban.db.H2FileDataSource;
import azkaban.db.MySQLDataSource;
import azkaban.executor.ExecutorLoader;
-import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.project.JdbcProjectImpl;
import azkaban.project.ProjectLoader;
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 54d1b70..f97c7fe 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -20,13 +20,10 @@ package azkaban;
/**
* Constants
*
- * Global place for storing constants.
- * Conventions:
- * <p>
- * - All internal constants to be put in the root level ie. {@link Constants} class <p>
- * - All Configuration keys to be put in {@link ConfigurationKeys} class <p>
- * - Flow level Properties keys go to {@link FlowProperties} <p>
- * - Job level Properties keys go to {@link JobProperties} <p>
+ * Global place for storing constants. Conventions: <p> - All internal constants to be put in the
+ * root level ie. {@link Constants} class <p> - All Configuration keys to be put in {@link
+ * ConfigurationKeys} class <p> - Flow level Properties keys go to {@link FlowProperties} <p> - Job
+ * level Properties keys go to {@link JobProperties} <p>
*/
public class Constants {
@@ -54,6 +51,7 @@ public class Constants {
public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
public static class ConfigurationKeys {
+
// These properties are configurable through azkaban.properties
public static final String AZKABAN_PID_FILENAME = "azkaban.pid.filename";
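The class javadoc above spells out where constants live: internal constants at the root of `Constants`, configuration keys in `ConfigurationKeys`. A minimal sketch of reading such a key through `Props`, assuming the no-arg `Props` constructor and the `getString(key, default)` accessor used elsewhere in this diff (the `"azkaban.pid"` fallback is illustrative, not Azkaban's default):

```java
import azkaban.Constants;
import azkaban.utils.Props;

public class PidFileExample {

  public static void main(final String[] args) {
    final Props azkProps = new Props();
    // AZKABAN_PID_FILENAME lives in Constants.ConfigurationKeys per the
    // convention above; the "azkaban.pid" default here is an assumed fallback.
    final String pidFileName = azkProps.getString(
        Constants.ConfigurationKeys.AZKABAN_PID_FILENAME, "azkaban.pid");
    System.out.println("pid file: " + pidFileName);
  }
}
```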
diff --git a/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java b/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
index 70b6f83..b6e63db 100644
--- a/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
+++ b/azkaban-common/src/main/java/azkaban/executor/AlerterHolder.java
@@ -22,7 +22,6 @@ import azkaban.utils.Emailer;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
-import azkaban.metrics.CommonMetrics;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.File;
diff --git a/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
index c526d4f..a4e01a5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/AssignExecutorDao.java
@@ -29,7 +29,7 @@ public class AssignExecutorDao {
@Inject
public AssignExecutorDao(final DatabaseOperator dbOperator,
- final ExecutorDao executorDao) {
+ final ExecutorDao executorDao) {
this.dbOperator = dbOperator;
this.executorDao = executorDao;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index ad63a36..a82486a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -157,14 +157,14 @@ public class ExecutableNode {
this.updateTime = updateTime;
}
- public void setKilledBySLA(boolean killedBySLA) {
- this.killedBySLA = killedBySLA;
- }
-
public boolean isKilledBySLA() {
return this.killedBySLA;
}
+ public void setKilledBySLA(final boolean killedBySLA) {
+ this.killedBySLA = killedBySLA;
+ }
+
public void addOutNode(final String exNode) {
this.outNodes.add(exNode);
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index 457c4f5..5ccac59 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -88,7 +88,7 @@ public class ExecutionFlowDao {
}
List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
- final int skip, final int num)
+ final int skip, final int num)
throws ExecutorManagerException {
try {
return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
@@ -109,8 +109,8 @@ public class ExecutionFlowDao {
}
List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
- final int skip, final int num,
- final Status status)
+ final int skip, final int num,
+ final Status status)
throws ExecutorManagerException {
try {
return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
@@ -133,9 +133,9 @@ public class ExecutionFlowDao {
}
List<ExecutableFlow> fetchFlowHistory(final String projContain, final String flowContains,
- final String userNameContains, final int status,
- final long startTime, final long endTime,
- final int skip, final int num)
+ final String userNameContains, final int status,
+ final long startTime, final long endTime,
+ final int skip, final int num)
throws ExecutorManagerException {
String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
final List<Object> params = new ArrayList<>();
@@ -318,6 +318,7 @@ public class ExecutionFlowDao {
*/
private static class FetchQueuedExecutableFlows implements
ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+
// Select queued unassigned flows
private static final String FETCH_QUEUED_EXECUTABLE_FLOW =
"SELECT exec_id, enc_type, flow_data FROM execution_flows"
@@ -360,6 +361,7 @@ public class ExecutionFlowDao {
private static class FetchRecentlyFinishedFlows implements
ResultSetHandler<List<ExecutableFlow>> {
+
// Execution_flows table is already indexed by end_time
private static final String FETCH_RECENTLY_FINISHED_FLOW =
"SELECT exec_id, enc_type, flow_data FROM execution_flows "
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
index 88fe492..da13928 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
@@ -174,9 +174,9 @@ public class ExecutionJobDao {
}
public List<ExecutableJobInfo> fetchJobHistory(final int projectId,
- final String jobId,
- final int skip,
- final int size) throws ExecutorManagerException {
+ final String jobId,
+ final int skip,
+ final int size) throws ExecutorManagerException {
try {
final List<ExecutableJobInfo> info =
this.dbOperator.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
@@ -229,6 +229,7 @@ public class ExecutionJobDao {
private static class FetchExecutableJobHandler implements
ResultSetHandler<List<ExecutableJobInfo>> {
+
private static final String FETCH_EXECUTABLE_NODE =
"SELECT exec_id, project_id, version, flow_id, job_id, "
+ "start_time, end_time, status, attempt "
@@ -247,7 +248,7 @@ public class ExecutionJobDao {
@Override
public List<ExecutableJobInfo> handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
- return Collections.<ExecutableJobInfo> emptyList();
+ return Collections.<ExecutableJobInfo>emptyList();
}
final List<ExecutableJobInfo> execNodes = new ArrayList<>();
@@ -274,6 +275,7 @@ public class ExecutionJobDao {
private static class FetchExecutableJobPropsHandler implements
ResultSetHandler<Pair<Props, Props>> {
+
private static final String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
"SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
private static final String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
@@ -335,6 +337,7 @@ public class ExecutionJobDao {
private static class FetchExecutableJobAttachmentsHandler implements
ResultSetHandler<String> {
+
private static final String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
"SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
@@ -354,4 +357,4 @@ public class ExecutionJobDao {
return attachmentsJson;
}
}
-}
\ No newline at end of file
+}
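Each DAO in this patch pairs a SQL string with a `ResultSetHandler` and hands both to `DatabaseOperator.query(...)`, as `FetchExecutableJobHandler` above does. A condensed sketch of that pattern, assuming the commons-dbutils `ResultSetHandler` interface these handlers implement (the query and handler names here are illustrative):

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.dbutils.ResultSetHandler;

// Maps each row of a result set to a value, returning an empty list when the
// query matched nothing -- the same shape as FetchExecutableJobHandler above.
public class FetchExecIdsHandler implements ResultSetHandler<List<Integer>> {

  static final String FETCH_EXEC_IDS =
      "SELECT exec_id FROM execution_jobs WHERE job_id=?"; // illustrative query

  @Override
  public List<Integer> handle(final ResultSet rs) throws SQLException {
    if (!rs.next()) {
      return Collections.emptyList();
    }
    final List<Integer> execIds = new ArrayList<>();
    do {
      execIds.add(rs.getInt(1));
    } while (rs.next());
    return execIds;
  }
}
```

A caller would invoke it as `dbOperator.query(FetchExecIdsHandler.FETCH_EXEC_IDS, new FetchExecIdsHandler(), jobId)`, mirroring the `this.dbOperator.query(...)` calls in the hunks above.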
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
index 0821814..707c1c8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
@@ -55,8 +55,8 @@ public class ExecutionLogsDao {
// TODO kunkun-tang: the interface's parameter is called endByte, but actually is length.
LogData fetchLogs(final int execId, final String name, final int attempt,
- final int startByte,
- final int length) throws ExecutorManagerException {
+ final int startByte,
+ final int length) throws ExecutorManagerException {
final FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
try {
return this.dbOperator.query(FetchLogsHandler.FETCH_LOGS, handler,
@@ -68,7 +68,7 @@ public class ExecutionLogsDao {
}
public void uploadLogFile(final int execId, final String name, final int attempt,
- final File... files) throws ExecutorManagerException {
+ final File... files) throws ExecutorManagerException {
final SQLTransaction<Integer> transaction = transOperator -> {
uploadLogFile(transOperator, execId, name, attempt, files, this.defaultEncodingType);
transOperator.getConnection().commit();
@@ -82,8 +82,9 @@ public class ExecutionLogsDao {
}
}
- private void uploadLogFile(final DatabaseTransOperator transOperator, final int execId, final String name,
- final int attempt, final File[] files, final EncodingType encType)
+ private void uploadLogFile(final DatabaseTransOperator transOperator, final int execId,
+ final String name,
+ final int attempt, final File[] files, final EncodingType encType)
throws SQLException {
// 50K buffer... if logs are greater than this, we chunk.
// However, we better prevent large log files from being uploaded somehow
@@ -148,10 +149,10 @@ public class ExecutionLogsDao {
}
private void uploadLogPart(final DatabaseTransOperator transOperator, final int execId,
- final String name,
- final int attempt, final int startByte, final int endByte,
- final EncodingType encType,
- final byte[] buffer, final int length)
+ final String name,
+ final int attempt, final int startByte, final int endByte,
+ final EncodingType encType,
+ final byte[] buffer, final int length)
throws SQLException, IOException {
final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "
+ "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
@@ -228,4 +229,4 @@ public class ExecutionLogsDao {
new String(buffer, result.getFirst(), result.getSecond(), StandardCharsets.UTF_8));
}
}
-}
\ No newline at end of file
+}
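The `uploadLogFile` / `uploadLogPart` pair above streams a log through a 50K buffer and inserts one `execution_logs` row per chunk, keyed by start/end byte offsets. A standalone sketch of that offset arithmetic, assuming the 50K chunk size from the comment:

```java
public class LogChunkExample {

  // Assumed to match the "50K buffer" comment in uploadLogFile above.
  private static final int CHUNK_SIZE = 50 * 1024;

  public static void main(final String[] args) {
    final int total = 130_000; // hypothetical log size in bytes
    for (int startByte = 0; startByte < total; startByte += CHUNK_SIZE) {
      final int length = Math.min(CHUNK_SIZE, total - startByte);
      final int endByte = startByte + length;
      // each iteration corresponds to one uploadLogPart(..., startByte, endByte, ...) row
      System.out.printf("chunk [%d, %d), %d bytes%n", startByte, endByte, length);
    }
  }
}
```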
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
index 35a1882..12653b4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorDao.java
@@ -34,7 +34,7 @@ public class ExecutorDao {
private final DatabaseOperator dbOperator;
@Inject
- public ExecutorDao (final DatabaseOperator dbOperator) {
+ public ExecutorDao(final DatabaseOperator dbOperator) {
this.dbOperator = dbOperator;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
index d4e581c..90e17f0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
@@ -39,7 +39,7 @@ public class ExecutorEventsDao {
}
public void postExecutorEvent(final Executor executor, final EventType type, final String user,
- final String message) throws ExecutorManagerException {
+ final String message) throws ExecutorManagerException {
final String INSERT_PROJECT_EVENTS =
"INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
try {
@@ -51,7 +51,7 @@ public class ExecutorEventsDao {
}
public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
- final int offset)
+ final int offset)
throws ExecutorManagerException {
try {
return this.dbOperator.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
@@ -95,4 +95,4 @@ public class ExecutorEventsDao {
return events;
}
}
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
index d269f9a..c743499 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -20,14 +20,9 @@ import java.io.IOException;
import org.codehaus.jackson.map.ObjectMapper;
/**
- * Class that exposes the statistics from the executor server.
- * List of the statistics -
- * remainingMemoryPercent;
- * remainingMemory;
- * remainingFlowCapacity;
- * numberOfAssignedFlows;
- * lastDispatchedTime;
- * cpuUsage;
+ * Class that exposes the statistics from the executor server. List of the statistics -
+ * remainingMemoryPercent; remainingMemory; remainingFlowCapacity; numberOfAssignedFlows;
+ * lastDispatchedTime; cpuUsage;
*/
public class ExecutorInfo implements java.io.Serializable {
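These statistics are polled from each executor's `/serverStatistics` endpoint and parsed with `ExecutorInfo.fromJSONString(...)` (see `refreshExecutors` in `ExecutorManager` further down). A small sketch of that parse; the field names in the sample JSON are assumptions based on the statistics listed in the javadoc:

```java
import azkaban.executor.ExecutorInfo;

public class ExecutorInfoParseExample {

  public static void main(final String[] args) throws Exception {
    // Hypothetical /serverStatistics payload; field names assumed from the javadoc above.
    final String json =
        "{\"remainingMemoryPercent\":72.5,\"remainingFlowCapacity\":8,"
            + "\"numberOfAssignedFlows\":2,\"cpuUsage\":0.4}";
    final ExecutorInfo info = ExecutorInfo.fromJSONString(json);
    System.out.println(info);
  }
}
```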
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index a8a719d..7702a73 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
public interface ExecutorLoader {
+
void uploadExecutableFlow(ExecutableFlow flow)
throws ExecutorManagerException;
@@ -63,7 +64,6 @@ public interface ExecutorLoader {
* </pre>
*
* @return List<Executor>
- * @throws ExecutorManagerException
*/
List<Executor> fetchAllExecutors() throws ExecutorManagerException;
@@ -76,7 +76,6 @@ public interface ExecutorLoader {
* </pre>
*
* @return List<Executor>
- * @throws ExecutorManagerException
*/
List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
@@ -90,10 +89,9 @@ public interface ExecutorLoader {
* </pre>
*
* @return Executor
- * @throws ExecutorManagerException
*/
Executor fetchExecutor(String host, int port)
- throws ExecutorManagerException;
+ throws ExecutorManagerException;
/**
* <pre>
@@ -104,7 +102,6 @@ public interface ExecutorLoader {
* </pre>
*
* @return Executor
- * @throws ExecutorManagerException
*/
Executor fetchExecutor(int executorId) throws ExecutorManagerException;
@@ -118,10 +115,9 @@ public interface ExecutorLoader {
* </pre>
*
* @return Executor
- * @throws ExecutorManagerException
*/
Executor addExecutor(String host, int port)
- throws ExecutorManagerException;
+ throws ExecutorManagerException;
/**
* <pre>
@@ -131,9 +127,6 @@ public interface ExecutorLoader {
* 2. throws an Exception if there is no executor with the given id
* 3. return null when no executor is found with the given executorId
* </pre>
- *
- * @param executorId
- * @throws ExecutorManagerException
*/
void updateExecutor(Executor executor) throws ExecutorManagerException;
@@ -144,9 +137,6 @@ public interface ExecutorLoader {
* 1. throws an Exception in case of a SQL issue
   * 2. throws an Exception if there is no executor in the table
* </pre>
- * @param host
- * @param port
- * @throws ExecutorManagerException
*/
void removeExecutor(String host, int port) throws ExecutorManagerException;
@@ -157,14 +147,10 @@ public interface ExecutorLoader {
* Note: throws an Exception in case of a SQL issue
* </pre>
*
- * @param executor
- * @param type
- * @param user
- * @param message
* @return isSuccess
*/
void postExecutorEvent(Executor executor, EventType type, String user,
- String message) throws ExecutorManagerException;
+ String message) throws ExecutorManagerException;
/**
* <pre>
@@ -175,14 +161,10 @@ public interface ExecutorLoader {
* 2. Returns an empty list in case of no events
* </pre>
*
- * @param executor
- * @param num
- * @param skip
* @return List<ExecutorLogEvent>
- * @throws ExecutorManagerException
*/
List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
- int offset) throws ExecutorManagerException;
+ int offset) throws ExecutorManagerException;
void addActiveExecutableReference(ExecutionReference ref)
throws ExecutorManagerException;
@@ -197,10 +179,6 @@ public interface ExecutorLoader {
* Note:-
* throws an Exception in case of a SQL issue
* </pre>
- *
- * @param executorId
- * @param execId
- * @throws ExecutorManagerException
*/
void unassignExecutor(int executionId) throws ExecutorManagerException;
@@ -211,13 +189,9 @@ public interface ExecutorLoader {
* 1. throws an Exception in case of a SQL issue
* 2. throws an Exception in case executionId or executorId do not exist
* </pre>
- *
- * @param executorId
- * @param execId
- * @throws ExecutorManagerException
*/
void assignExecutor(int executorId, int execId)
- throws ExecutorManagerException;
+ throws ExecutorManagerException;
/**
* <pre>
@@ -227,12 +201,10 @@ public interface ExecutorLoader {
* 2. return null when no executor is found with the given executionId
* </pre>
*
- * @param executionId
* @return fetched Executor
- * @throws ExecutorManagerException
*/
Executor fetchExecutorByExecutionId(int executionId)
- throws ExecutorManagerException;
+ throws ExecutorManagerException;
/**
* <pre>
@@ -243,10 +215,9 @@ public interface ExecutorLoader {
* </pre>
*
* @return List of queued flows and corresponding execution reference
- * @throws ExecutorManagerException
*/
List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
- throws ExecutorManagerException;
+ throws ExecutorManagerException;
boolean updateExecutableReference(int execId, long updateTime)
throws ExecutorManagerException;
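Per the contract documented above, `fetchExecutor` returns null when no matching row exists, which enables a fetch-or-add idiom; `setupExecutors` in `ExecutorManager` (below) uses exactly this shape for the local executor. A condensed sketch:

```java
import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;

public class ExecutorBootstrap {

  // Fetch-or-add over the ExecutorLoader contract: a null result means no row,
  // so add one; an inactive row is re-activated and persisted.
  static Executor ensureActiveExecutor(final ExecutorLoader loader,
      final String host, final int port) throws ExecutorManagerException {
    Executor executor = loader.fetchExecutor(host, port);
    if (executor == null) {
      executor = loader.addExecutor(host, port);
    } else if (!executor.isActive()) {
      executor.setActive(true);
      loader.updateExecutor(executor);
    }
    return executor;
  }
}
```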
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
index cd0d38c..d763ff9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
@@ -61,8 +61,8 @@ public class ExecutorLogEvent {
}
/**
- * Log event type messages. Do not change the numeric representation of each
- * enum. Only represent from 0 to 255 different codes.
+   * Log event type messages. Do not change the numeric representation of each enum. Codes must
+   * stay within the range 0 to 255.
*/
public enum EventType {
ERROR(128), HOST_UPDATE(1), PORT_UPDATE(2), ACTIVATION(3), INACTIVATION(4),
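The warning to keep each enum's numeric value stable and within 0 to 255 suggests the code is what gets persisted (the `event_type` column written by `ExecutorEventsDao` above), so reads need a code-to-enum lookup. A sketch of that pattern with illustrative names, not Azkaban's:

```java
import java.util.HashMap;
import java.util.Map;

// Numeric-coded enum: the stored code must stay stable and fit in one byte,
// which is why the javadoc above forbids renumbering. Names are illustrative.
public enum SampleEventType {
  HOST_UPDATE(1), PORT_UPDATE(2), ERROR(128);

  private static final Map<Integer, SampleEventType> BY_CODE = new HashMap<>();

  static {
    for (final SampleEventType type : values()) {
      BY_CODE.put(type.code, type);
    }
  }

  private final int code;

  SampleEventType(final int code) {
    this.code = code;
  }

  public static SampleEventType fromCode(final int code) {
    return BY_CODE.get(code); // null for unknown codes
  }

  public int getCode() {
    return this.code;
  }
}
```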
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a0990fc..b4632ec 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -17,8 +17,20 @@
package azkaban.executor;
import azkaban.Constants;
+import azkaban.alert.Alerter;
+import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorComparator;
+import azkaban.executor.selector.ExecutorFilter;
+import azkaban.executor.selector.ExecutorSelector;
import azkaban.metrics.CommonMetrics;
+import azkaban.project.Project;
+import azkaban.project.ProjectWhitelist;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.FlowUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -44,92 +56,65 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
-import azkaban.alert.Alerter;
-import azkaban.event.Event;
-import azkaban.event.Event.Type;
-import azkaban.event.EventData;
-import azkaban.event.EventHandler;
-import azkaban.executor.selector.ExecutorComparator;
-import azkaban.executor.selector.ExecutorFilter;
-import azkaban.executor.selector.ExecutorSelector;
-import azkaban.project.Project;
-import azkaban.project.ProjectWhitelist;
-import azkaban.utils.FileIOUtils.JobMetaData;
-import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
-
/**
* Executor manager used to manage the client side job.
- *
*/
@Singleton
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
+
+ public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
+ "azkaban.use.multiple.executors";
static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
"azkaban.executorselector.filters";
static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
"azkaban.executorselector.comparator.";
static final String AZKABAN_QUEUEPROCESSING_ENABLED =
- "azkaban.queueprocessing.enabled";
- public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
- "azkaban.use.multiple.executors";
+ "azkaban.queueprocessing.enabled";
private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
- "azkaban.webserver.queue.size";
+ "azkaban.webserver.queue.size";
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
- "azkaban.activeexecutor.refresh.milisecinterval";
+ "azkaban.activeexecutor.refresh.milisecinterval";
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
- "azkaban.activeexecutor.refresh.flowinterval";
+ "azkaban.activeexecutor.refresh.flowinterval";
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
"azkaban.executorinfo.refresh.maxThreads";
private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
- "azkaban.maxDispatchingErrors";
-
- private static Logger logger = Logger.getLogger(ExecutorManager.class);
- private ExecutorLoader executorLoader;
-
- private CleanerThread cleanerThread;
-
- private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
- new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
-
-
- QueuedExecutions queuedFlows;
-
- final private Set<Executor> activeExecutors = new HashSet<Executor>();
- private QueueProcessorThread queueProcessor;
- private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
-
- private ExecutingManagerUpdaterThread executingManager;
+ "azkaban.maxDispatchingErrors";
// 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000L;
private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
- private long lastCleanerThreadCheckTime = -1;
-
- private long lastThreadCheckTime = -1;
- private String updaterStage = "not started";
-
+ private static final Logger logger = Logger.getLogger(ExecutorManager.class);
+ final private Set<Executor> activeExecutors = new HashSet<>();
private final AlerterHolder alerterHolder;
-
- File cacheDir;
-
private final Props azkProps;
private final CommonMetrics commonMetrics;
+ private final ExecutorLoader executorLoader;
+ private final CleanerThread cleanerThread;
+ private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
+ new ConcurrentHashMap<>();
+ private final ExecutingManagerUpdaterThread executingManager;
+ QueuedExecutions queuedFlows;
+ File cacheDir;
+ private QueueProcessorThread queueProcessor;
+ private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
+ private long lastCleanerThreadCheckTime = -1;
+ private long lastThreadCheckTime = -1;
+ private String updaterStage = "not started";
private List<String> filterList;
private Map<String, Integer> comparatorWeightsMap;
private long lastSuccessfulExecutorInfoRefresh;
private ExecutorService executorInforRefresherService;
@Inject
- public ExecutorManager(Props azkProps, ExecutorLoader loader, AlerterHolder alerterHolder,
- CommonMetrics commonMetrics) throws ExecutorManagerException {
+ public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
+ final AlerterHolder alerterHolder,
+ final CommonMetrics commonMetrics) throws ExecutorManagerException {
this.alerterHolder = alerterHolder;
this.azkProps = azkProps;
this.commonMetrics = commonMetrics;
@@ -137,213 +122,212 @@ public class ExecutorManager extends EventHandler implements
this.setupExecutors();
this.loadRunningFlows();
- queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+ this.queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
this.loadQueuedFlows();
- cacheDir = new File(azkProps.getString("cache.directory", "cache"));
+ this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
- executingManager = new ExecutingManagerUpdaterThread();
- executingManager.start();
+ this.executingManager = new ExecutingManagerUpdaterThread();
+ this.executingManager.start();
- if(isMultiExecutorMode()) {
+ if (isMultiExecutorMode()) {
setupMultiExecutorMode();
}
- long executionLogsRetentionMs =
+ final long executionLogsRetentionMs =
azkProps.getLong("execution.logs.retention.ms",
- DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ DEFAULT_EXECUTION_LOGS_RETENTION_MS);
- cleanerThread = new CleanerThread(executionLogsRetentionMs);
- cleanerThread.start();
+ this.cleanerThread = new CleanerThread(executionLogsRetentionMs);
+ this.cleanerThread.start();
}
private void setupMultiExecutorMode() {
    // initialize hard filters for executor selector from azkaban.properties
- String filters = azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+ final String filters = this.azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
if (filters != null) {
- filterList = Arrays.asList(StringUtils.split(filters, ","));
+ this.filterList = Arrays.asList(StringUtils.split(filters, ","));
}
    // initialize comparator feature weights for executor selector from
// azkaban.properties
- Map<String, String> compListStrings =
- azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+ final Map<String, String> compListStrings =
+ this.azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
if (compListStrings != null) {
- comparatorWeightsMap = new TreeMap<String, Integer>();
- for (Map.Entry<String, String> entry : compListStrings.entrySet()) {
- comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
+ this.comparatorWeightsMap = new TreeMap<>();
+ for (final Map.Entry<String, String> entry : compListStrings.entrySet()) {
+ this.comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
}
}
- executorInforRefresherService =
- Executors.newFixedThreadPool(azkProps.getInt(
- AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+ this.executorInforRefresherService =
+ Executors.newFixedThreadPool(this.azkProps.getInt(
+ AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
// configure queue processor
- queueProcessor =
- new QueueProcessorThread(azkProps.getBoolean(
- AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
- AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), azkProps.getInt(
- AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), azkProps.getInt(
- AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, activeExecutors.size()));
+ this.queueProcessor =
+ new QueueProcessorThread(this.azkProps.getBoolean(
+ AZKABAN_QUEUEPROCESSING_ENABLED, true), this.azkProps.getLong(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), this.azkProps.getInt(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), this.azkProps.getInt(
+ AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, this.activeExecutors.size()));
- queueProcessor.start();
+ this.queueProcessor.start();
}
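`setupMultiExecutorMode` is driven entirely by `azkaban.properties`, using the keys declared at the top of this class. A hypothetical fragment enabling multi-executor mode; the filter and comparator names are assumptions, and the numeric values are illustrative rather than recommended settings:

```properties
azkaban.use.multiple.executors=true
# filter/comparator names below are illustrative assumptions
azkaban.executorselector.filters=StaticRemainingFlowSize,MinimumFreeMemory
azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1
azkaban.queueprocessing.enabled=true
azkaban.webserver.queue.size=100000
azkaban.activeexecutor.refresh.milisecinterval=50000
azkaban.activeexecutor.refresh.flowinterval=5
azkaban.executorinfo.refresh.maxThreads=5
azkaban.maxDispatchingErrors=5
```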
/**
- *
* {@inheritDoc}
+ *
* @see azkaban.executor.ExecutorManagerAdapter#setupExecutors()
*/
@Override
public void setupExecutors() throws ExecutorManagerException {
- Set<Executor> newExecutors = new HashSet<Executor>();
+ final Set<Executor> newExecutors = new HashSet<>();
if (isMultiExecutorMode()) {
logger.info("Initializing multi executors from database");
- newExecutors.addAll(executorLoader.fetchActiveExecutors());
- } else if (azkProps.containsKey("executor.port")) {
+ newExecutors.addAll(this.executorLoader.fetchActiveExecutors());
+ } else if (this.azkProps.containsKey("executor.port")) {
// Add local executor, if specified as per properties
- String executorHost = azkProps.getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
- int executorPort = azkProps.getInt("executor.port");
+ final String executorHost = this.azkProps
+ .getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
+ final int executorPort = this.azkProps.getInt("executor.port");
logger.info(String.format("Initializing local executor %s:%d",
- executorHost, executorPort));
+ executorHost, executorPort));
Executor executor =
- executorLoader.fetchExecutor(executorHost, executorPort);
+ this.executorLoader.fetchExecutor(executorHost, executorPort);
if (executor == null) {
- executor = executorLoader.addExecutor(executorHost, executorPort);
+ executor = this.executorLoader.addExecutor(executorHost, executorPort);
} else if (!executor.isActive()) {
executor.setActive(true);
- executorLoader.updateExecutor(executor);
+ this.executorLoader.updateExecutor(executor);
}
newExecutors.add(new Executor(executor.getId(), executorHost,
- executorPort, true));
+ executorPort, true));
}
if (newExecutors.isEmpty()) {
logger.error("No active executor found");
throw new ExecutorManagerException("No active executor found");
- } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+ } else if (newExecutors.size() > 1 && !isMultiExecutorMode()) {
logger.error("Multiple local executors specified");
throw new ExecutorManagerException("Multiple local executors specified");
} else {
// clear all active executors, only if we have at least one new active
// executors
- activeExecutors.clear();
- activeExecutors.addAll(newExecutors);
+ this.activeExecutors.clear();
+ this.activeExecutors.addAll(newExecutors);
}
}
private boolean isMultiExecutorMode() {
- return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+ return this.azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
}
/**
   * Refresh Executor stats for all the active executors in this executorManager
*/
private void refreshExecutors() {
- synchronized (activeExecutors) {
+ synchronized (this.activeExecutors) {
- List<Pair<Executor, Future<String>>> futures =
- new ArrayList<Pair<Executor, Future<String>>>();
- for (final Executor executor : activeExecutors) {
+ final List<Pair<Executor, Future<String>>> futures =
+ new ArrayList<>();
+ for (final Executor executor : this.activeExecutors) {
// execute each executorInfo refresh task to fetch
- Future<String> fetchExecutionInfo =
- executorInforRefresherService.submit(new Callable<String>() {
- @Override
- public String call() throws Exception {
- return callExecutorForJsonString(executor.getHost(),
- executor.getPort(), "/serverStatistics", null);
- }
- });
- futures.add(new Pair<Executor, Future<String>>(executor,
- fetchExecutionInfo));
+ final Future<String> fetchExecutionInfo =
+ this.executorInforRefresherService.submit(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return callExecutorForJsonString(executor.getHost(),
+ executor.getPort(), "/serverStatistics", null);
+ }
+ });
+ futures.add(new Pair<>(executor,
+ fetchExecutionInfo));
}
boolean wasSuccess = true;
- for (Pair<Executor, Future<String>> refreshPair : futures) {
- Executor executor = refreshPair.getFirst();
+ for (final Pair<Executor, Future<String>> refreshPair : futures) {
+ final Executor executor = refreshPair.getFirst();
      executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
try {
// max 5 secs
- String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+ final String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
logger.info(String.format(
- "Successfully refreshed executor: %s with executor info : %s",
- executor, jsonString));
- } catch (TimeoutException e) {
+ "Successfully refreshed executor: %s with executor info : %s",
+ executor, jsonString));
+ } catch (final TimeoutException e) {
wasSuccess = false;
logger.error("Timed out while waiting for ExecutorInfo refresh"
- + executor, e);
- } catch (Exception e) {
+ + executor, e);
+ } catch (final Exception e) {
wasSuccess = false;
logger.error("Failed to update ExecutorInfo for executor : "
- + executor, e);
+ + executor, e);
}
}
// update is successful for all executors
if (wasSuccess) {
- lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
+ this.lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
}
}
}
/**
- * Throws exception if running in local mode
- * {@inheritDoc}
+ * Throws exception if running in local mode {@inheritDoc}
+ *
* @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
*/
@Override
public void disableQueueProcessorThread() throws ExecutorManagerException {
if (isMultiExecutorMode()) {
- queueProcessor.setActive(false);
+ this.queueProcessor.setActive(false);
} else {
throw new ExecutorManagerException(
- "Cannot disable QueueProcessor in local mode");
+ "Cannot disable QueueProcessor in local mode");
}
}
/**
- * Throws exception if running in local mode
- * {@inheritDoc}
+ * Throws exception if running in local mode {@inheritDoc}
+ *
* @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
*/
@Override
public void enableQueueProcessorThread() throws ExecutorManagerException {
if (isMultiExecutorMode()) {
- queueProcessor.setActive(true);
+ this.queueProcessor.setActive(true);
} else {
throw new ExecutorManagerException(
- "Cannot enable QueueProcessor in local mode");
+ "Cannot enable QueueProcessor in local mode");
}
}
public State getQueueProcessorThreadState() {
- if (isMultiExecutorMode())
- return queueProcessor.getState();
- else
+ if (isMultiExecutorMode()) {
+ return this.queueProcessor.getState();
+ } else {
return State.NEW; // not started in local mode
+ }
}
/**
- * Returns state of QueueProcessor False, no flow is being dispatched True ,
- * flows are being dispatched as expected
- *
- * @return
+   * Returns the state of the QueueProcessor: false when no flow is being dispatched, true when
+   * flows are being dispatched as expected
*/
public boolean isQueueProcessorThreadActive() {
- if (isMultiExecutorMode())
- return queueProcessor.isActive();
- else
+ if (isMultiExecutorMode()) {
+ return this.queueProcessor.isActive();
+ } else {
return false;
+ }
}
/**
* Return last Successful ExecutorInfo Refresh for all active executors
- *
- * @return
*/
public long getLastSuccessfulExecutorInfoRefresh() {
return this.lastSuccessfulExecutorInfoRefresh;
@@ -351,8 +335,6 @@ public class ExecutorManager extends EventHandler implements
/**
* Get currently supported Comparators available to use via azkaban.properties
- *
- * @return
*/
public Set<String> getAvailableExecutorComparatorNames() {
return ExecutorComparator.getAvailableComparatorNames();
@@ -361,8 +343,6 @@ public class ExecutorManager extends EventHandler implements
/**
* Get currently supported filters available to use via azkaban.properties
- *
- * @return
*/
public Set<String> getAvailableExecutorFilterNames() {
return ExecutorFilter.getAvailableFilterNames();
@@ -370,21 +350,21 @@ public class ExecutorManager extends EventHandler implements
@Override
public State getExecutorManagerThreadState() {
- return executingManager.getState();
+ return this.executingManager.getState();
}
public String getExecutorThreadStage() {
- return updaterStage;
+ return this.updaterStage;
}
@Override
public boolean isExecutorManagerThreadActive() {
- return executingManager.isAlive();
+ return this.executingManager.isAlive();
}
@Override
public long getLastExecutorManagerThreadCheckTime() {
- return lastThreadCheckTime;
+ return this.lastThreadCheckTime;
}
public long getLastCleanerThreadCheckTime() {
@@ -393,30 +373,29 @@ public class ExecutorManager extends EventHandler implements
@Override
public Collection<Executor> getAllActiveExecutors() {
- return Collections.unmodifiableCollection(activeExecutors);
+ return Collections.unmodifiableCollection(this.activeExecutors);
}
/**
- *
* {@inheritDoc}
*
* @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
*/
@Override
- public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
- for (Executor executor : activeExecutors) {
+ public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+ for (final Executor executor : this.activeExecutors) {
if (executor.getId() == executorId) {
return executor;
}
}
- return executorLoader.fetchExecutor(executorId);
+ return this.executorLoader.fetchExecutor(executorId);
}
@Override
public Set<String> getPrimaryServerHosts() {
// Only one for now. More probably later.
- HashSet<String> ports = new HashSet<String>();
- for (Executor executor : activeExecutors) {
+ final HashSet<String> ports = new HashSet<>();
+ for (final Executor executor : this.activeExecutors) {
ports.add(executor.getHost() + ":" + executor.getPort());
}
return ports;
@@ -425,21 +404,21 @@ public class ExecutorManager extends EventHandler implements
@Override
public Set<String> getAllActiveExecutorServerHosts() {
// Includes non primary server/hosts
- HashSet<String> ports = new HashSet<String>();
- for (Executor executor : activeExecutors) {
+ final HashSet<String> ports = new HashSet<>();
+ for (final Executor executor : this.activeExecutors) {
ports.add(executor.getHost() + ":" + executor.getPort());
}
// include executor which were initially active and still has flows running
- for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
- .values()) {
- ExecutionReference ref = running.getFirst();
+ for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningFlows
+ .values()) {
+ final ExecutionReference ref = running.getFirst();
ports.add(ref.getHost() + ":" + ref.getPort());
}
return ports;
}
private void loadRunningFlows() throws ExecutorManagerException {
- runningFlows.putAll(executorLoader.fetchActiveFlows());
+ this.runningFlows.putAll(this.executorLoader.fetchActiveFlows());
}
/*
@@ -447,46 +426,47 @@ public class ExecutorManager extends EventHandler implements
* any executor
*/
private void loadQueuedFlows() throws ExecutorManagerException {
- List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
- executorLoader.fetchQueuedFlows();
+ final List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
+ this.executorLoader.fetchQueuedFlows();
if (retrievedExecutions != null) {
- for (Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
- queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+ for (final Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
+ this.queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
}
}
}
/**
- * Gets a list of all the active (running flows and non-dispatched flows)
- * executions for a given project and flow {@inheritDoc}. Results should
- * be sorted as we assume this while setting up pipelined execution Id.
+ * Gets a list of all the active (running flows and non-dispatched flows) executions for a given
+ * project and flow {@inheritDoc}. Results should be sorted as we assume this while setting up
+ * pipelined execution Id.
*
- * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
- * java.lang.String)
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int, java.lang.String)
*/
@Override
- public List<Integer> getRunningFlows(int projectId, String flowId) {
- List<Integer> executionIds = new ArrayList<Integer>();
+ public List<Integer> getRunningFlows(final int projectId, final String flowId) {
+ final List<Integer> executionIds = new ArrayList<>();
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
- queuedFlows.getAllEntries()));
+ this.queuedFlows.getAllEntries()));
    // it's possible an execution is the runningCandidate, meaning it's in the dispatching state, in neither queuedFlows nor runningFlows,
// so checks the runningCandidate as well.
- if (runningCandidate != null) {
- executionIds.addAll(getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(runningCandidate)));
+ if (this.runningCandidate != null) {
+ executionIds
+ .addAll(
+ getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(this.runningCandidate)));
}
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
- runningFlows.values()));
+ this.runningFlows.values()));
Collections.sort(executionIds);
return executionIds;
}
/* Helper method for getRunningFlows */
- private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
- Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
- List<Integer> executionIds = new ArrayList<Integer>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ private List<Integer> getRunningFlowsHelper(final int projectId, final String flowId,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ final List<Integer> executionIds = new ArrayList<>();
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getFlowId().equals(flowId)
- && ref.getSecond().getProjectId() == projectId) {
+ && ref.getSecond().getProjectId() == projectId) {
executionIds.add(ref.getFirst().getExecId());
}
}
@@ -494,56 +474,53 @@ public class ExecutorManager extends EventHandler implements
}
/**
- *
* {@inheritDoc}
*
* @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
*/
@Override
public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
- throws IOException {
- List<Pair<ExecutableFlow, Executor>> flows =
- new ArrayList<Pair<ExecutableFlow, Executor>>();
- getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
- getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+ throws IOException {
+ final List<Pair<ExecutableFlow, Executor>> flows =
+ new ArrayList<>();
+ getActiveFlowsWithExecutorHelper(flows, this.queuedFlows.getAllEntries());
+ getActiveFlowsWithExecutorHelper(flows, this.runningFlows.values());
return flows;
}
/* Helper method for getActiveFlowsWithExecutor */
private void getActiveFlowsWithExecutorHelper(
- List<Pair<ExecutableFlow, Executor>> flows,
- Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
- for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
- flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
- .getFirst().getExecutor()));
+ final List<Pair<ExecutableFlow, Executor>> flows,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ flows.add(new Pair<>(ref.getSecond(), ref
+ .getFirst().getExecutor()));
}
}
/**
- * Checks whether the given flow has an active (running, non-dispatched)
- * executions {@inheritDoc}
+   * Checks whether the given flow has any active (running, non-dispatched) executions {@inheritDoc}
*
- * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
- * java.lang.String)
+ * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int, java.lang.String)
*/
@Override
- public boolean isFlowRunning(int projectId, String flowId) {
+ public boolean isFlowRunning(final int projectId, final String flowId) {
boolean isRunning = false;
isRunning =
- isRunning
- || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, this.queuedFlows.getAllEntries());
isRunning =
- isRunning
- || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, this.runningFlows.values());
return isRunning;
}
/* Search a running flow in a collection */
- private boolean isFlowRunningHelper(int projectId, String flowId,
- Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
- for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ private boolean isFlowRunningHelper(final int projectId, final String flowId,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getProjectId() == projectId
- && ref.getSecond().getFlowId().equals(flowId)) {
+ && ref.getSecond().getFlowId().equals(flowId)) {
return true;
}
}
@@ -556,9 +533,9 @@ public class ExecutorManager extends EventHandler implements
* @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
*/
@Override
- public ExecutableFlow getExecutableFlow(int execId)
- throws ExecutorManagerException {
- return executorLoader.fetchExecutableFlow(execId);
+ public ExecutableFlow getExecutableFlow(final int execId)
+ throws ExecutorManagerException {
+ return this.executorLoader.fetchExecutableFlow(execId);
}
/**
@@ -570,9 +547,9 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public List<ExecutableFlow> getRunningFlows() {
- ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
- getActiveFlowHelper(flows, queuedFlows.getAllEntries());
- getActiveFlowHelper(flows, runningFlows.values());
+ final ArrayList<ExecutableFlow> flows = new ArrayList<>();
+ getActiveFlowHelper(flows, this.queuedFlows.getAllEntries());
+ getActiveFlowHelper(flows, this.runningFlows.values());
return flows;
}
@@ -580,9 +557,9 @@ public class ExecutorManager extends EventHandler implements
* Helper method to get all running flows from a Pair<ExecutionReference,
* ExecutableFlow collection
*/
- private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
- Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
- for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ private void getActiveFlowHelper(final ArrayList<ExecutableFlow> flows,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
flows.add(ref.getSecond());
}
}
@@ -595,9 +572,9 @@ public class ExecutorManager extends EventHandler implements
* @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
*/
public String getRunningFlowIds() {
- List<Integer> allIds = new ArrayList<Integer>();
- getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
- getRunningFlowsIdsHelper(allIds, runningFlows.values());
+ final List<Integer> allIds = new ArrayList<>();
+ getRunningFlowsIdsHelper(allIds, this.queuedFlows.getAllEntries());
+ getRunningFlowsIdsHelper(allIds, this.runningFlows.values());
Collections.sort(allIds);
return allIds.toString();
}
@@ -610,21 +587,21 @@ public class ExecutorManager extends EventHandler implements
* @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
*/
public String getQueuedFlowIds() {
- List<Integer> allIds = new ArrayList<Integer>();
- getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
+ final List<Integer> allIds = new ArrayList<>();
+ getRunningFlowsIdsHelper(allIds, this.queuedFlows.getAllEntries());
Collections.sort(allIds);
return allIds.toString();
}
public long getQueuedFlowSize() {
- return queuedFlows.size();
+ return this.queuedFlows.size();
}
/* Helper method to flow ids of all running flows */
- private void getRunningFlowsIdsHelper(List<Integer> allIds,
- Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
- for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ private void getRunningFlowsIdsHelper(final List<Integer> allIds,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
allIds.add(ref.getSecond().getExecutionId());
}
}
@@ -633,9 +610,9 @@ public class ExecutorManager extends EventHandler implements
public List<ExecutableFlow> getRecentlyFinishedFlows() {
List<ExecutableFlow> flows = new ArrayList<>();
try {
- flows = executorLoader.fetchRecentlyFinishedFlows(
+ flows = this.executorLoader.fetchRecentlyFinishedFlows(
RECENTLY_FINISHED_LIFETIME);
- } catch(ExecutorManagerException e) {
+ } catch (final ExecutorManagerException e) {
//Todo jamiesjc: fix error handling.
logger.error("Failed to fetch recently finished flows.", e);
}
@@ -643,158 +620,155 @@ public class ExecutorManager extends EventHandler implements
}
@Override
- public List<ExecutableFlow> getExecutableFlows(Project project,
- String flowId, int skip, int size) throws ExecutorManagerException {
- List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+ public List<ExecutableFlow> getExecutableFlows(final Project project,
+ final String flowId, final int skip, final int size) throws ExecutorManagerException {
+ final List<ExecutableFlow> flows =
+ this.executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
return flows;
}
@Override
- public List<ExecutableFlow> getExecutableFlows(int skip, int size)
+ public List<ExecutableFlow> getExecutableFlows(final int skip, final int size)
throws ExecutorManagerException {
- List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
+ final List<ExecutableFlow> flows = this.executorLoader.fetchFlowHistory(skip, size);
return flows;
}
@Override
- public List<ExecutableFlow> getExecutableFlows(String flowIdContains,
- int skip, int size) throws ExecutorManagerException {
- List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
+ public List<ExecutableFlow> getExecutableFlows(final String flowIdContains,
+ final int skip, final int size) throws ExecutorManagerException {
+ final List<ExecutableFlow> flows =
+ this.executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
0, -1, -1, skip, size);
return flows;
}
@Override
- public List<ExecutableFlow> getExecutableFlows(String projContain,
- String flowContain, String userContain, int status, long begin, long end,
- int skip, int size) throws ExecutorManagerException {
- List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
+ public List<ExecutableFlow> getExecutableFlows(final String projContain,
+ final String flowContain, final String userContain, final int status, final long begin,
+ final long end,
+ final int skip, final int size) throws ExecutorManagerException {
+ final List<ExecutableFlow> flows =
+ this.executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
status, begin, end, skip, size);
return flows;
}
@Override
- public List<ExecutableJobInfo> getExecutableJobs(Project project,
- String jobId, int skip, int size) throws ExecutorManagerException {
- List<ExecutableJobInfo> nodes =
- executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+ public List<ExecutableJobInfo> getExecutableJobs(final Project project,
+ final String jobId, final int skip, final int size) throws ExecutorManagerException {
+ final List<ExecutableJobInfo> nodes =
+ this.executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
return nodes;
}
@Override
- public int getNumberOfJobExecutions(Project project, String jobId)
+ public int getNumberOfJobExecutions(final Project project, final String jobId)
throws ExecutorManagerException {
- return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
+ return this.executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
}
@Override
- public int getNumberOfExecutions(Project project, String flowId)
+ public int getNumberOfExecutions(final Project project, final String flowId)
throws ExecutorManagerException {
- return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
+ return this.executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
}
@Override
- public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
- int length) throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
+ final int length) throws ExecutorManagerException {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
- Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
- Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
- Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
-
- @SuppressWarnings("unchecked")
- Map<String, Object> result =
+ final Pair<String, String> typeParam = new Pair<>("type", "flow");
+ final Pair<String, String> offsetParam =
+ new Pair<>("offset", String.valueOf(offset));
+ final Pair<String, String> lengthParam =
+ new Pair<>("length", String.valueOf(length));
+
+ @SuppressWarnings("unchecked") final Map<String, Object> result =
callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
typeParam, offsetParam, lengthParam);
return LogData.createLogDataFromObject(result);
} else {
- LogData value =
- executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
+ final LogData value =
+ this.executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
length);
return value;
}
}
@Override
- public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
- int offset, int length, int attempt) throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ public LogData getExecutionJobLog(final ExecutableFlow exFlow, final String jobId,
+ final int offset, final int length, final int attempt) throws ExecutorManagerException {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
- Pair<String, String> typeParam = new Pair<String, String>("type", "job");
- Pair<String, String> jobIdParam =
- new Pair<String, String>("jobId", jobId);
- Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
- Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
- Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
-
- @SuppressWarnings("unchecked")
- Map<String, Object> result =
+ final Pair<String, String> typeParam = new Pair<>("type", "job");
+ final Pair<String, String> jobIdParam =
+ new Pair<>("jobId", jobId);
+ final Pair<String, String> offsetParam =
+ new Pair<>("offset", String.valueOf(offset));
+ final Pair<String, String> lengthParam =
+ new Pair<>("length", String.valueOf(length));
+ final Pair<String, String> attemptParam =
+ new Pair<>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked") final Map<String, Object> result =
callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return LogData.createLogDataFromObject(result);
} else {
- LogData value =
- executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
+ final LogData value =
+ this.executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
offset, length);
return value;
}
}
@Override
- public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
- int attempt) throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ public List<Object> getExecutionJobStats(final ExecutableFlow exFlow, final String jobId,
+ final int attempt) throws ExecutorManagerException {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
- return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
+ return this.executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
attempt);
}
- Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
- Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
+ final Pair<String, String> jobIdParam = new Pair<>("jobId", jobId);
+ final Pair<String, String> attemptParam =
+ new Pair<>("attempt", String.valueOf(attempt));
- @SuppressWarnings("unchecked")
- Map<String, Object> result =
+ @SuppressWarnings("unchecked") final Map<String, Object> result =
callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
jobIdParam, attemptParam);
- @SuppressWarnings("unchecked")
- List<Object> jobStats = (List<Object>) result.get("attachments");
+ @SuppressWarnings("unchecked") final List<Object> jobStats = (List<Object>) result
+ .get("attachments");
return jobStats;
}
@Override
- public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
- String jobId, int offset, int length, int attempt)
+ public JobMetaData getExecutionJobMetaData(final ExecutableFlow exFlow,
+ final String jobId, final int offset, final int length, final int attempt)
throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
- Pair<String, String> typeParam = new Pair<String, String>("type", "job");
- Pair<String, String> jobIdParam =
- new Pair<String, String>("jobId", jobId);
- Pair<String, String> offsetParam =
- new Pair<String, String>("offset", String.valueOf(offset));
- Pair<String, String> lengthParam =
- new Pair<String, String>("length", String.valueOf(length));
- Pair<String, String> attemptParam =
- new Pair<String, String>("attempt", String.valueOf(attempt));
-
- @SuppressWarnings("unchecked")
- Map<String, Object> result =
+ final Pair<String, String> typeParam = new Pair<>("type", "job");
+ final Pair<String, String> jobIdParam =
+ new Pair<>("jobId", jobId);
+ final Pair<String, String> offsetParam =
+ new Pair<>("offset", String.valueOf(offset));
+ final Pair<String, String> lengthParam =
+ new Pair<>("length", String.valueOf(length));
+ final Pair<String, String> attemptParam =
+ new Pair<>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked") final Map<String, Object> result =
callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
return JobMetaData.createJobMetaDataFromObject(result);
@@ -804,38 +778,38 @@ public class ExecutorManager extends EventHandler implements
}
/**
- * if flows was dispatched to an executor, cancel by calling Executor else if
- * flow is still in queue, remove from queue and finalize {@inheritDoc}
+   * If the flow was dispatched to an executor, cancel it by calling the Executor; else if the
+   * flow is still in queue, remove it from the queue and finalize. {@inheritDoc}
*
* @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
- * java.lang.String)
+ * java.lang.String)
*/
@Override
- public void cancelFlow(ExecutableFlow exFlow, String userId)
- throws ExecutorManagerException {
+ public void cancelFlow(final ExecutableFlow exFlow, final String userId)
+ throws ExecutorManagerException {
synchronized (exFlow) {
- if (runningFlows.containsKey(exFlow.getExecutionId())) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ if (this.runningFlows.containsKey(exFlow.getExecutionId())) {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningFlows.get(exFlow.getExecutionId());
callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
- userId);
- } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
- queuedFlows.dequeue(exFlow.getExecutionId());
+ userId);
+ } else if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
+ this.queuedFlows.dequeue(exFlow.getExecutionId());
finalizeFlows(exFlow);
} else {
throw new ExecutorManagerException("Execution "
- + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
- + " isn't running.");
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
}
}
}
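The cancel path above makes a three-way decision, reduced here to a standalone sketch: a dispatched flow is cancelled remotely on its executor, a still-queued flow is dequeued and finalized locally, and anything else is reported as not running. The collections are illustrative stand-ins for runningFlows and queuedFlows:

```java
import java.util.Map;
import java.util.Set;

public class CancelDemo {

  static String cancel(final Map<Integer, String> runningFlows,
      final Set<Integer> queuedFlows, final int execId) {
    if (runningFlows.containsKey(execId)) {
      // Dispatched: ask the executor host to cancel it.
      return "cancel on executor " + runningFlows.get(execId);
    } else if (queuedFlows.remove(execId)) {
      // Never dispatched: no remote call needed, finalize locally.
      return "dequeued and finalized locally";
    } else {
      throw new IllegalStateException("Execution " + execId + " isn't running.");
    }
  }
}
```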
@Override
- public void resumeFlow(ExecutableFlow exFlow, String userId)
+ public void resumeFlow(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -846,11 +820,11 @@ public class ExecutorManager extends EventHandler implements
}
@Override
- public void pauseFlow(ExecutableFlow exFlow, String userId)
+ public void pauseFlow(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -861,60 +835,60 @@ public class ExecutorManager extends EventHandler implements
}
@Override
- public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ public void pauseExecutingJobs(final ExecutableFlow exFlow, final String userId,
+ final String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId,
jobIds);
}
@Override
- public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ public void resumeExecutingJobs(final ExecutableFlow exFlow, final String userId,
+ final String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId,
jobIds);
}
@Override
- public void retryFailures(ExecutableFlow exFlow, String userId)
+ public void retryFailures(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
}
@Override
- public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ public void retryExecutingJobs(final ExecutableFlow exFlow, final String userId,
+ final String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId,
jobIds);
}
@Override
- public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ public void disableExecutingJobs(final ExecutableFlow exFlow, final String userId,
+ final String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId,
jobIds);
}
@Override
- public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ public void enableExecutingJobs(final ExecutableFlow exFlow, final String userId,
+ final String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId,
jobIds);
}
@Override
- public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
- String... jobIds) throws ExecutorManagerException {
+ public void cancelExecutingJobs(final ExecutableFlow exFlow, final String userId,
+ final String... jobIds) throws ExecutorManagerException {
modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId,
jobIds);
}
@SuppressWarnings("unchecked")
- private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow,
- String command, String userId, String... jobIds)
+ private Map<String, Object> modifyExecutingJobs(final ExecutableFlow exFlow,
+ final String command, final String userId, final String... jobIds)
throws ExecutorManagerException {
synchronized (exFlow) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(exFlow.getExecutionId());
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -923,9 +897,9 @@ public class ExecutorManager extends EventHandler implements
Map<String, Object> response = null;
if (jobIds != null && jobIds.length > 0) {
- for (String jobId : jobIds) {
+ for (final String jobId : jobIds) {
if (!jobId.isEmpty()) {
- ExecutableNode node = exFlow.getExecutableNode(jobId);
+ final ExecutableNode node = exFlow.getExecutableNode(jobId);
if (node == null) {
throw new ExecutorManagerException("Job " + jobId
+ " doesn't exist in execution " + exFlow.getExecutionId()
@@ -933,18 +907,18 @@ public class ExecutorManager extends EventHandler implements
}
}
}
- String ids = StringUtils.join(jobIds, ',');
+ final String ids = StringUtils.join(jobIds, ',');
response =
callExecutorServer(pair.getFirst(),
ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
- new Pair<String, String>(
+ new Pair<>(
ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
- new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+ new Pair<>(ConnectorParams.MODIFY_JOBS_LIST, ids));
} else {
response =
callExecutorServer(pair.getFirst(),
ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
- new Pair<String, String>(
+ new Pair<>(
ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
}
@@ -953,31 +927,31 @@ public class ExecutorManager extends EventHandler implements
}
@Override
- public String submitExecutableFlow(ExecutableFlow exflow, String userId)
- throws ExecutorManagerException {
+ public String submitExecutableFlow(final ExecutableFlow exflow, final String userId)
+ throws ExecutorManagerException {
- String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
+ final String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
// using project and flow name to prevent race condition when same flow is submitted by API and schedule at the same time
// causing two same flow submission entering this piece.
synchronized (exFlowKey.intern()) {
- String flowId = exflow.getFlowId();
+ final String flowId = exflow.getFlowId();
logger.info("Submitting execution flow " + flowId + " by " + userId);
String message = "";
- if (queuedFlows.isFull()) {
+ if (this.queuedFlows.isFull()) {
message =
- String
- .format(
- "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
- flowId, exflow.getProjectName());
+ String
+ .format(
+ "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
+ flowId, exflow.getProjectName());
logger.error(message);
} else {
- int projectId = exflow.getProjectId();
+ final int projectId = exflow.getProjectId();
exflow.setSubmitUser(userId);
exflow.setSubmitTime(System.currentTimeMillis());
- List<Integer> running = getRunningFlows(projectId, flowId);
+ final List<Integer> running = getRunningFlows(projectId, flowId);
ExecutionOptions options = exflow.getExecutionOptions();
if (options == null) {
@@ -990,55 +964,55 @@ public class ExecutorManager extends EventHandler implements
if (!running.isEmpty()) {
if (options.getConcurrentOption().equals(
- ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
+ ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
Collections.sort(running);
- Integer runningExecId = running.get(running.size() - 1);
+ final Integer runningExecId = running.get(running.size() - 1);
options.setPipelineExecutionId(runningExecId);
message =
- "Flow " + flowId + " is already running with exec id "
- + runningExecId + ". Pipelining level "
- + options.getPipelineLevel() + ". \n";
+ "Flow " + flowId + " is already running with exec id "
+ + runningExecId + ". Pipelining level "
+ + options.getPipelineLevel() + ". \n";
} else if (options.getConcurrentOption().equals(
- ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+ ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
throw new ExecutorManagerException("Flow " + flowId
- + " is already running. Skipping execution.",
- ExecutorManagerException.Reason.SkippedExecution);
+ + " is already running. Skipping execution.",
+ ExecutorManagerException.Reason.SkippedExecution);
} else {
// The settings is to run anyways.
message =
- "Flow " + flowId + " is already running with exec id "
- + StringUtils.join(running, ",")
- + ". Will execute concurrently. \n";
+ "Flow " + flowId + " is already running with exec id "
+ + StringUtils.join(running, ",")
+ + ". Will execute concurrently. \n";
}
}
- boolean memoryCheck =
- !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
- ProjectWhitelist.WhitelistType.MemoryCheck);
+ final boolean memoryCheck =
+ !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+ ProjectWhitelist.WhitelistType.MemoryCheck);
options.setMemoryCheck(memoryCheck);
// The exflow id is set by the loader. So it's unavailable until after
// this call.
- executorLoader.uploadExecutableFlow(exflow);
+ this.executorLoader.uploadExecutableFlow(exflow);
// We create an active flow reference in the datastore. If the upload
// fails, we remove the reference.
- ExecutionReference reference =
- new ExecutionReference(exflow.getExecutionId());
+ final ExecutionReference reference =
+ new ExecutionReference(exflow.getExecutionId());
if (isMultiExecutorMode()) {
//Take MultiExecutor route
- executorLoader.addActiveExecutableReference(reference);
- queuedFlows.enqueue(exflow, reference);
+ this.executorLoader.addActiveExecutableReference(reference);
+ this.queuedFlows.enqueue(exflow, reference);
} else {
// assign only local executor we have
- Executor choosenExecutor = activeExecutors.iterator().next();
- executorLoader.addActiveExecutableReference(reference);
+ final Executor choosenExecutor = this.activeExecutors.iterator().next();
+ this.executorLoader.addActiveExecutableReference(reference);
try {
dispatch(reference, exflow, choosenExecutor);
this.commonMetrics.markDispatchSuccess();
- } catch (ExecutorManagerException e) {
+ } catch (final ExecutorManagerException e) {
// When flow dispatch fails, should update the flow status
// to FAILED in execution_flows DB table as well. Currently
// this logic is only implemented in multiExecutorMode but
@@ -1049,84 +1023,86 @@ public class ExecutorManager extends EventHandler implements
}
}
message +=
- "Execution submitted successfully with exec id "
- + exflow.getExecutionId();
+ "Execution submitted successfully with exec id "
+ + exflow.getExecutionId();
}
return message;
}
}
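The `synchronized (exFlowKey.intern())` idiom above deserves a note: interning returns one canonical String per `project.flow` key, so an API submission and a scheduled submission of the same flow contend on the same monitor and cannot both enter the critical section. A minimal standalone demonstration (class, key, and thread names are illustrative); since interned strings persist for the JVM's lifetime, a striped lock or a lock map is a common alternative when the key space is unbounded:

```java
public class SubmitLockDemo {

  public static void main(final String[] args) throws InterruptedException {
    final Runnable submit = () -> {
      // Both threads build the same key, so intern() yields the same object.
      final String key = ("myProject" + "." + "myFlow" + ".submitFlow").intern();
      synchronized (key) {
        System.out.println(Thread.currentThread().getName() + " holds the lock");
      }
    };
    final Thread a = new Thread(submit, "api-submit");
    final Thread b = new Thread(submit, "schedule-submit");
    a.start();
    b.start();
    a.join();
    b.join();
  }
}
```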
- private void cleanOldExecutionLogs(long millis) {
- long beforeDeleteLogsTimestamp = System.currentTimeMillis();
+ private void cleanOldExecutionLogs(final long millis) {
+ final long beforeDeleteLogsTimestamp = System.currentTimeMillis();
try {
- int count = executorLoader.removeExecutionLogsByTime(millis);
+ final int count = this.executorLoader.removeExecutionLogsByTime(millis);
logger.info("Cleaned up " + count + " log entries.");
- } catch (ExecutorManagerException e) {
+ } catch (final ExecutorManagerException e) {
logger.error("log clean up failed. ", e);
}
- logger.info("log clean up time: " + (System.currentTimeMillis() - beforeDeleteLogsTimestamp)/1000 + " seconds.");
+ logger.info(
+ "log clean up time: " + (System.currentTimeMillis() - beforeDeleteLogsTimestamp) / 1000
+ + " seconds.");
}
- private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
- Executor executor, String action) throws ExecutorManagerException {
+ private Map<String, Object> callExecutorServer(final ExecutableFlow exflow,
+ final Executor executor, final String action) throws ExecutorManagerException {
try {
return callExecutorServer(executor.getHost(), executor.getPort(), action,
- exflow.getExecutionId(), null, (Pair<String, String>[]) null);
- } catch (IOException e) {
+ exflow.getExecutionId(), null, (Pair<String, String>[]) null);
+ } catch (final IOException e) {
throw new ExecutorManagerException(e);
}
}
- private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, String user) throws ExecutorManagerException {
+ private Map<String, Object> callExecutorServer(final ExecutionReference ref,
+ final String action, final String user) throws ExecutorManagerException {
try {
return callExecutorServer(ref.getHost(), ref.getPort(), action,
ref.getExecId(), user, (Pair<String, String>[]) null);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExecutorManagerException(e);
}
}
- private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, Pair<String, String>... params)
+ private Map<String, Object> callExecutorServer(final ExecutionReference ref,
+ final String action, final Pair<String, String>... params)
throws ExecutorManagerException {
try {
return callExecutorServer(ref.getHost(), ref.getPort(), action,
ref.getExecId(), null, params);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExecutorManagerException(e);
}
}
- private Map<String, Object> callExecutorServer(ExecutionReference ref,
- String action, String user, Pair<String, String>... params)
+ private Map<String, Object> callExecutorServer(final ExecutionReference ref,
+ final String action, final String user, final Pair<String, String>... params)
throws ExecutorManagerException {
try {
return callExecutorServer(ref.getHost(), ref.getPort(), action,
ref.getExecId(), user, params);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExecutorManagerException(e);
}
}
- private Map<String, Object> callExecutorServer(String host, int port,
- String action, Integer executionId, String user,
- Pair<String, String>... params) throws IOException {
- List<Pair<String, String>> paramList = new ArrayList<Pair<String,String>>();
+ private Map<String, Object> callExecutorServer(final String host, final int port,
+ final String action, final Integer executionId, final String user,
+ final Pair<String, String>... params) throws IOException {
+ final List<Pair<String, String>> paramList = new ArrayList<>();
// if params = null
- if(params != null) {
+ if (params != null) {
paramList.addAll(Arrays.asList(params));
}
paramList
- .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
- paramList.add(new Pair<String, String>(ConnectorParams.EXECID_PARAM, String
- .valueOf(executionId)));
- paramList.add(new Pair<String, String>(ConnectorParams.USER_PARAM, user));
+ .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
+ paramList.add(new Pair<>(ConnectorParams.EXECID_PARAM, String
+ .valueOf(executionId)));
+ paramList.add(new Pair<>(ConnectorParams.USER_PARAM, user));
- Map<String, Object> jsonResponse =
- callExecutorForJsonObject(host, port, "/executor", paramList);
+ final Map<String, Object> jsonResponse =
+ callExecutorForJsonObject(host, port, "/executor", paramList);
return jsonResponse;
}
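For reference, this is roughly how the call above is composed on the wire: action, execution id, and user become query parameters on a GET against the executor's /executor endpoint. The literal parameter names below ("action", "execid", "user") are assumptions mirroring ConnectorParams, not values confirmed by this diff:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ExecutorUriDemo {

  static URI buildExecutorUri(final String host, final int port,
      final String action, final int execId, final String user) {
    final Map<String, String> params = new LinkedHashMap<>();
    params.put("action", action);                  // assumed ACTION_PARAM
    params.put("execid", String.valueOf(execId));  // assumed EXECID_PARAM
    params.put("user", user);                      // assumed USER_PARAM
    final String query = params.entrySet().stream()
        .map(e -> e.getKey() + "="
            + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
        .collect(Collectors.joining("&"));
    return URI.create("http://" + host + ":" + port + "/executor?" + query);
  }

  public static void main(final String[] args) {
    System.out.println(buildExecutorUri("localhost", 12321, "cancel", 42, "admin"));
  }
}
```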
@@ -1135,15 +1111,14 @@ public class ExecutorManager extends EventHandler implements
* Helper method used by ExecutorManager to call executor and return json
* object map
*/
- private Map<String, Object> callExecutorForJsonObject(String host, int port,
- String path, List<Pair<String, String>> paramList) throws IOException {
- String responseString =
- callExecutorForJsonString(host, port, path, paramList);
-
- @SuppressWarnings("unchecked")
- Map<String, Object> jsonResponse =
- (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
- String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
+ private Map<String, Object> callExecutorForJsonObject(final String host, final int port,
+ final String path, final List<Pair<String, String>> paramList) throws IOException {
+ final String responseString =
+ callExecutorForJsonString(host, port, path, paramList);
+
+ @SuppressWarnings("unchecked") final Map<String, Object> jsonResponse =
+ (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
+ final String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
if (error != null) {
throw new IOException(error);
}
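The helper above encodes the executor protocol's error convention: a JSON response whose error field is non-null is promoted to an IOException at the caller, so every call site gets failure propagation for free. A self-contained sketch, with a plain Map standing in for the parsed JSON and the literal key "error" assumed to match RESPONSE_ERROR:

```java
import java.io.IOException;
import java.util.Map;

public class ResponseCheckDemo {

  static Map<String, Object> checkResponse(final Map<String, Object> jsonResponse)
      throws IOException {
    final String error = (String) jsonResponse.get("error"); // assumed RESPONSE_ERROR key
    if (error != null) {
      throw new IOException(error); // surface the remote failure locally
    }
    return jsonResponse;
  }

  public static void main(final String[] args) throws IOException {
    System.out.println(checkResponse(Map.<String, Object>of("status", "success")));
    try {
      checkResponse(Map.<String, Object>of("error", "executor unreachable"));
    } catch (final IOException e) {
      System.out.println("propagated: " + e.getMessage());
    }
  }
}
```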
@@ -1154,37 +1129,33 @@ public class ExecutorManager extends EventHandler implements
* Helper method used by ExecutorManager to call executor and return raw json
* string
*/
- private String callExecutorForJsonString(String host, int port, String path,
- List<Pair<String, String>> paramList) throws IOException {
+ private String callExecutorForJsonString(final String host, final int port, final String path,
+ List<Pair<String, String>> paramList) throws IOException {
if (paramList == null) {
- paramList = new ArrayList<Pair<String, String>>();
+ paramList = new ArrayList<>();
}
- ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
- @SuppressWarnings("unchecked")
- URI uri =
- ExecutorApiClient.buildUri(host, port, path, true,
- paramList.toArray(new Pair[0]));
+ final ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
+ @SuppressWarnings("unchecked") final URI uri =
+ ExecutorApiClient.buildUri(host, port, path, true,
+ paramList.toArray(new Pair[0]));
return apiclient.httpGet(uri, null);
}
/**
- * Manage servlet call for stats servlet in Azkaban execution server
- * {@inheritDoc}
- *
- * @throws ExecutorManagerException
+ * Manage servlet call for stats servlet in Azkaban execution server {@inheritDoc}
*
* @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
- * azkaban.utils.Pair[])
+ * azkaban.utils.Pair[])
*/
@Override
- public Map<String, Object> callExecutorStats(int executorId, String action,
- Pair<String, String>... params) throws IOException, ExecutorManagerException {
- Executor executor = fetchExecutor(executorId);
+ public Map<String, Object> callExecutorStats(final int executorId, final String action,
+ final Pair<String, String>... params) throws IOException, ExecutorManagerException {
+ final Executor executor = fetchExecutor(executorId);
- List<Pair<String, String>> paramList =
- new ArrayList<Pair<String, String>>();
+ final List<Pair<String, String>> paramList =
+ new ArrayList<>();
// if params = null
if (params != null) {
@@ -1192,220 +1163,72 @@ public class ExecutorManager extends EventHandler implements
}
paramList
- .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+ .add(new Pair<>(ConnectorParams.ACTION_PARAM, action));
return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
- "/stats", paramList);
+ "/stats", paramList);
}
@Override
- public Map<String, Object> callExecutorJMX(String hostPort, String action,
- String mBean) throws IOException {
- List<Pair<String, String>> paramList =
- new ArrayList<Pair<String, String>>();
+ public Map<String, Object> callExecutorJMX(final String hostPort, final String action,
+ final String mBean) throws IOException {
+ final List<Pair<String, String>> paramList =
+ new ArrayList<>();
- paramList.add(new Pair<String, String>(action, ""));
- if(mBean != null) {
- paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
+ paramList.add(new Pair<>(action, ""));
+ if (mBean != null) {
+ paramList.add(new Pair<>(ConnectorParams.JMX_MBEAN, mBean));
}
- String[] hostPortSplit = hostPort.split(":");
+ final String[] hostPortSplit = hostPort.split(":");
return callExecutorForJsonObject(hostPortSplit[0],
- Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
+ Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
}
@Override
public void shutdown() {
if (isMultiExecutorMode()) {
- queueProcessor.shutdown();
+ this.queueProcessor.shutdown();
}
- executingManager.shutdown();
+ this.executingManager.shutdown();
}
- private class ExecutingManagerUpdaterThread extends Thread {
- private boolean shutdown = false;
-
- public ExecutingManagerUpdaterThread() {
- this.setName("ExecutorManagerUpdaterThread");
- }
-
- private int waitTimeIdleMs = 2000;
- private int waitTimeMs = 500;
-
- // When we have an http error, for that flow, we'll check every 10 secs, 6
- // times (1 mins) before we evict.
- private int numErrors = 6;
- private long errorThreshold = 10000;
-
- private void shutdown() {
- shutdown = true;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void run() {
- while (!shutdown) {
- try {
- lastThreadCheckTime = System.currentTimeMillis();
- updaterStage = "Starting update all flows.";
-
- Map<Executor, List<ExecutableFlow>> exFlowMap =
- getFlowToExecutorMap();
- ArrayList<ExecutableFlow> finishedFlows =
- new ArrayList<ExecutableFlow>();
- ArrayList<ExecutableFlow> finalizeFlows =
- new ArrayList<ExecutableFlow>();
-
- if (exFlowMap.size() > 0) {
- for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
- .entrySet()) {
- List<Long> updateTimesList = new ArrayList<Long>();
- List<Integer> executionIdsList = new ArrayList<Integer>();
-
- Executor executor = entry.getKey();
-
- updaterStage =
- "Starting update flows on " + executor.getHost() + ":"
- + executor.getPort();
-
- // We pack the parameters of the same host together before we
- // query.
- fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
- updateTimesList);
-
- Pair<String, String> updateTimes =
- new Pair<String, String>(
- ConnectorParams.UPDATE_TIME_LIST_PARAM,
- JSONUtils.toJSON(updateTimesList));
- Pair<String, String> executionIds =
- new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
- JSONUtils.toJSON(executionIdsList));
-
- Map<String, Object> results = null;
- try {
- results =
- callExecutorServer(executor.getHost(),
- executor.getPort(), ConnectorParams.UPDATE_ACTION,
- null, null, executionIds, updateTimes);
- } catch (IOException e) {
- logger.error(e);
- for (ExecutableFlow flow : entry.getValue()) {
- Pair<ExecutionReference, ExecutableFlow> pair =
- runningFlows.get(flow.getExecutionId());
-
- updaterStage =
- "Failed to get update. Doing some clean up for flow "
- + pair.getSecond().getExecutionId();
-
- if (pair != null) {
- ExecutionReference ref = pair.getFirst();
- int numErrors = ref.getNumErrors();
- if (ref.getNumErrors() < this.numErrors) {
- ref.setNextCheckTime(System.currentTimeMillis()
- + errorThreshold);
- ref.setNumErrors(++numErrors);
- } else {
- logger.error("Evicting flow " + flow.getExecutionId()
- + ". The executor is unresponsive.");
- // TODO should send out an unresponsive email here.
- finalizeFlows.add(pair.getSecond());
- }
- }
- }
- }
-
- // We gets results
- if (results != null) {
- List<Map<String, Object>> executionUpdates =
- (List<Map<String, Object>>) results
- .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
- for (Map<String, Object> updateMap : executionUpdates) {
- try {
- ExecutableFlow flow = updateExecution(updateMap);
-
- updaterStage = "Updated flow " + flow.getExecutionId();
-
- if (isFinished(flow)) {
- finishedFlows.add(flow);
- finalizeFlows.add(flow);
- }
- } catch (ExecutorManagerException e) {
- ExecutableFlow flow = e.getExecutableFlow();
- logger.error(e);
-
- if (flow != null) {
- logger.error("Finalizing flow " + flow.getExecutionId());
- finalizeFlows.add(flow);
- }
- }
- }
- }
- }
+ private void finalizeFlows(final ExecutableFlow flow) {
- updaterStage =
- "Finalizing " + finalizeFlows.size() + " error flows.";
-
- // Kill error flows
- for (ExecutableFlow flow : finalizeFlows) {
- finalizeFlows(flow);
- }
- }
-
- updaterStage = "Updated all active flows. Waiting for next round.";
-
- synchronized (this) {
- try {
- if (runningFlows.size() > 0) {
- this.wait(waitTimeMs);
- } else {
- this.wait(waitTimeIdleMs);
- }
- } catch (InterruptedException e) {
- }
- }
- } catch (Exception e) {
- logger.error(e);
- }
- }
- }
- }
-
- private void finalizeFlows(ExecutableFlow flow) {
-
- int execId = flow.getExecutionId();
+ final int execId = flow.getExecutionId();
boolean alertUser = true;
- updaterStage = "finalizing flow " + execId;
+ this.updaterStage = "finalizing flow " + execId;
// First we check if the execution in the datastore is complete
try {
- ExecutableFlow dsFlow;
+ final ExecutableFlow dsFlow;
if (isFinished(flow)) {
dsFlow = flow;
} else {
- updaterStage = "finalizing flow " + execId + " loading from db";
- dsFlow = executorLoader.fetchExecutableFlow(execId);
+ this.updaterStage = "finalizing flow " + execId + " loading from db";
+ dsFlow = this.executorLoader.fetchExecutableFlow(execId);
// If it's marked finished, we're good. If not, we fail everything and
// then mark it finished.
if (!isFinished(dsFlow)) {
- updaterStage = "finalizing flow " + execId + " failing the flow";
+ this.updaterStage = "finalizing flow " + execId + " failing the flow";
failEverything(dsFlow);
- executorLoader.updateExecutableFlow(dsFlow);
+ this.executorLoader.updateExecutableFlow(dsFlow);
}
}
- updaterStage = "finalizing flow " + execId + " deleting active reference";
+ this.updaterStage = "finalizing flow " + execId + " deleting active reference";
// Delete the executing reference.
if (flow.getEndTime() == -1) {
flow.setEndTime(System.currentTimeMillis());
- executorLoader.updateExecutableFlow(dsFlow);
+ this.executorLoader.updateExecutableFlow(dsFlow);
}
- executorLoader.removeActiveExecutableReference(execId);
+ this.executorLoader.removeActiveExecutableReference(execId);
- updaterStage = "finalizing flow " + execId + " cleaning from memory";
- runningFlows.remove(execId);
- } catch (ExecutorManagerException e) {
+ this.updaterStage = "finalizing flow " + execId + " cleaning from memory";
+ this.runningFlows.remove(execId);
+ } catch (final ExecutorManagerException e) {
alertUser = false; // failed due to azkaban internal error, not to alert user
logger.error(e);
}
@@ -1414,26 +1237,26 @@ public class ExecutorManager extends EventHandler implements
// target no longer had
// the reference.
- updaterStage = "finalizing flow " + execId + " alerting and emailing";
- if(alertUser) {
- ExecutionOptions options = flow.getExecutionOptions();
+ this.updaterStage = "finalizing flow " + execId + " alerting and emailing";
+ if (alertUser) {
+ final ExecutionOptions options = flow.getExecutionOptions();
// But we can definitely email them.
- Alerter mailAlerter = alerterHolder.get("email");
+ final Alerter mailAlerter = this.alerterHolder.get("email");
if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED) {
if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
try {
mailAlerter.alertOnError(flow);
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error(e);
}
}
if (options.getFlowParameters().containsKey("alert.type")) {
- String alertType = options.getFlowParameters().get("alert.type");
- Alerter alerter = alerterHolder.get(alertType);
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = this.alerterHolder.get(alertType);
if (alerter != null) {
try {
alerter.alertOnError(flow);
- } catch (Exception e) {
+ } catch (final Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("Failed to alert by " + alertType);
@@ -1447,17 +1270,17 @@ public class ExecutorManager extends EventHandler implements
try {
mailAlerter.alertOnSuccess(flow);
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error(e);
}
}
if (options.getFlowParameters().containsKey("alert.type")) {
- String alertType = options.getFlowParameters().get("alert.type");
- Alerter alerter = alerterHolder.get(alertType);
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = this.alerterHolder.get(alertType);
if (alerter != null) {
try {
alerter.alertOnSuccess(flow);
- } catch (Exception e) {
+ } catch (final Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("Failed to alert by " + alertType);
@@ -1471,23 +1294,23 @@ public class ExecutorManager extends EventHandler implements
}
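finalizeFlows (and updateExecution below) repeat the same "alert.type" lookup: a holder maps alerter names to implementations, the flow's parameters pick one, and alert failures are logged rather than propagated so finalization always completes. A compact sketch of that pattern, with a hypothetical one-method Alerter interface rather than the Azkaban type:

```java
import java.util.Map;

public class AlertDispatchDemo {

  interface Alerter {
    void alertOnError(String flowId) throws Exception;
  }

  static void alert(final Map<String, Alerter> holder,
      final Map<String, String> flowParameters, final String flowId) {
    if (!flowParameters.containsKey("alert.type")) {
      return;
    }
    final String alertType = flowParameters.get("alert.type");
    final Alerter alerter = holder.get(alertType);
    if (alerter == null) {
      return; // unknown alerter name is silently skipped, as above
    }
    try {
      alerter.alertOnError(flowId);
    } catch (final Exception e) {
      // Alerting is best-effort; a failure must not abort finalization.
      System.err.println("Failed to alert by " + alertType + ": " + e.getMessage());
    }
  }
}
```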
- private void failEverything(ExecutableFlow exFlow) {
- long time = System.currentTimeMillis();
- for (ExecutableNode node : exFlow.getExecutableNodes()) {
+ private void failEverything(final ExecutableFlow exFlow) {
+ final long time = System.currentTimeMillis();
+ for (final ExecutableNode node : exFlow.getExecutableNodes()) {
switch (node.getStatus()) {
- case SUCCEEDED:
- case FAILED:
- case KILLED:
- case SKIPPED:
- case DISABLED:
- continue;
- // case UNKNOWN:
- case READY:
- node.setStatus(Status.KILLED);
- break;
- default:
- node.setStatus(Status.FAILED);
- break;
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ case SKIPPED:
+ case DISABLED:
+ continue;
+ // case UNKNOWN:
+ case READY:
+ node.setStatus(Status.KILLED);
+ break;
+ default:
+ node.setStatus(Status.FAILED);
+ break;
}
if (node.getStartTime() == -1) {
@@ -1505,25 +1328,25 @@ public class ExecutorManager extends EventHandler implements
exFlow.setStatus(Status.FAILED);
}
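The switch above amounts to a simple finalization rule: terminal statuses pass through untouched, READY nodes (never started) become KILLED, and anything still in flight becomes FAILED. Restated as a pure function over a trimmed stand-in for azkaban.executor.Status:

```java
public class FailEverythingDemo {

  enum Status { READY, RUNNING, PAUSED, SUCCEEDED, FAILED, KILLED, SKIPPED, DISABLED }

  static Status finalStatus(final Status current) {
    switch (current) {
      case SUCCEEDED:
      case FAILED:
      case KILLED:
      case SKIPPED:
      case DISABLED:
        return current;       // already terminal: leave as-is
      case READY:
        return Status.KILLED; // never started: killed, not failed
      default:
        return Status.FAILED; // was in flight: failed
    }
  }
}
```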
- private ExecutableFlow updateExecution(Map<String, Object> updateData)
+ private ExecutableFlow updateExecution(final Map<String, Object> updateData)
throws ExecutorManagerException {
- Integer execId =
+ final Integer execId =
(Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
if (execId == null) {
throw new ExecutorManagerException(
"Response is malformed. Need exec id to update.");
}
- Pair<ExecutionReference, ExecutableFlow> refPair =
+ final Pair<ExecutionReference, ExecutableFlow> refPair =
this.runningFlows.get(execId);
if (refPair == null) {
throw new ExecutorManagerException(
"No running flow found with the execution id. Removing " + execId);
}
- ExecutionReference ref = refPair.getFirst();
- ExecutableFlow flow = refPair.getSecond();
+ final ExecutionReference ref = refPair.getFirst();
+ final ExecutableFlow flow = refPair.getSecond();
if (updateData.containsKey("error")) {
// The flow should be finished here.
throw new ExecutorManagerException((String) updateData.get("error"), flow);
@@ -1532,33 +1355,33 @@ public class ExecutorManager extends EventHandler implements
// Reset errors.
ref.setNextCheckTime(0);
ref.setNumErrors(0);
- Status oldStatus = flow.getStatus();
+ final Status oldStatus = flow.getStatus();
flow.applyUpdateObject(updateData);
- Status newStatus = flow.getStatus();
+ final Status newStatus = flow.getStatus();
- if(oldStatus != newStatus && newStatus == Status.FAILED) {
+ if (oldStatus != newStatus && newStatus == Status.FAILED) {
this.commonMetrics.markFlowFail();
}
- ExecutionOptions options = flow.getExecutionOptions();
+ final ExecutionOptions options = flow.getExecutionOptions();
if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
// We want to see if we should give an email status on first failure.
if (options.getNotifyOnFirstFailure()) {
- Alerter mailAlerter = alerterHolder.get("email");
+ final Alerter mailAlerter = this.alerterHolder.get("email");
try {
mailAlerter.alertOnFirstError(flow);
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
logger.error("Failed to send first error email." + e.getMessage());
}
}
if (options.getFlowParameters().containsKey("alert.type")) {
- String alertType = options.getFlowParameters().get("alert.type");
- Alerter alerter = alerterHolder.get(alertType);
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = this.alerterHolder.get(alertType);
if (alerter != null) {
try {
alerter.alertOnFirstError(flow);
- } catch (Exception e) {
+ } catch (final Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("Failed to alert by " + alertType);
@@ -1573,7 +1396,7 @@ public class ExecutorManager extends EventHandler implements
return flow;
}
- public boolean isFinished(ExecutableFlow flow) {
+ public boolean isFinished(final ExecutableFlow flow) {
switch (flow.getStatus()) {
case SUCCEEDED:
case FAILED:
@@ -1584,9 +1407,9 @@ public class ExecutorManager extends EventHandler implements
}
}
- private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows,
- List<Integer> executionIds, List<Long> updateTimes) {
- for (ExecutableFlow flow : flows) {
+ private void fillUpdateTimeAndExecId(final List<ExecutableFlow> flows,
+ final List<Integer> executionIds, final List<Long> updateTimes) {
+ for (final ExecutableFlow flow : flows) {
executionIds.add(flow.getExecutionId());
updateTimes.add(flow.getUpdateTime());
}
@@ -1594,14 +1417,14 @@ public class ExecutorManager extends EventHandler implements
/* Group Executable flow by Executors to reduce number of REST calls */
private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
- HashMap<Executor, List<ExecutableFlow>> exFlowMap =
- new HashMap<Executor, List<ExecutableFlow>>();
+ final HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+ new HashMap<>();
- for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
- .values()) {
- ExecutionReference ref = runningFlow.getFirst();
- ExecutableFlow flow = runningFlow.getSecond();
- Executor executor = ref.getExecutor();
+ for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningFlows
+ .values()) {
+ final ExecutionReference ref = runningFlow.getFirst();
+ final ExecutableFlow flow = runningFlow.getSecond();
+ final Executor executor = ref.getExecutor();
// We can set the next check time to prevent the checking of certain
// flows.
@@ -1611,7 +1434,7 @@ public class ExecutorManager extends EventHandler implements
List<ExecutableFlow> flows = exFlowMap.get(executor);
if (flows == null) {
- flows = new ArrayList<ExecutableFlow>();
+ flows = new ArrayList<>();
exFlowMap.put(executor, flows);
}
@@ -1622,22 +1445,199 @@ public class ExecutorManager extends EventHandler implements
}
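The grouping above batches flows per executor so the updater can issue one REST call per host instead of one per flow. The same shape in a compact generic form; computeIfAbsent replaces the explicit null-check-then-put, and the input map is an illustrative stand-in for the running-flow references:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByExecutorDemo {

  /** Buckets flows (F) under their executor (E). */
  static <E, F> Map<E, List<F>> groupBy(final Map<F, E> flowToExecutor) {
    final Map<E, List<F>> exFlowMap = new HashMap<>();
    for (final Map.Entry<F, E> entry : flowToExecutor.entrySet()) {
      exFlowMap.computeIfAbsent(entry.getValue(), k -> new ArrayList<>())
          .add(entry.getKey());
    }
    return exFlowMap;
  }
}
```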
@Override
- public int getExecutableFlows(int projectId, String flowId, int from,
- int length, List<ExecutableFlow> outputList)
+ public int getExecutableFlows(final int projectId, final String flowId, final int from,
+ final int length, final List<ExecutableFlow> outputList)
throws ExecutorManagerException {
- List<ExecutableFlow> flows =
- executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+ final List<ExecutableFlow> flows =
+ this.executorLoader.fetchFlowHistory(projectId, flowId, from, length);
outputList.addAll(flows);
- return executorLoader.fetchNumExecutableFlows(projectId, flowId);
+ return this.executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
@Override
- public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
- int from, int length, Status status) throws ExecutorManagerException {
- return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
+ public List<ExecutableFlow> getExecutableFlows(final int projectId, final String flowId,
+ final int from, final int length, final Status status) throws ExecutorManagerException {
+ return this.executorLoader.fetchFlowHistory(projectId, flowId, from, length,
status);
}
+ /**
+ * Calls executor to dispatch the flow, update db to assign the executor and in-memory state of
+ * executableFlow
+ */
+ private void dispatch(final ExecutionReference reference, final ExecutableFlow exflow,
+ final Executor choosenExecutor) throws ExecutorManagerException {
+ exflow.setUpdateTime(System.currentTimeMillis());
+
+ this.executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
+ try {
+ callExecutorServer(exflow, choosenExecutor,
+ ConnectorParams.EXECUTE_ACTION);
+ } catch (final ExecutorManagerException ex) {
+ logger.error("Rolling back executor assignment for execution id:"
+ + exflow.getExecutionId(), ex);
+ this.executorLoader.unassignExecutor(exflow.getExecutionId());
+ throw new ExecutorManagerException(ex);
+ }
+ reference.setExecutor(choosenExecutor);
+
+ // move from flow to running flows
+ this.runningFlows.put(exflow.getExecutionId(),
+ new Pair<>(reference, exflow));
+
+ logger.info(String.format(
+ "Successfully dispatched exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ }
+
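+ dispatch() above follows an assign-then-compensate pattern: the DB assignment is written first, and a failed remote EXECUTE call rolls it back before rethrowing, leaving the flow eligible for another executor. Sketched with illustrative interfaces in place of the loader and the HTTP call:
+
+ ```java
+ public class DispatchDemo {
+
+   interface Loader {
+     void assignExecutor(int executorId, int execId);
+     void unassignExecutor(int execId);
+   }
+
+   interface Remote {
+     void execute(int executorId, int execId) throws Exception;
+   }
+
+   static void dispatch(final Loader loader, final Remote remote,
+       final int executorId, final int execId) throws Exception {
+     loader.assignExecutor(executorId, execId); // persist assignment first
+     try {
+       remote.execute(executorId, execId);
+     } catch (final Exception ex) {
+       // Compensate: undo the assignment so the flow can be retried elsewhere.
+       loader.unassignExecutor(execId);
+       throw ex;
+     }
+   }
+ }
+ ```
+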
+ private class ExecutingManagerUpdaterThread extends Thread {
+
+ private final int waitTimeIdleMs = 2000;
+ private final int waitTimeMs = 500;
+    // When we have an http error, for that flow, we'll check every 10 secs, 6
+    // times (1 min) before we evict.
+ private final int numErrors = 6;
+ private final long errorThreshold = 10000;
+ private boolean shutdown = false;
+
+ public ExecutingManagerUpdaterThread() {
+ this.setName("ExecutorManagerUpdaterThread");
+ }
+
+ private void shutdown() {
+ this.shutdown = true;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ while (!this.shutdown) {
+ try {
+ ExecutorManager.this.lastThreadCheckTime = System.currentTimeMillis();
+ ExecutorManager.this.updaterStage = "Starting update all flows.";
+
+ final Map<Executor, List<ExecutableFlow>> exFlowMap =
+ getFlowToExecutorMap();
+ final ArrayList<ExecutableFlow> finishedFlows =
+ new ArrayList<>();
+ final ArrayList<ExecutableFlow> finalizeFlows =
+ new ArrayList<>();
+
+ if (exFlowMap.size() > 0) {
+ for (final Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+ .entrySet()) {
+ final List<Long> updateTimesList = new ArrayList<>();
+ final List<Integer> executionIdsList = new ArrayList<>();
+
+ final Executor executor = entry.getKey();
+
+ ExecutorManager.this.updaterStage =
+ "Starting update flows on " + executor.getHost() + ":"
+ + executor.getPort();
+
+ // We pack the parameters of the same host together before we
+ // query.
+ fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
+ updateTimesList);
+
+ final Pair<String, String> updateTimes =
+ new Pair<>(
+ ConnectorParams.UPDATE_TIME_LIST_PARAM,
+ JSONUtils.toJSON(updateTimesList));
+ final Pair<String, String> executionIds =
+ new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
+ JSONUtils.toJSON(executionIdsList));
+
+ Map<String, Object> results = null;
+ try {
+ results =
+ callExecutorServer(executor.getHost(),
+ executor.getPort(), ConnectorParams.UPDATE_ACTION,
+ null, null, executionIds, updateTimes);
+ } catch (final IOException e) {
+ logger.error(e);
+ for (final ExecutableFlow flow : entry.getValue()) {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ ExecutorManager.this.runningFlows.get(flow.getExecutionId());
+
+              ExecutorManager.this.updaterStage =
+                  "Failed to get update. Doing some clean up for flow "
+                      + flow.getExecutionId(); // use flow here: pair may be null
+
+ if (pair != null) {
+ final ExecutionReference ref = pair.getFirst();
+ int numErrors = ref.getNumErrors();
+ if (ref.getNumErrors() < this.numErrors) {
+ ref.setNextCheckTime(System.currentTimeMillis()
+ + this.errorThreshold);
+ ref.setNumErrors(++numErrors);
+ } else {
+ logger.error("Evicting flow " + flow.getExecutionId()
+ + ". The executor is unresponsive.");
+ // TODO should send out an unresponsive email here.
+ finalizeFlows.add(pair.getSecond());
+ }
+ }
+ }
+ }
+
+          // We got results
+ if (results != null) {
+ final List<Map<String, Object>> executionUpdates =
+ (List<Map<String, Object>>) results
+ .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+ for (final Map<String, Object> updateMap : executionUpdates) {
+ try {
+ final ExecutableFlow flow = updateExecution(updateMap);
+
+ ExecutorManager.this.updaterStage = "Updated flow " + flow.getExecutionId();
+
+ if (isFinished(flow)) {
+ finishedFlows.add(flow);
+ finalizeFlows.add(flow);
+ }
+ } catch (final ExecutorManagerException e) {
+ final ExecutableFlow flow = e.getExecutableFlow();
+ logger.error(e);
+
+ if (flow != null) {
+ logger.error("Finalizing flow " + flow.getExecutionId());
+ finalizeFlows.add(flow);
+ }
+ }
+ }
+ }
+ }
+
+ ExecutorManager.this.updaterStage =
+ "Finalizing " + finalizeFlows.size() + " error flows.";
+
+ // Kill error flows
+ for (final ExecutableFlow flow : finalizeFlows) {
+ finalizeFlows(flow);
+ }
+ }
+
+ ExecutorManager.this.updaterStage = "Updated all active flows. Waiting for next round.";
+
+ synchronized (this) {
+ try {
+ if (ExecutorManager.this.runningFlows.size() > 0) {
+ this.wait(this.waitTimeMs);
+ } else {
+ this.wait(this.waitTimeIdleMs);
+ }
+ } catch (final InterruptedException e) {
+ }
+ }
+ } catch (final Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+ }
+
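+ The updater thread's eviction policy is a small error budget: each failed poll pushes the next check out by errorThreshold (10 s), and after numErrors (6) consecutive failures, roughly a minute, the flow is finalized. That logic isolated as a sketch:
+
+ ```java
+ public class ErrorBudgetDemo {
+
+   static final int MAX_ERRORS = 6;             // numErrors above
+   static final long ERROR_THRESHOLD = 10_000L; // 10 s between retries
+
+   int errors = 0;
+   long nextCheckTime = 0;
+
+   /** Returns true when the flow should be evicted (finalized). */
+   boolean onPollFailure(final long now) {
+     if (errors < MAX_ERRORS) {
+       errors++;
+       nextCheckTime = now + ERROR_THRESHOLD; // back off before the next poll
+       return false;
+     }
+     return true; // unresponsive executor: evict
+   }
+ }
+ ```
+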
/*
* cleaner thread to clean up execution_logs, etc in DB. Runs every hour.
*/
@@ -1652,33 +1652,33 @@ public class ExecutorManager extends EventHandler implements
private boolean shutdown = false;
private long lastLogCleanTime = -1;
- public CleanerThread(long executionLogsRetentionMs) {
+ public CleanerThread(final long executionLogsRetentionMs) {
this.executionLogsRetentionMs = executionLogsRetentionMs;
this.setName("AzkabanWebServer-Cleaner-Thread");
}
@SuppressWarnings("unused")
public void shutdown() {
- shutdown = true;
+ this.shutdown = true;
this.interrupt();
}
@Override
public void run() {
- while (!shutdown) {
+ while (!this.shutdown) {
synchronized (this) {
try {
- lastCleanerThreadCheckTime = System.currentTimeMillis();
+ ExecutorManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis();
// Cleanup old stuff.
- long currentTime = System.currentTimeMillis();
- if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
+ final long currentTime = System.currentTimeMillis();
+ if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > this.lastLogCleanTime) {
cleanExecutionLogs();
- lastLogCleanTime = currentTime;
+ this.lastLogCleanTime = currentTime;
}
wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
}
}
@@ -1687,49 +1687,20 @@ public class ExecutorManager extends EventHandler implements
private void cleanExecutionLogs() {
logger.info("Cleaning old logs from execution_logs");
- long cutoff = System.currentTimeMillis() - executionLogsRetentionMs;
+ final long cutoff = System.currentTimeMillis() - this.executionLogsRetentionMs;
logger.info("Cleaning old log files before "
+ new DateTime(cutoff).toString());
cleanOldExecutionLogs(System.currentTimeMillis()
- - executionLogsRetentionMs);
+ - this.executionLogsRetentionMs);
}
}
- /**
- * Calls executor to dispatch the flow, update db to assign the executor and
- * in-memory state of executableFlow
- */
- private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
- Executor choosenExecutor) throws ExecutorManagerException {
- exflow.setUpdateTime(System.currentTimeMillis());
-
- executorLoader.assignExecutor(choosenExecutor.getId(),
- exflow.getExecutionId());
- try {
- callExecutorServer(exflow, choosenExecutor,
- ConnectorParams.EXECUTE_ACTION);
- } catch (ExecutorManagerException ex) {
- logger.error("Rolling back executor assignment for execution id:"
- + exflow.getExecutionId(), ex);
- executorLoader.unassignExecutor(exflow.getExecutionId());
- throw new ExecutorManagerException(ex);
- }
- reference.setExecutor(choosenExecutor);
-
- // move from flow to running flows
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
- logger.info(String.format(
- "Successfully dispatched exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
- }
-
/*
* This thread is responsible for processing queued flows using dispatcher and
* making rest api calls to executor server
*/
private class QueueProcessorThread extends Thread {
+
private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
private final int maxDispatchingErrors;
private final long activeExecutorRefreshWindowInMilisec;
@@ -1738,78 +1709,78 @@ public class ExecutorManager extends EventHandler implements
private volatile boolean shutdown = false;
private volatile boolean isActive = true;
- public QueueProcessorThread(boolean isActive,
- long activeExecutorRefreshWindowInTime,
- int activeExecutorRefreshWindowInFlows,
- int maxDispatchingErrors) {
+ public QueueProcessorThread(final boolean isActive,
+ final long activeExecutorRefreshWindowInTime,
+ final int activeExecutorRefreshWindowInFlows,
+ final int maxDispatchingErrors) {
setActive(isActive);
this.maxDispatchingErrors = maxDispatchingErrors;
this.activeExecutorRefreshWindowInFlows =
- activeExecutorRefreshWindowInFlows;
+ activeExecutorRefreshWindowInFlows;
this.activeExecutorRefreshWindowInMilisec =
- activeExecutorRefreshWindowInTime;
+ activeExecutorRefreshWindowInTime;
this.setName("AzkabanWebServer-QueueProcessor-Thread");
}
- public void setActive(boolean isActive) {
- this.isActive = isActive;
- logger.info("QueueProcessorThread active turned " + this.isActive);
+ public boolean isActive() {
+ return this.isActive;
}
- public boolean isActive() {
- return isActive;
+ public void setActive(final boolean isActive) {
+ this.isActive = isActive;
+ logger.info("QueueProcessorThread active turned " + this.isActive);
}
public void shutdown() {
- shutdown = true;
+ this.shutdown = true;
this.interrupt();
}
@Override
public void run() {
// Loops till QueueProcessorThread is shutdown
- while (!shutdown) {
+ while (!this.shutdown) {
synchronized (this) {
try {
// start processing queue if active, other wait for sometime
- if (isActive) {
- processQueuedFlows(activeExecutorRefreshWindowInMilisec,
- activeExecutorRefreshWindowInFlows);
+ if (this.isActive) {
+ processQueuedFlows(this.activeExecutorRefreshWindowInMilisec,
+ this.activeExecutorRefreshWindowInFlows);
}
wait(QUEUE_PROCESSOR_WAIT_IN_MS);
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error(
- "QueueProcessorThread Interrupted. Probably to shut down.", e);
+ "QueueProcessorThread Interrupted. Probably to shut down.", e);
}
}
}
}
/* Method responsible for processing the non-dispatched flows */
- private void processQueuedFlows(long activeExecutorsRefreshWindow,
- int maxContinuousFlowProcessed) throws InterruptedException,
- ExecutorManagerException {
+ private void processQueuedFlows(final long activeExecutorsRefreshWindow,
+ final int maxContinuousFlowProcessed) throws InterruptedException,
+ ExecutorManagerException {
long lastExecutorRefreshTime = 0;
int currentContinuousFlowProcessed = 0;
- while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
- ExecutionReference reference = runningCandidate.getFirst();
- ExecutableFlow exflow = runningCandidate.getSecond();
- long currentTime = System.currentTimeMillis();
+ while (isActive() && (ExecutorManager.this.runningCandidate = ExecutorManager.this.queuedFlows
+ .fetchHead()) != null) {
+ final ExecutionReference reference = ExecutorManager.this.runningCandidate.getFirst();
+ final ExecutableFlow exflow = ExecutorManager.this.runningCandidate.getSecond();
+ final long currentTime = System.currentTimeMillis();
// if we have dispatched more than maxContinuousFlowProcessed or
// It has been more then activeExecutorsRefreshWindow millisec since we
// refreshed
if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
- || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+ || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
// Refresh executorInfo for all activeExecutors
refreshExecutors();
lastExecutorRefreshTime = currentTime;
currentContinuousFlowProcessed = 0;
}
-
/**
* <pre>
* TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
@@ -1823,119 +1794,121 @@ public class ExecutorManager extends EventHandler implements
* 2. re-attempting a flow (which has been tried before) is considered as all executors are busy
* </pre>
*/
- if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+ if (exflow.getUpdateTime() > lastExecutorRefreshTime) {
// put back in the queue
- queuedFlows.enqueue(exflow, reference);
- runningCandidate = null;
- long sleepInterval =
- activeExecutorsRefreshWindow
- - (currentTime - lastExecutorRefreshTime);
+ ExecutorManager.this.queuedFlows.enqueue(exflow, reference);
+ ExecutorManager.this.runningCandidate = null;
+ final long sleepInterval =
+ activeExecutorsRefreshWindow
+ - (currentTime - lastExecutorRefreshTime);
// wait till next executor refresh
sleep(sleepInterval);
} else {
exflow.setUpdateTime(currentTime);
// process flow with current snapshot of activeExecutors
- selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
- runningCandidate = null;
+ selectExecutorAndDispatchFlow(reference, exflow, new HashSet<>(
+ ExecutorManager.this.activeExecutors));
+ ExecutorManager.this.runningCandidate = null;
}
// do not count failed flow processsing (flows still in queue)
- if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
+ if (ExecutorManager.this.queuedFlows.getFlow(exflow.getExecutionId()) == null) {
currentContinuousFlowProcessed++;
}
}
}
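processQueuedFlows refreshes its snapshot of active executors on two triggers, whichever fires first: the refresh time window elapsing, or a count of flows dispatched since the last refresh. The predicate, extracted:

```java
public class RefreshWindowDemo {

  static boolean shouldRefresh(final long now, final long lastRefreshTime,
      final long windowMs, final int processedSinceRefresh, final int maxProcessed) {
    // Refresh when the window has elapsed OR enough flows were processed.
    return now - lastRefreshTime > windowMs
        || processedSinceRefresh >= maxProcessed;
  }
}
```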
/* process flow with a snapshot of available Executors */
- private void selectExecutorAndDispatchFlow(ExecutionReference reference,
- ExecutableFlow exflow, Set<Executor> availableExecutors)
- throws ExecutorManagerException {
+ private void selectExecutorAndDispatchFlow(final ExecutionReference reference,
+ final ExecutableFlow exflow, final Set<Executor> availableExecutors)
+ throws ExecutorManagerException {
synchronized (exflow) {
- Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
+ final Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
if (selectedExecutor != null) {
try {
dispatch(reference, exflow, selectedExecutor);
- commonMetrics.markDispatchSuccess();
- } catch (ExecutorManagerException e) {
- commonMetrics.markDispatchFail();
+ ExecutorManager.this.commonMetrics.markDispatchSuccess();
+ } catch (final ExecutorManagerException e) {
+ ExecutorManager.this.commonMetrics.markDispatchFail();
logger.warn(String.format(
- "Executor %s responded with exception for exec: %d",
- selectedExecutor, exflow.getExecutionId()), e);
+ "Executor %s responded with exception for exec: %d",
+ selectedExecutor, exflow.getExecutionId()), e);
handleDispatchExceptionCase(reference, exflow, selectedExecutor,
- availableExecutors);
+ availableExecutors);
}
} else {
- commonMetrics.markDispatchFail();
+ ExecutorManager.this.commonMetrics.markDispatchFail();
handleNoExecutorSelectedCase(reference, exflow);
}
}
}
/* Helper method to fetch overriding Executor, if a valid user has specifed otherwise return null */
- private Executor getUserSpecifiedExecutor(ExecutionOptions options,
- int executionId) {
+ private Executor getUserSpecifiedExecutor(final ExecutionOptions options,
+ final int executionId) {
Executor executor = null;
if (options != null
- && options.getFlowParameters() != null
- && options.getFlowParameters().containsKey(
+ && options.getFlowParameters() != null
+ && options.getFlowParameters().containsKey(
ExecutionOptions.USE_EXECUTOR)) {
try {
- int executorId =
- Integer.valueOf(options.getFlowParameters().get(
- ExecutionOptions.USE_EXECUTOR));
+ final int executorId =
+ Integer.valueOf(options.getFlowParameters().get(
+ ExecutionOptions.USE_EXECUTOR));
executor = fetchExecutor(executorId);
if (executor == null) {
logger
- .warn(String
- .format(
- "User specified executor id: %d for execution id: %d is not active, Looking up db.",
- executorId, executionId));
- executor = executorLoader.fetchExecutor(executorId);
+ .warn(String
+ .format(
+ "User specified executor id: %d for execution id: %d is not active, Looking up db.",
+ executorId, executionId));
+ executor = ExecutorManager.this.executorLoader.fetchExecutor(executorId);
if (executor == null) {
logger
- .warn(String
- .format(
- "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
- executorId, executionId));
+ .warn(String
+ .format(
+ "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
+ executorId, executionId));
}
}
- } catch (ExecutorManagerException ex) {
+ } catch (final ExecutorManagerException ex) {
logger.error("Failed to fetch user specified executor for exec_id = "
- + executionId, ex);
+ + executionId, ex);
}
}
return executor;
}
/* Choose Executor for exflow among the available executors */
- private Executor selectExecutor(ExecutableFlow exflow,
- Set<Executor> availableExecutors) {
+ private Executor selectExecutor(final ExecutableFlow exflow,
+ final Set<Executor> availableExecutors) {
Executor choosenExecutor =
- getUserSpecifiedExecutor(exflow.getExecutionOptions(),
- exflow.getExecutionId());
+ getUserSpecifiedExecutor(exflow.getExecutionOptions(),
+ exflow.getExecutionId());
// If no executor was specified by admin
if (choosenExecutor == null) {
logger.info("Using dispatcher for execution id :"
- + exflow.getExecutionId());
- ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
+ + exflow.getExecutionId());
+ final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,
+ ExecutorManager.this.comparatorWeightsMap);
choosenExecutor = selector.getBest(availableExecutors, exflow);
}
return choosenExecutor;
}
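selectExecutor delegates to ExecutorSelector, which, per its construction from a filter list and comparator weights, is a two-stage choice: filters drop unusable executors, then a comparator ranks the survivors. A generic sketch of that shape, assuming rather than reproducing the real ExecutorSelector internals:

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

public class SelectorDemo {

  /** Filter out unusable candidates, then take the best of the rest. */
  static <E> Optional<E> getBest(final Set<E> candidates,
      final Predicate<E> filter, final Comparator<E> score) {
    return candidates.stream().filter(filter).max(score);
  }
}
```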
- private void handleDispatchExceptionCase(ExecutionReference reference,
- ExecutableFlow exflow, Executor lastSelectedExecutor,
- Set<Executor> remainingExecutors) throws ExecutorManagerException {
+ private void handleDispatchExceptionCase(final ExecutionReference reference,
+ final ExecutableFlow exflow, final Executor lastSelectedExecutor,
+ final Set<Executor> remainingExecutors) throws ExecutorManagerException {
logger
- .info(String
- .format(
- "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
+ .info(String
+ .format(
+ "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
reference.setNumErrors(reference.getNumErrors() + 1);
if (reference.getNumErrors() > this.maxDispatchingErrors
- || remainingExecutors.size() <= 1) {
+ || remainingExecutors.size() <= 1) {
logger.error("Failed to process queued flow");
finalizeFlows(exflow);
} else {
@@ -1945,16 +1918,16 @@ public class ExecutorManager extends EventHandler implements
}
}
- private void handleNoExecutorSelectedCase(ExecutionReference reference,
- ExecutableFlow exflow) throws ExecutorManagerException {
+ private void handleNoExecutorSelectedCase(final ExecutionReference reference,
+ final ExecutableFlow exflow) throws ExecutorManagerException {
logger
- .info(String
- .format(
- "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
+ .info(String
+ .format(
+ "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
// TODO: handle scenario where a high priority flow failing to get
// schedule can starve all others
- queuedFlows.enqueue(exflow, reference);
+ ExecutorManager.this.queuedFlows.enqueue(exflow, reference);
}
}
}
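
The dispatch path above first honors a user-specified executor (the useExecutor flow parameter), then falls back to ExecutorSelector.getBest over the available executors; a failed dispatch bumps the reference's error count until maxDispatchingErrors finalizes the flow, and a null selection re-enqueues it. A minimal sketch of the filter-then-weighted-compare idea follows; the names (SelectorSketch, weightedCompare) are illustrative, not Azkaban's actual selector internals.

// Minimal sketch of filter-then-weighted-compare selection, mirroring the
// dispatch path above. Names are illustrative, not Azkaban's selector internals.
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

final class SelectorSketch<K, V> {

  private final List<BiPredicate<K, V>> filters;        // all must pass
  private final Map<String, Comparator<K>> comparators; // factor name -> comparator
  private final Map<String, Integer> weights;           // factor name -> weight

  SelectorSketch(final List<BiPredicate<K, V>> filters,
      final Map<String, Comparator<K>> comparators, final Map<String, Integer> weights) {
    this.filters = filters;
    this.comparators = comparators;
    this.weights = weights;
  }

  K getBest(final Set<K> candidates, final V subject) {
    K best = null;
    for (final K candidate : candidates) {
      // A candidate survives only if every registered filter accepts it.
      if (!this.filters.stream().allMatch(f -> f.test(candidate, subject))) {
        continue;
      }
      if (best == null || weightedCompare(candidate, best) > 0) {
        best = candidate;
      }
    }
    return best; // null mirrors the handleNoExecutorSelectedCase branch above
  }

  private int weightedCompare(final K a, final K b) {
    // Each comparator votes with its weight; the signed sum decides.
    int score = 0;
    for (final Map.Entry<String, Comparator<K>> e : this.comparators.entrySet()) {
      score += Integer.signum(e.getValue().compare(a, b))
          * this.weights.getOrDefault(e.getKey(), 1);
    }
    return score;
  }

  public static void main(final String[] args) {
    final Map<String, Integer> running = new HashMap<>();
    running.put("exec-1", 5);
    running.put("exec-2", 2);
    final SelectorSketch<String, Void> selector = new SelectorSketch<>(
        Collections.<BiPredicate<String, Void>>singletonList(
            (e, f) -> running.get(e) < 10),                          // static filter
        Collections.singletonMap("fewestFlows",
            Comparator.comparingInt((String e) -> -running.get(e))), // fewer is better
        Collections.singletonMap("fewestFlows", 1));
    System.out.println(selector.getBest(running.keySet(), null));    // exec-2
  }
}
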
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index fec3bd3..429d58b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -175,17 +175,14 @@ public interface ExecutorManagerAdapter {
throws ExecutorManagerException;
/**
- * Manage servlet call for stats servlet in Azkaban execution server
- * Action can take any of the following values
- * <ul>
- * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_REPORTINGINTERVAL}<li>
- * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_CLEANINGINTERVAL}<li>
- * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_MAXREPORTERPOINTS}<li>
- * <li>{@link azkaban.executor.ConnectorParams#STATS_GET_ALLMETRICSNAME}<li>
- * <li>{@link azkaban.executor.ConnectorParams#STATS_GET_METRICHISTORY}<li>
- * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
- * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
- * </ul>
+ * Manage servlet call for stats servlet in Azkaban execution server. Action can take any of the
+ * following values <ul> <li>{@link azkaban.executor.ConnectorParams#STATS_SET_REPORTINGINTERVAL}</li>
+ * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_CLEANINGINTERVAL}</li> <li>{@link
+ * azkaban.executor.ConnectorParams#STATS_SET_MAXREPORTERPOINTS}</li> <li>{@link
+ * azkaban.executor.ConnectorParams#STATS_GET_ALLMETRICSNAME}</li> <li>{@link
+ * azkaban.executor.ConnectorParams#STATS_GET_METRICHISTORY}</li> <li>{@link
+ * azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}</li> <li>{@link
+ * azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}</li> </ul>
*/
public Map<String, Object> callExecutorStats(int executorId, String action,
Pair<String, String>... param) throws IOException, ExecutorManagerException;
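
callExecutorStats funnels every stats action through one varargs call of action plus key/value pairs. A hedged sketch of that call style follows; the "executorId"/"action" query keys and the buildQuery helper are illustrative assumptions, not ConnectorParams' actual wire format.

// Hedged sketch of the action-plus-varargs-parameters call style used for the
// stats servlet. Keys and helper names are illustrative assumptions.
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

final class StatsCallSketch {

  @SafeVarargs
  static Map<String, String> buildQuery(final int executorId, final String action,
      final SimpleEntry<String, String>... params) {
    final Map<String, String> query = new HashMap<>();
    query.put("executorId", String.valueOf(executorId)); // illustrative key
    query.put("action", action);                         // e.g. one of the STATS_* actions
    for (final SimpleEntry<String, String> p : params) {
      query.put(p.getKey(), p.getValue());
    }
    return query; // a real client serializes this onto the executor's servlet URL
  }

  public static void main(final String[] args) {
    System.out.println(buildQuery(1, "getAllMetricsName"));
  }
}
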
@@ -206,8 +203,7 @@ public interface ExecutorManagerAdapter {
public Set<? extends String> getPrimaryServerHosts();
/**
- * Returns a collection of all the active executors maintained by active
- * executors
+ * Returns a collection of all the active executors maintained by the executor manager
*/
public Collection<Executor> getAllActiveExecutors();
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 3d1da87..da29ddf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -30,11 +30,11 @@ import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
-import org.apache.log4j.Logger;
@Singleton
public class JdbcExecutorLoader extends AbstractJdbcLoader implements
ExecutorLoader {
+
private final ExecutionFlowDao executionFlowDao;
private final ExecutorDao executorDao;
private final ExecutionJobDao executionJobDao;
@@ -48,20 +48,20 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Inject
public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics,
- final ExecutionFlowDao executionFlowDao,
- final ExecutorDao executorDao,
- final ExecutionJobDao executionJobDao,
- final ExecutionLogsDao executionLogsDao,
- final ExecutorEventsDao executorEventsDao,
- final ActiveExecutingFlowsDao activeExecutingFlowsDao,
- final FetchActiveFlowDao fetchActiveFlowDao,
- final AssignExecutorDao assignExecutorDao,
- final NumExecutionsDao numExecutionsDao) {
+ final ExecutionFlowDao executionFlowDao,
+ final ExecutorDao executorDao,
+ final ExecutionJobDao executionJobDao,
+ final ExecutionLogsDao executionLogsDao,
+ final ExecutorEventsDao executorEventsDao,
+ final ActiveExecutingFlowsDao activeExecutingFlowsDao,
+ final FetchActiveFlowDao fetchActiveFlowDao,
+ final AssignExecutorDao assignExecutorDao,
+ final NumExecutionsDao numExecutionsDao) {
super(props, commonMetrics);
this.executionFlowDao = executionFlowDao;
this.executorDao = executorDao;
this.executionJobDao = executionJobDao;
- this.executionLogsDao= executionLogsDao;
+ this.executionLogsDao = executionLogsDao;
this.executorEventsDao = executorEventsDao;
this.activeExecutingFlowsDao = activeExecutingFlowsDao;
this.fetchActiveFlowDao = fetchActiveFlowDao;
@@ -95,9 +95,9 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return this.executionFlowDao.fetchExecutableFlow(id);
}
- @Override
+ @Override
public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return this.executionFlowDao.fetchQueuedFlows();
}
@@ -143,26 +143,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
- final int skip, final int num) throws ExecutorManagerException {
+ final int skip, final int num) throws ExecutorManagerException {
return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
- final int skip, final int num, final Status status) throws ExecutorManagerException {
+ final int skip, final int num, final Status status) throws ExecutorManagerException {
return this.executionFlowDao.fetchFlowHistory(projectId, flowId, skip, num, status);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
throws ExecutorManagerException {
- return this.executionFlowDao.fetchFlowHistory(skip,num);
+ return this.executionFlowDao.fetchFlowHistory(skip, num);
}
@Override
public List<ExecutableFlow> fetchFlowHistory(final String projContain,
- final String flowContains, final String userNameContains, final int status, final long startTime,
- final long endTime, final int skip, final int num) throws ExecutorManagerException {
+ final String flowContains, final String userNameContains, final int status,
+ final long startTime,
+ final long endTime, final int skip, final int num) throws ExecutorManagerException {
return this.executionFlowDao.fetchFlowHistory(projContain, flowContains,
userNameContains, status, startTime, endTime, skip, num);
}
@@ -237,14 +238,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
- final int skip, final int size) throws ExecutorManagerException {
+ final int skip, final int size) throws ExecutorManagerException {
return this.executionJobDao.fetchJobHistory(projectId, jobId, skip, size);
}
@Override
- public LogData fetchLogs(final int execId, final String name, final int attempt, final int startByte,
- final int length) throws ExecutorManagerException {
+ public LogData fetchLogs(final int execId, final String name, final int attempt,
+ final int startByte,
+ final int length) throws ExecutorManagerException {
return this.executionLogsDao.fetchLogs(execId, name, attempt, startByte, length);
}
@@ -257,7 +259,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
@Override
- public void uploadLogFile(final int execId, final String name, final int attempt, final File... files)
+ public void uploadLogFile(final int execId, final String name, final int attempt,
+ final File... files)
throws ExecutorManagerException {
this.executionLogsDao.uploadLogFile(execId, name, attempt, files);
}
@@ -279,65 +282,65 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
return connection;
}
- @Override
+ @Override
public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
return this.executorDao.fetchAllExecutors();
}
- @Override
+ @Override
public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
return this.executorDao.fetchActiveExecutors();
}
- @Override
+ @Override
public Executor fetchExecutor(final String host, final int port)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return this.executorDao.fetchExecutor(host, port);
}
- @Override
+ @Override
public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
return this.executorDao.fetchExecutor(executorId);
}
- @Override
+ @Override
public void updateExecutor(final Executor executor) throws ExecutorManagerException {
this.executorDao.updateExecutor(executor);
}
@Override
public Executor addExecutor(final String host, final int port)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return this.executorDao.addExecutor(host, port);
}
- @Override
+ @Override
public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
this.executorDao.removeExecutor(host, port);
}
@Override
public void postExecutorEvent(final Executor executor, final EventType type, final String user,
- final String message) throws ExecutorManagerException{
+ final String message) throws ExecutorManagerException {
this.executorEventsDao.postExecutorEvent(executor, type, user, message);
}
@Override
public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
- final int offset) throws ExecutorManagerException {
+ final int offset) throws ExecutorManagerException {
return this.executorEventsDao.getExecutorEvents(executor, num, offset);
}
@Override
public void assignExecutor(final int executorId, final int executionId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
this.assignExecutorDao.assignExecutor(executorId, executionId);
}
@Override
public Executor fetchExecutorByExecutionId(final int executionId)
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return this.executorDao.fetchExecutorByExecutionId(executionId);
}
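
The refactor above makes the loader a thin facade: every DAO arrives as a final constructor-injected field and each public method is a one-line delegation. A minimal sketch of that shape, assuming Guice-style injection; the DAO interfaces below are stand-ins, not the real Azkaban DAOs.

// Facade-over-DAOs sketch: all persistence work is delegated to injected DAOs.
import com.google.inject.Inject;
import com.google.inject.Singleton;

@Singleton
class LoaderSketch {

  interface FlowDao { String fetchFlow(int id); }

  interface LogsDao { String fetchLogs(int execId, int startByte, int length); }

  private final FlowDao flowDao;
  private final LogsDao logsDao;

  @Inject
  LoaderSketch(final FlowDao flowDao, final LogsDao logsDao) {
    this.flowDao = flowDao; // all real work is delegated to the DAOs
    this.logsDao = logsDao;
  }

  String fetchFlow(final int id) {
    return this.flowDao.fetchFlow(id);
  }

  String fetchLogs(final int execId, final int startByte, final int length) {
    return this.logsDao.fetchLogs(execId, startByte, length);
  }
}
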
diff --git a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
index 7eb592d..84ae246 100644
--- a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
@@ -34,8 +34,7 @@ public class QueuedExecutions {
}
/**
- * Wraps BoundedQueue Take method to have a corresponding update in
- * queuedFlowMap lookup table
+ * Wraps BoundedQueue Take method to have a corresponding update in queuedFlowMap lookup table
*/
public Pair<ExecutionReference, ExecutableFlow> fetchHead()
throws InterruptedException {
@@ -136,8 +135,7 @@ public class QueuedExecutions {
}
/**
- * Fetch Activereference for an execution. Returns null, if execution not in
- * queue
+ * Fetch the ExecutionReference for an execution. Returns null if the execution is not in the queue
*/
public ExecutionReference getReference(final int executionId) {
if (hasExecution(executionId)) {
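
The two javadocs above describe the class's invariant: a bounded blocking queue paired with an executionId lookup table that must be updated together on every enqueue and take. A small sketch of that pairing, with illustrative types (the real class stores Pair<ExecutionReference, ExecutableFlow>):

// Bounded queue plus id -> flow lookup table, kept in step on every operation.
import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

final class QueuedSketch<F> {

  private final BlockingQueue<SimpleEntry<Integer, F>> queue;
  private final ConcurrentMap<Integer, F> byExecId = new ConcurrentHashMap<>();

  QueuedSketch(final int capacity) {
    this.queue = new LinkedBlockingQueue<>(capacity);
  }

  void enqueue(final int executionId, final F flow) throws InterruptedException {
    this.byExecId.put(executionId, flow);                 // lookup table first...
    this.queue.put(new SimpleEntry<>(executionId, flow)); // ...then the queue
  }

  F fetchHead() throws InterruptedException {
    final SimpleEntry<Integer, F> head = this.queue.take(); // blocks like BoundedQueue.take
    this.byExecId.remove(head.getKey());                    // corresponding map update
    return head.getValue();
  }

  boolean hasExecution(final int executionId) {
    return this.byExecId.containsKey(executionId);
  }
}
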
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 18260d9..2624e03 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -91,7 +91,8 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
* NOTE : this is a static filter which means the filter will be filtering based on the system
 * standard, which is not
 * coming from the passed flow.
- * Ideally this filter will make sure only the executor hasn't reached the Max allowed # of
+ * Ideally this filter will make sure only the executor hasn't reached the Max allowed #
+ * of
* executing flows.
* </pre>
*/
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
index e1cfab2..64f6da4 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -48,8 +48,8 @@ public final class FactorComparator<T> {
}
/**
- * static function to generate an instance of the class.
- * refer to the constructor for the param definitions.
+ * Static function to generate an instance of the class. Refer to the constructor for the param
+ * definitions.
*/
public static <T> FactorComparator<T> create(final String factorName, final int weight,
final Comparator<T> comparator) {
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
index 97fb89f..d5d5c81 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -44,8 +44,8 @@ public final class FactorFilter<T, V> {
}
/**
- * static function to generate an instance of the class.
- * refer to the constructor for the param definitions.
+ * Static function to generate an instance of the class. Refer to the constructor for the param
+ * definitions.
*/
public static <T, V> FactorFilter<T, V> create(final String factorName,
final Filter<T, V> filter) {
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index f99fd4f..24ed874 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -27,8 +27,7 @@ public class CommonJobProperties {
public static final String JOB_TYPE = "type";
/**
- * Force a node to be a root node in a flow, even if there are other jobs
- * dependent on it.
+ * Force a node to be a root node in a flow, even if there are other jobs dependent on it.
*/
public static final String ROOT_NODE = "root.node";
@@ -48,8 +47,7 @@ public class CommonJobProperties {
public static final String RETRY_BACKOFF = "retry.backoff";
/**
- * Comma delimited list of email addresses for both failure and success
- * messages
+ * Comma delimited list of email addresses for both failure and success messages
*/
public static final String NOTIFY_EMAILS = "notify.emails";
@@ -103,8 +101,7 @@ public class CommonJobProperties {
public static final String JOB_ID = "azkaban.job.id";
/**
- * The execution id. This should be unique per flow, but may not be due to
- * restarts.
+ * The execution id. This should be unique per flow, but may not be due to restarts.
*/
public static final String EXEC_ID = "azkaban.flow.execid";
@@ -129,8 +126,7 @@ public class CommonJobProperties {
public static final String PROJECT_LAST_CHANGED_DATE = "azkaban.flow.projectlastchangeddate";
/**
- * The version of the project the flow is running. This may change if a forced
- * hotspot occurs.
+ * The version of the project the flow is running. This may change if a forced hotspot occurs.
*/
public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
@@ -168,4 +164,4 @@ public class CommonJobProperties {
public static final String FLOW_START_TIMEZONE =
"azkaban.flow.start.timezone";
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/DisplayName.java b/azkaban-common/src/main/java/azkaban/jmx/DisplayName.java
index 7f7d1b2..683956e 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/DisplayName.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/DisplayName.java
@@ -24,8 +24,8 @@ import java.lang.annotation.Target;
import javax.management.DescriptorKey;
/**
- * DisplayName - This annotation allows to supply a display name for a method in
- * the MBean interface.
+ * DisplayName - This annotation allows to supply a display name for a method in the MBean
+ * interface.
*/
@Documented
@Target(ElementType.METHOD)
diff --git a/azkaban-common/src/main/java/azkaban/jmx/ParameterName.java b/azkaban-common/src/main/java/azkaban/jmx/ParameterName.java
index 36f75f1..a8deb04 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/ParameterName.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/ParameterName.java
@@ -16,8 +16,8 @@
package azkaban.jmx;
/**
- * ParameterName - This annotation allows to supply
- * a parameter name for a method in the MBean interface.
+ * ParameterName - This annotation allows to supply a parameter name for a method in the MBean
+ * interface.
*/
import java.lang.annotation.Documented;
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 680db51..93fc766 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -15,8 +15,7 @@ import java.util.Collection;
import org.apache.log4j.Logger;
/**
- * Responsible for validating the job callback related properties at project
- * upload time
+ * Responsible for validating the job callback related properties at project upload time
*
* @author hluu
*/
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/Job.java b/azkaban-common/src/main/java/azkaban/jobExecutor/Job.java
index 02c72d9..9e2a10d 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/Job.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/Job.java
@@ -19,12 +19,9 @@ package azkaban.jobExecutor;
import azkaban.utils.Props;
/**
- * This interface defines a Raw Job interface. Each job defines
- * <ul>
- * <li>Job Type : {HADOOP, UNIX, JAVA, SUCCESS_TEST, CONTROLLER}</li>
- * <li>Job ID/Name : {String}</li>
- * <li>Arguments: Key/Value Map for Strings</li>
- * </ul>
+ * This interface defines a Raw Job interface. Each job defines <ul> <li>Job Type : {HADOOP, UNIX,
+ * JAVA, SUCCESS_TEST, CONTROLLER}</li> <li>Job ID/Name : {String}</li> <li>Arguments: Key/Value Map
+ * for Strings</li> </ul>
*
* A job is required to have a constructor Job(String jobId, Props props)
*/
@@ -37,8 +34,8 @@ public interface Job {
public String getId();
/**
- * Run the job. In general this method can only be run once. Must either
- * succeed or throw an exception.
+ * Run the job. In general this method can only be run once. Must either succeed or throw an
+ * exception.
*/
public void run() throws Exception;
@@ -50,8 +47,7 @@ public interface Job {
public void cancel() throws Exception;
/**
- * Returns a progress report between [0 - 1.0] to indicate the percentage
- * complete
+ * Returns a progress report between [0 - 1.0] to indicate the percentage complete
*
* @throws Exception If getting progress fails
*/
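
The contract spelled out above (stable id, run-once semantics that succeed or throw, best-effort cancel, progress in [0, 1.0]) can be illustrated with a toy class. This mirrors the interface's shape only; it is not an actual Azkaban jobtype.

// Toy class mirroring the Job contract described above. Illustrative only.
public class SleepJob {

  private final String id;
  private volatile boolean cancelled = false;
  private volatile double progress = 0.0;

  public SleepJob(final String id) {
    this.id = id;
  }

  public String getId() {
    return this.id;
  }

  public void run() throws Exception { // must either succeed or throw
    for (int i = 1; i <= 10 && !this.cancelled; i++) {
      Thread.sleep(100);
      this.progress = i / 10.0; // fraction complete
    }
    if (this.cancelled) {
      throw new Exception("job " + this.id + " was cancelled");
    }
  }

  public void cancel() { // best-effort stop
    this.cancelled = true;
  }

  public double getProgress() {
    return this.progress;
  }
}
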
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/LongArgJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/LongArgJob.java
index 53488b0..405da38 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/LongArgJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/LongArgJob.java
@@ -26,8 +26,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
/**
- * A job that passes all the job properties as command line arguments in "long"
- * format, e.g. --key1 value1 --key2 value2 ...
+ * A job that passes all the job properties as command line arguments in "long" format, e.g. --key1
+ * value1 --key2 value2 ...
*/
public abstract class LongArgJob extends AbstractProcessJob {
@@ -101,8 +101,8 @@ public abstract class LongArgJob extends AbstractProcessJob {
}
/**
- * This gives access to the process builder used to construct the process. An
- * overriding class can use this to add to the command being executed.
+ * This gives access to the process builder used to construct the process. An overriding class can
+ * use this to add to the command being executed.
*/
protected AzkabanProcessBuilder getBuilder() {
return this.builder;
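
The "long" format the class javadoc describes is simple to sketch: every job property becomes a --key value pair on the command line. An illustrative helper, not the plugin's actual argument builder:

// Each property becomes --key value, as in the LongArgJob javadoc above.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class LongArgs {

  static List<String> toArgs(final Map<String, String> props) {
    final List<String> args = new ArrayList<>();
    for (final Map.Entry<String, String> e : props.entrySet()) {
      args.add("--" + e.getKey());
      args.add(e.getValue());
    }
    return args;
  }

  public static void main(final String[] args) {
    final Map<String, String> props = new TreeMap<>();
    props.put("key1", "value1");
    props.put("key2", "value2");
    System.out.println(toArgs(props)); // [--key1, value1, --key2, value2]
  }
}
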
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 0a4a1fc..d059eeb 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -77,8 +77,8 @@ public class ProcessJob extends AbstractProcessJob {
}
/**
- * Splits the command into a unix like command line structure. Quotes and
- * single quotes are treated as nested strings.
+ * Splits the command into a unix like command line structure. Quotes and single quotes are
+ * treated as nested strings.
*/
public static String[] partitionCommandLine(final String command) {
final ArrayList<String> commands = new ArrayList<>();
@@ -360,8 +360,8 @@ public class ProcessJob extends AbstractProcessJob {
}
/**
- * Checks to see if user has write access to current working directory which many users
- * need for their jobs to store temporary data/jars on the executor.
+ * Checks to see if user has write access to current working directory which many users need for
+ * their jobs to store temporary data/jars on the executor.
*
* Accomplishes this by using execute-as-user to try to create an empty file in the cwd.
*
@@ -379,8 +379,8 @@ public class ProcessJob extends AbstractProcessJob {
}
/**
- * Changes permission on current working directory so that the directory is owned by the user
- * and the group remains azkaban.
+ * Changes permission on current working directory so that the directory is owned by the user and
+ * the group remains azkaban.
*
* Leverages execute-as-user with "root" as the user to run the command.
*
@@ -401,9 +401,9 @@ public class ProcessJob extends AbstractProcessJob {
}
/**
- * This is used to get the min/max memory size requirement by processes.
- * SystemMemoryInfo can use the info to determine if the memory request can be
- * fulfilled. For Java process, this should be Xms/Xmx setting.
+ * This is used to get the min/max memory size requirement by processes. SystemMemoryInfo can use
+ * the info to determine if the memory request can be fulfilled. For Java process, this should be
+ * Xms/Xmx setting.
*
* @return pair of min/max memory size
*/
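
partitionCommandLine's contract above (whitespace splitting, with quotes and single quotes treated as nested strings) can be approximated as below. This is a hedged sketch, not the actual Azkaban tokenizer; it drops the outermost quote characters and keeps nested ones verbatim.

// Quote-aware whitespace splitting, in the spirit of partitionCommandLine.
import java.util.ArrayList;
import java.util.List;

final class CommandSplitter {

  static String[] splitCommand(final String command) {
    final List<String> parts = new ArrayList<>();
    final StringBuilder cur = new StringBuilder();
    char quote = 0; // 0 = outside quotes, otherwise the active quote char
    for (final char c : command.toCharArray()) {
      if (quote == 0 && (c == '"' || c == '\'')) {
        quote = c;                   // open a quoted region
      } else if (quote != 0 && c == quote) {
        quote = 0;                   // close it
      } else if (quote == 0 && Character.isWhitespace(c)) {
        if (cur.length() > 0) {
          parts.add(cur.toString()); // token boundary
          cur.setLength(0);
        }
      } else {
        cur.append(c);               // nested quotes are kept verbatim
      }
    }
    if (cur.length() > 0) {
      parts.add(cur.toString());
    }
    return parts.toArray(new String[0]);
  }

  public static void main(final String[] args) {
    // Prints: sh | -c | echo "hello world"
    for (final String s : splitCommand("sh -c 'echo \"hello world\"'")) {
      System.out.println(s);
    }
  }
}
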
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ScriptJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ScriptJob.java
index 845d13b..f41d0bf 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ScriptJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ScriptJob.java
@@ -21,9 +21,8 @@ import com.google.common.collect.ImmutableSet;
import org.apache.log4j.Logger;
/**
- * A script job issues a command of the form [EXECUTABLE] [SCRIPT] --key1 val1
- * ... --key2 val2 executable -- the interpretor command to execute script --
- * the script to pass in (requried)
+ * A script job issues a command of the form [EXECUTABLE] [SCRIPT] --key1 val1 ... --key2 val2
+ * executable -- the interpreter command to execute script -- the script to pass in (required)
*/
public class ScriptJob extends LongArgJob {
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 4175d58..480f43a 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -34,8 +34,7 @@ import org.apache.log4j.Logger;
/**
* An improved version of java.lang.Process.
*
- * Output is read by separate threads to avoid deadlock and logged to log4j
- * loggers.
+ * Output is read by separate threads to avoid deadlock and logged to log4j loggers.
*/
public class AzkabanProcess {
@@ -150,8 +149,7 @@ public class AzkabanProcess {
/**
* Await the start of this process
*
- * When this method returns, the job process has been created and a this.processId has been
- * set.
+ * When this method returns, the job process has been created and this.processId has been set.
*
* @throws InterruptedException if the thread is interrupted while waiting.
*/
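
The deadlock this class avoids is the classic one: the child process blocks writing to a full stdout/stderr pipe while the parent blocks in waitFor(). A minimal sketch of the drain-on-separate-threads pattern; the "ls -l" command and System.out logging are illustrative (the real class wires output to log4j).

// Drain stdout/stderr concurrently so waitFor() cannot deadlock on full pipes.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

final class ProcessDemo {

  public static void main(final String[] args) throws Exception {
    final Process p = new ProcessBuilder("ls", "-l").start();
    final Thread out = gobble(p.getInputStream(), "OUT");
    final Thread err = gobble(p.getErrorStream(), "ERR");
    final int exit = p.waitFor(); // safe: both pipes are being drained
    out.join();
    err.join();
    System.out.println("exit=" + exit);
  }

  private static Thread gobble(final InputStream in, final String tag) {
    final Thread t = new Thread(() -> {
      try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
        String line;
        while ((line = r.readLine()) != null) {
          System.out.println(tag + "> " + line);
        }
      } catch (final IOException ignored) {
        // stream closes when the process exits
      }
    });
    t.start();
    return t;
  }
}
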
diff --git a/azkaban-common/src/main/java/azkaban/jobtype/JobTypePluginSet.java b/azkaban-common/src/main/java/azkaban/jobtype/JobTypePluginSet.java
index ab5cf93..aebf22e 100644
--- a/azkaban-common/src/main/java/azkaban/jobtype/JobTypePluginSet.java
+++ b/azkaban-common/src/main/java/azkaban/jobtype/JobTypePluginSet.java
@@ -23,11 +23,11 @@ import java.util.Map;
/**
* Container for job type plugins
*
- * This contains the jobClass objects, the properties for loading plugins, and
- * the properties given by default to the plugin.
+ * This contains the jobClass objects, the properties for loading plugins, and the properties given
+ * by default to the plugin.
*
- * This class is not thread safe, so adding to this class should only be
- * populated and controlled by the JobTypeManager
+ * This class is not thread safe, so this class should only be populated and controlled by the
+ * JobTypeManager
*/
public class JobTypePluginSet {
@@ -94,8 +94,7 @@ public class JobTypePluginSet {
}
/**
- * Get the properties that will be given to the plugin as default job
- * properties.
+ * Get the properties that will be given to the plugin as default job properties.
*/
public Props getPluginJobProps(final String jobTypeName) {
return this.pluginJobPropsMap.get(jobTypeName);
diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
index 2427a4c..463ed34 100644
--- a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -96,10 +96,8 @@ public abstract class AbstractMetric<T> implements IMetric<T>, Cloneable {
}
/**
- * Method used to notify manager for a tracking event.
- * Metric is free to call this method as per implementation.
- * Timer based or Azkaban events are the most common implementation
- * {@inheritDoc}
+ * Method used to notify manager for a tracking event. Metric is free to call this method as per
+ * implementation. Timer based or Azkaban events are the most common implementation {@inheritDoc}
*
* @see azkaban.metric.IMetric#notifyManager()
*/
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
index 34fce3a..cb2e487 100644
--- a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -50,8 +50,7 @@ public class GangliaMetricEmitter implements IMetricEmitter {
}
/**
- * Report metric by executing command line interface of gmetrics
- * {@inheritDoc}
+ * Report metric by executing command line interface of gmetrics {@inheritDoc}
*
* @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
*/
diff --git a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
index 66a66bf..e222c32 100644
--- a/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
+++ b/azkaban-common/src/main/java/azkaban/metric/inmemoryemitter/InMemoryMetricEmitter.java
@@ -32,8 +32,8 @@ import org.apache.log4j.Logger;
/**
- * Metric Emitter which maintains in memory snapshots of the metrics
- * This is also the default metric emitter and used by /stats servlet
+ * Metric Emitter which maintains in memory snapshots of the metrics. This is also the default
+ * metric emitter, used by the /stats servlet
*/
public class InMemoryMetricEmitter implements IMetricEmitter {
@@ -83,8 +83,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
}
/**
- * Ingest metric in snapshot data structure while maintaining interval
- * {@inheritDoc}
+ * Ingest metric in snapshot data structure while maintaining interval {@inheritDoc}
*
* @see azkaban.metric.IMetricEmitter#reportMetric(azkaban.metric.IMetric)
*/
@@ -231,8 +230,7 @@ public class InMemoryMetricEmitter implements IMetricEmitter {
}
/**
- * Clear snapshot data structure
- * {@inheritDoc}
+ * Clear snapshot data structure {@inheritDoc}
*
* @see azkaban.metric.IMetricEmitter#purgeAllData()
*/
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
index ab71461..d29eba4 100644
--- a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -25,13 +25,10 @@ import org.apache.log4j.Logger;
/**
- * Manager for access or updating metric related functionality of Azkaban
- * MetricManager is responsible all handling all action requests from statsServlet in Exec server
- * <p> Metric Manager 'has a' relationship with :-
- * <ul>
- * <li>all the metric Azkaban is tracking</li>
- * <li>all the emitters Azkaban is supposed to report metrics</li>
- * </ul></p>
+ * Manager for accessing or updating metric related functionality of Azkaban. MetricManager is
+ * responsible for handling all action requests from statsServlet in the Exec server <p> Metric Manager
+ * 'has a' relationship with :- <ul> <li>all the metrics Azkaban is tracking</li> <li>all the
+ * emitters Azkaban is supposed to report metrics to</li> </ul></p>
*/
public class MetricReportManager {
@@ -44,14 +41,14 @@ public class MetricReportManager {
private static volatile MetricReportManager instance = null;
private static volatile boolean isManagerEnabled;
/**
- * List of all the metrics that Azkaban is tracking
- * Manager is not concerned with type of metric as long as it honors IMetric contracts
+ * List of all the metrics that Azkaban is tracking. Manager is not concerned with the type of a
+ * metric as long as it honors IMetric contracts
*/
private final List<IMetric<?>> metrics;
/**
- * List of all the emitter listening all the metrics
- * Manager is not concerned with how emitter is reporting value.
- * Manager is only responsible to notify all emitters whenever an IMetric wants to be notified
+ * List of all the emitters listening to all the metrics. Manager is not concerned with how an
+ * emitter reports values. Manager is only responsible for notifying all emitters whenever an
+ * IMetric wants to be notified
*/
private final List<IMetricEmitter> metricEmitters;
private final ExecutorService executorService;
@@ -216,8 +213,7 @@ public class MetricReportManager {
}
/**
- * Shutdown execution service
- * {@inheritDoc}
+ * Shutdown execution service {@inheritDoc}
*
* @see java.lang.Object#finalize()
*/
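
The 'has a' relationships described above suggest a simple shape: a list of metrics, a list of emitters, and an executor service that fans notifications out off the caller's thread. A hedged sketch; the Emitter interface and field names are illustrative, not MetricReportManager's actual members.

// Manager/emitter fan-out: notify every emitter on a worker thread.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ReportManagerSketch {

  interface Emitter {
    void report(String metricName, Object value) throws Exception;
  }

  private final List<Emitter> emitters = new CopyOnWriteArrayList<>();
  private final ExecutorService pool = Executors.newSingleThreadExecutor();

  void addEmitter(final Emitter e) {
    this.emitters.add(e);
  }

  void notifyEmitters(final String metricName, final Object value) {
    this.pool.execute(() -> {
      for (final Emitter e : this.emitters) {
        try {
          e.report(metricName, value);
        } catch (final Exception ex) {
          // log and continue, so one bad emitter cannot starve the others
        }
      }
    });
  }

  void shutdown() {
    this.pool.shutdown(); // mirrors the execution-service shutdown described above
  }
}
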
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 7a62928..7e91f7a 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -22,9 +22,9 @@ import com.google.inject.Singleton;
import java.util.concurrent.atomic.AtomicLong;
/**
- * This singleton class CommonMetrics is in charge of collecting varieties of metrics
- * which are accessed in both web and exec modules. That said, these metrics will be
- * exposed in both Web server and executor.
+ * This singleton class CommonMetrics is in charge of collecting varieties of metrics which are
+ * accessed in both web and exec modules. That said, these metrics will be exposed in both Web
+ * server and executor.
*/
@Singleton
public class CommonMetrics {
@@ -71,8 +71,8 @@ public class CommonMetrics {
}
/**
- * Mark flowFailMeter when a flow is considered as FAILED.
- * This method could be called by Web Server or Executor, as they both detect flow failure.
+ * Mark flowFailMeter when a flow is considered as FAILED. This method could be called by Web
+ * Server or Executor, as they both detect flow failure.
*/
public void markFlowFail() {
this.flowFailMeter.mark();
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
index de6981e..b5760fc 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
@@ -57,8 +57,8 @@ public class MetricsManager {
}
/**
- * A {@link Meter} measures the rate of events over time (e.g., “requests per second”).
- * Here we track 1-minute moving averages.
+ * A {@link Meter} measures the rate of events over time (e.g., “requests per second”). Here we
+ * track 1-minute moving averages.
*/
public Meter addMeter(final String name) {
final Meter curr = this.registry.meter(name);
@@ -79,9 +79,8 @@ public class MetricsManager {
}
/**
- * reporting metrics to remote metrics collector.
- * Note: this method must be synchronized, since both web server and executor
- * will call it during initialization.
+ * Reports metrics to a remote metrics collector. Note: this method must be synchronized, since
+ * both web server and executor will call it during initialization.
*/
public synchronized void startReporting(final String reporterName, final Props props) {
final String metricsReporterClassName = props.get(CUSTOM_METRICS_REPORTER_CLASS_NAME);
@@ -98,7 +97,7 @@ public class MetricsManager {
log.error("Encountered error while loading and instantiating "
+ metricsReporterClassName, e);
throw new IllegalStateException("Encountered error while loading and instantiating "
- + metricsReporterClassName, e);
+ + metricsReporterClassName, e);
}
} else {
log.error(String.format("No value for property: %s or %s was found",
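
The Meter described in the addMeter javadoc is Dropwizard Metrics' Meter, which this class wraps. A small self-contained usage example; the meter name is illustrative.

// Dropwizard Meter usage: mark events, read count and 1-minute moving average.
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class MeterDemo {

  public static void main(final String[] args) {
    final MetricRegistry registry = new MetricRegistry();
    final Meter dispatchFail = registry.meter("dispatch-fail-meter");
    dispatchFail.mark();    // one event
    dispatchFail.mark(3);   // three more
    System.out.println(dispatchFail.getCount());         // total events = 4
    System.out.println(dispatchFail.getOneMinuteRate()); // 1-minute moving average rate
  }
}
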
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
index 16b9bf0..f6fa3e1 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -39,6 +39,7 @@ import org.apache.commons.dbutils.ResultSetHandler;
class JdbcProjectHandlerSet {
public static class ProjectResultHandler implements ResultSetHandler<List<Project>> {
+
public static String SELECT_PROJECT_BY_NAME =
"SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=?";
@@ -109,12 +110,15 @@ class JdbcProjectHandlerSet {
}
}
- public static class ProjectPermissionsResultHandler implements ResultSetHandler<List<Triple<String, Boolean, Permission>>> {
+ public static class ProjectPermissionsResultHandler implements
+ ResultSetHandler<List<Triple<String, Boolean, Permission>>> {
+
public static String SELECT_PROJECT_PERMISSION =
"SELECT project_id, modified_time, name, permissions, isGroup FROM project_permissions WHERE project_id=?";
@Override
- public List<Triple<String, Boolean, Permission>> handle(final ResultSet rs) throws SQLException {
+ public List<Triple<String, Boolean, Permission>> handle(final ResultSet rs)
+ throws SQLException {
if (!rs.next()) {
return Collections.emptyList();
}
@@ -134,6 +138,7 @@ class JdbcProjectHandlerSet {
}
public static class ProjectFlowsResultHandler implements ResultSetHandler<List<Flow>> {
+
public static String SELECT_PROJECT_FLOW =
"SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=? AND flow_id=?";
@@ -182,7 +187,9 @@ class JdbcProjectHandlerSet {
}
}
- public static class ProjectPropertiesResultsHandler implements ResultSetHandler<List<Pair<String, Props>>> {
+ public static class ProjectPropertiesResultsHandler implements
+ ResultSetHandler<List<Pair<String, Props>>> {
+
public static String SELECT_PROJECT_PROPERTY =
"SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=? AND name=?";
@@ -225,6 +232,7 @@ class JdbcProjectHandlerSet {
}
public static class ProjectLogsResultHandler implements ResultSetHandler<List<ProjectLogEvent>> {
+
public static String SELECT_PROJECT_EVENTS_ORDER =
"SELECT project_id, event_type, event_time, username, message FROM project_events WHERE project_id=? ORDER BY event_time DESC LIMIT ? OFFSET ?";
@@ -243,7 +251,8 @@ class JdbcProjectHandlerSet {
final String message = rs.getString(5);
final ProjectLogEvent event =
- new ProjectLogEvent(projectId, ProjectLogEvent.EventType.fromInteger(eventType), eventTime, username,
+ new ProjectLogEvent(projectId, ProjectLogEvent.EventType.fromInteger(eventType),
+ eventTime, username,
message);
events.add(event);
} while (rs.next());
@@ -253,6 +262,7 @@ class JdbcProjectHandlerSet {
}
public static class ProjectFileChunkResultHandler implements ResultSetHandler<List<byte[]>> {
+
public static String SELECT_PROJECT_CHUNKS_FILE =
"SELECT project_id, version, chunk, size, file FROM project_files WHERE project_id=? AND version=? AND chunk >= ? AND chunk < ? ORDER BY chunk ASC";
@@ -273,7 +283,9 @@ class JdbcProjectHandlerSet {
}
}
- public static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
+ public static class ProjectVersionResultHandler implements
+ ResultSetHandler<List<ProjectFileHandler>> {
+
public static String SELECT_PROJECT_VERSION =
"SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id "
+ "FROM project_versions WHERE project_id=? AND version=?";
@@ -297,7 +309,8 @@ class JdbcProjectHandlerSet {
final String resourceId = rs.getString(9);
final ProjectFileHandler handler =
- new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5,
+ new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName,
+ numChunks, md5,
resourceId);
handlers.add(handler);
@@ -308,6 +321,7 @@ class JdbcProjectHandlerSet {
}
public static class IntHandler implements ResultSetHandler<Integer> {
+
public static String SELECT_LATEST_VERSION = "SELECT MAX(version) FROM project_versions WHERE project_id=?";
@Override
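
Every handler in JdbcProjectHandlerSet follows the same commons-dbutils shape: a SQL constant plus a handle(ResultSet) that walks the rows into a typed list. A hedged sketch with an illustrative query and handler, not one of the real handlers:

// commons-dbutils ResultSetHandler mapping rows to a typed list.
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.dbutils.ResultSetHandler;

public class VersionsResultHandler implements ResultSetHandler<List<Integer>> {

  public static final String SELECT_VERSIONS =
      "SELECT version FROM project_versions WHERE project_id=?";

  @Override
  public List<Integer> handle(final ResultSet rs) throws SQLException {
    if (!rs.next()) {
      return Collections.emptyList();
    }
    final List<Integer> versions = new ArrayList<>();
    do {
      versions.add(rs.getInt(1));
    } while (rs.next());
    return versions;
  }
  // usage (illustrative): dbOperator.query(SELECT_VERSIONS, new VersionsResultHandler(), projectId)
}
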
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index 8e28396..99eef86 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -59,12 +59,13 @@ import org.apache.log4j.Logger;
/**
- * This class implements ProjectLoader using new azkaban-db code to allow DB failover.
- * TODO kunkun-tang: This class is too long. In future, we should split {@link ProjectLoader} interface
+ * This class implements ProjectLoader using new azkaban-db code to allow DB failover. TODO
+ * kunkun-tang: This class is too long. In future, we should split {@link ProjectLoader} interface
* and have multiple short class implementations.
*/
@Singleton
public class JdbcProjectImpl implements ProjectLoader {
+
private static final Logger logger = Logger.getLogger(JdbcProjectImpl.class);
private static final int CHUCK_SIZE = 1024 * 1024 * 10;
@@ -106,7 +107,8 @@ public class JdbcProjectImpl implements ProjectLoader {
return projects;
}
- private void setProjectPermission(final Project project, final Triple<String, Boolean, Permission> perm) {
+ private void setProjectPermission(final Project project,
+ final Triple<String, Boolean, Permission> perm) {
if (perm.getSecond()) {
project.setGroupPermission(perm.getFirst(), perm.getThird());
} else {
@@ -154,7 +156,8 @@ public class JdbcProjectImpl implements ProjectLoader {
List<Project> projects = this.dbOperator
.query(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
if (projects.isEmpty()) {
- projects = this.dbOperator.query(ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
+ projects = this.dbOperator
+ .query(ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
if (projects.isEmpty()) {
throw new ProjectManagerException("No project with name " + name + " exists in db.");
}
@@ -167,12 +170,14 @@ public class JdbcProjectImpl implements ProjectLoader {
}
} catch (final SQLException ex) {
logger.error(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
- throw new ProjectManagerException(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
+ throw new ProjectManagerException(
+ ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
}
return project;
}
- private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(final Project project)
+ private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(
+ final Project project)
throws ProjectManagerException {
final ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
@@ -180,10 +185,12 @@ public class JdbcProjectImpl implements ProjectLoader {
try {
permissions =
this.dbOperator
- .query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, project.getId());
+ .query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander,
+ project.getId());
} catch (final SQLException ex) {
logger.error(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION + " failed.", ex);
- throw new ProjectManagerException("Query for permissions for " + project.getName() + " failed.", ex);
+ throw new ProjectManagerException(
+ "Query for permissions for " + project.getName() + " failed.", ex);
}
return permissions;
}
@@ -191,11 +198,11 @@ public class JdbcProjectImpl implements ProjectLoader {
/**
* Creates a Project in the db.
*
- * It will throw an exception if it finds an active project of the same name,
- * or the SQL fails
+ * It will throw an exception if it finds an active project of the same name, or the SQL fails
*/
@Override
- public synchronized Project createNewProject(final String name, final String description, final User creator)
+ public synchronized Project createNewProject(final String name, final String description,
+ final User creator)
throws ProjectManagerException {
final ProjectResultHandler handler = new ProjectResultHandler();
@@ -204,7 +211,8 @@ public class JdbcProjectImpl implements ProjectLoader {
final List<Project> projects = this.dbOperator
.query(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
if (!projects.isEmpty()) {
- throw new ProjectManagerException("Active project with name " + name + " already exists in db.");
+ throw new ProjectManagerException(
+ "Active project with name " + name + " already exists in db.");
}
} catch (final SQLException ex) {
logger.error(ex);
@@ -215,8 +223,9 @@ public class JdbcProjectImpl implements ProjectLoader {
"INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob) values (?,?,?,?,?,?,?,?,?)";
final SQLTransaction<Integer> insertProject = transOperator -> {
final long time = System.currentTimeMillis();
- return transOperator.update(INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description,
- this.defaultEncodingType.getNumVal(), null);
+ return transOperator
+ .update(INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description,
+ this.defaultEncodingType.getNumVal(), null);
};
// Insert project
@@ -227,17 +236,20 @@ public class JdbcProjectImpl implements ProjectLoader {
}
} catch (final SQLException ex) {
logger.error(INSERT_PROJECT + " failed.", ex);
- throw new ProjectManagerException("Insert project" + name + " for existing project failed. ", ex);
+ throw new ProjectManagerException("Insert project" + name + " for existing project failed. ",
+ ex);
}
return fetchProjectByName(name);
}
@Override
- public void uploadProjectFile(final int projectId, final int version, final File localFile, final String uploader)
+ public void uploadProjectFile(final int projectId, final int version, final File localFile,
+ final String uploader)
throws ProjectManagerException {
final long startMs = System.currentTimeMillis();
- logger.info(String.format("Uploading Project ID: %d file: %s [%d bytes]", projectId, localFile.getName(),
- localFile.length()));
+ logger.info(String
+ .format("Uploading Project ID: %d file: %s [%d bytes]", projectId, localFile.getName(),
+ localFile.length()));
/*
* The below transaction uses one connection to do all operations. Ideally, we should commit
@@ -249,7 +261,8 @@ public class JdbcProjectImpl implements ProjectLoader {
final SQLTransaction<Integer> uploadProjectFileTransaction = transOperator -> {
/* Step 1: Update DB with new project info */
- addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, computeHash(localFile), null);
+ addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader,
+ computeHash(localFile), null);
transOperator.getConnection().commit();
/* Step 2: Upload File in chunks to DB */
@@ -268,7 +281,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
final long duration = (System.currentTimeMillis() - startMs) / 1000;
- logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec", projectId, localFile.getName(),
+ logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec", projectId,
+ localFile.getName(),
localFile.length(), duration));
}
@@ -297,7 +311,8 @@ public class JdbcProjectImpl implements ProjectLoader {
// when one transaction completes, it automatically commits.
final SQLTransaction<Integer> transaction = transOperator -> {
- addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, md5, resourceId);
+ addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, md5,
+ resourceId);
return 1;
};
try {
@@ -311,21 +326,20 @@ public class JdbcProjectImpl implements ProjectLoader {
/**
* Insert a new version record to TABLE project_versions before uploading files.
*
- * The reason for this operation:
- * When error chunking happens in remote mysql server, incomplete file data remains
- * in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
- * the SQL exception prevents AZ from creating the new version record in Table project_versions.
- * However, the Table project_files still reserve the incomplete files, which causes troubles
- * when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
- * inserting new files to db.
+ * The reason for this operation: When error chunking happens in remote mysql server, incomplete
+ * file data remains in DB, and an SQL exception is thrown. If we don't have this operation before
+ * uploading file, the SQL exception prevents AZ from creating the new version record in Table
+ * project_versions. However, the Table project_files still reserves the incomplete files, which
+ * causes troubles when uploading a new file: Since the version in TABLE project_versions is still
+ * old, mysql will stop inserting new files to db.
*
- * Why this operation is safe:
- * When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
- * proj_v+1 will be used as the new version for the uploading files.
+ * Why this operation is safe: When AZ uploads a new zip file, it always fetches the latest
+ * version proj_v from TABLE project_version, proj_v+1 will be used as the new version for the
+ * uploading files.
*
- * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
- * When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
- * When file uploading completes, AZ will clean all old chunks in DB afterward.
+ * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version +
+ * 1). When we upload a new project zip in day2, new file in day 2 will use the new version
+ * (proj_v + 1). When file uploading completes, AZ will clean all old chunks in DB afterward.
*/
private void addProjectToProjectVersions(
final DatabaseTransOperator transOperator,
@@ -348,13 +362,15 @@ public class JdbcProjectImpl implements ProjectLoader {
transOperator.update(INSERT_PROJECT_VERSION, projectId, version, updateTime, uploader,
Files.getFileExtension(localFile.getName()), localFile.getName(), md5, 0, resourceId);
} catch (final SQLException e) {
- final String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
+ final String msg = String
+ .format("Error initializing project id: %d version: %d ", projectId, version);
logger.error(msg, e);
throw new ProjectManagerException(msg, e);
}
}
- private int uploadFileInChunks(final DatabaseTransOperator transOperator, final int projectId, final int version, final File localFile)
+ private int uploadFileInChunks(final DatabaseTransOperator transOperator, final int projectId,
+ final int version, final File localFile)
throws ProjectManagerException {
// Really... I doubt we'll get a > 2gig file. So int casting it is!
@@ -394,7 +410,9 @@ public class JdbcProjectImpl implements ProjectLoader {
}
} catch (final IOException e) {
throw new ProjectManagerException(
- String.format("Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d", projectId,
+ String.format(
+ "Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d",
+ projectId,
version, localFile.getName(), localFile.length(), chunk));
} finally {
IOUtils.closeQuietly(bufferedStream);
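
The chunking in uploadFileInChunks reads the project zip in fixed-size buffers, inserts each buffer as a row keyed by (project_id, version, chunk), and finally writes the chunk count back to project_versions. A sketch of just the read loop; the ChunkSink stands in for the per-chunk INSERT, and the names and chunk-size constant are illustrative.

// Read a file in fixed-size chunks and hand each to a sink with its index.
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

final class ChunkReader {

  static final int CHUNK_SIZE = 1024 * 1024 * 10; // 10 MiB

  interface ChunkSink {
    void accept(int chunkIndex, byte[] data) throws IOException;
  }

  static int readInChunks(final File localFile, final ChunkSink sink) throws IOException {
    int chunk = 0;
    try (InputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
      final byte[] buf = new byte[CHUNK_SIZE];
      int size;
      while ((size = in.read(buf)) > 0) {
        // The final chunk is usually short; copy only the bytes actually read.
        sink.accept(chunk++, Arrays.copyOf(buf, size));
      }
    }
    return chunk; // written back to project_versions.num_chunks afterwards
  }
}
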
@@ -405,7 +423,8 @@ public class JdbcProjectImpl implements ProjectLoader {
/**
 * We update the actual value of num_chunks in the db here.
*/
- private void updateChunksInProjectVersions(final DatabaseTransOperator transOperator, final int projectId, final int version, final int chunk)
+ private void updateChunksInProjectVersions(final DatabaseTransOperator transOperator,
+ final int projectId, final int version, final int chunk)
throws ProjectManagerException {
final String UPDATE_PROJECT_NUM_CHUNKS =
@@ -415,7 +434,8 @@ public class JdbcProjectImpl implements ProjectLoader {
transOperator.getConnection().commit();
} catch (final SQLException e) {
logger.error("Error updating project " + projectId + " : chunk_num " + chunk, e);
- throw new ProjectManagerException("Error updating project " + projectId + " : chunk_num " + chunk, e);
+ throw new ProjectManagerException(
+ "Error updating project " + projectId + " : chunk_num " + chunk, e);
}
}
@@ -425,19 +445,22 @@ public class JdbcProjectImpl implements ProjectLoader {
try {
final List<ProjectFileHandler> projectFiles =
this.dbOperator
- .query(ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId, version);
+ .query(ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId,
+ version);
if (projectFiles == null || projectFiles.isEmpty()) {
return null;
}
return projectFiles.get(0);
} catch (final SQLException ex) {
logger.error("Query for uploaded file for project id " + projectId + " failed.", ex);
- throw new ProjectManagerException("Query for uploaded file for project id " + projectId + " failed.", ex);
+ throw new ProjectManagerException(
+ "Query for uploaded file for project id " + projectId + " failed.", ex);
}
}
@Override
- public ProjectFileHandler getUploadedFile(final int projectId, final int version) throws ProjectManagerException {
+ public ProjectFileHandler getUploadedFile(final int projectId, final int version)
+ throws ProjectManagerException {
final ProjectFileHandler projHandler = fetchProjectMetaData(projectId, version);
if (projHandler == null) {
return null;
@@ -447,7 +470,8 @@ public class JdbcProjectImpl implements ProjectLoader {
File file;
try {
try {
- file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), this.tempDir);
+ file = File
+ .createTempFile(projHandler.getFileName(), String.valueOf(version), this.tempDir);
bStream = new BufferedOutputStream(new FileOutputStream(file));
} catch (final IOException e) {
throw new ProjectManagerException("Error creating temp file for stream.");
@@ -461,11 +485,13 @@ public class JdbcProjectImpl implements ProjectLoader {
List<byte[]> data = null;
try {
data = this.dbOperator
- .query(ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId,
- version, fromChunk, toChunk);
+ .query(ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler,
+ projectId,
+ version, fromChunk, toChunk);
} catch (final SQLException e) {
logger.error(e);
- throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
+ throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.",
+ e);
}
try {
@@ -503,7 +529,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void changeProjectVersion(final Project project, final int version, final String user) throws ProjectManagerException {
+ public void changeProjectVersion(final Project project, final int version, final String user)
+ throws ProjectManagerException {
final long timestamp = System.currentTimeMillis();
try {
final String UPDATE_PROJECT_VERSION =
@@ -515,12 +542,14 @@ public class JdbcProjectImpl implements ProjectLoader {
project.setLastModifiedUser(user);
} catch (final SQLException e) {
logger.error("Error updating switching project version " + project.getName(), e);
- throw new ProjectManagerException("Error updating switching project version " + project.getName(), e);
+ throw new ProjectManagerException(
+ "Error updating switching project version " + project.getName(), e);
}
}
@Override
- public void updatePermission(final Project project, final String name, final Permission perm, final boolean isGroup)
+ public void updatePermission(final Project project, final String name, final Permission perm,
+ final boolean isGroup)
throws ProjectManagerException {
final long updateTime = System.currentTimeMillis();
@@ -529,15 +558,20 @@ public class JdbcProjectImpl implements ProjectLoader {
final String INSERT_PROJECT_PERMISSION =
"INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)"
+ "ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
- this.dbOperator.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+ this.dbOperator
+ .update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(),
+ isGroup);
} else {
final String MERGE_PROJECT_PERMISSION =
"MERGE INTO project_permissions (project_id, modified_time, name, permissions, isGroup) KEY (project_id, name) values (?,?,?,?,?)";
- this.dbOperator.update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+ this.dbOperator
+ .update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(),
+ isGroup);
}
} catch (final SQLException ex) {
logger.error("Error updating project permission", ex);
- throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, ex);
+ throw new ProjectManagerException(
+ "Error updating project " + project.getName() + " permissions for " + name, ex);
}
if (isGroup) {
@@ -560,7 +594,8 @@ public class JdbcProjectImpl implements ProjectLoader {
return data;
}
- private void updateProjectSettings(final Project project, final EncodingType encType) throws ProjectManagerException {
+ private void updateProjectSettings(final Project project, final EncodingType encType)
+ throws ProjectManagerException {
final String UPDATE_PROJECT_SETTINGS = "UPDATE projects SET enc_type=?, settings_blob=? WHERE id=?";
final String json = JSONUtils.toJSON(project.toObject());
@@ -582,14 +617,16 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void removePermission(final Project project, final String name, final boolean isGroup) throws ProjectManagerException {
+ public void removePermission(final Project project, final String name, final boolean isGroup)
+ throws ProjectManagerException {
final String DELETE_PROJECT_PERMISSION =
"DELETE FROM project_permissions WHERE project_id=? AND name=? AND isGroup=?";
try {
this.dbOperator.update(DELETE_PROJECT_PERMISSION, project.getId(), name, isGroup);
} catch (final SQLException e) {
logger.error("remove Permission failed.", e);
- throw new ProjectManagerException("Error deleting project " + project.getName() + " permissions for " + name, e);
+ throw new ProjectManagerException(
+ "Error deleting project " + project.getName() + " permissions for " + name, e);
}
if (isGroup) {
@@ -600,7 +637,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public List<Triple<String, Boolean, Permission>> getProjectPermissions(final Project project) throws ProjectManagerException {
+ public List<Triple<String, Boolean, Permission>> getProjectPermissions(final Project project)
+ throws ProjectManagerException {
return fetchPermissionsForProject(project);
}
@@ -609,7 +647,8 @@ public class JdbcProjectImpl implements ProjectLoader {
* We should rewrite the code to follow the literal meanings.
*/
@Override
- public void removeProject(final Project project, final String user) throws ProjectManagerException {
+ public void removeProject(final Project project, final String user)
+ throws ProjectManagerException {
final long updateTime = System.currentTimeMillis();
final String UPDATE_INACTIVE_PROJECT =
@@ -623,13 +662,15 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public boolean postEvent(final Project project, final EventType type, final String user, final String message) {
+ public boolean postEvent(final Project project, final EventType type, final String user,
+ final String message) {
final String INSERT_PROJECT_EVENTS =
"INSERT INTO project_events (project_id, event_type, event_time, username, message) values (?,?,?,?,?)";
final long updateTime = System.currentTimeMillis();
try {
this.dbOperator
- .update(INSERT_PROJECT_EVENTS, project.getId(), type.getNumVal(), updateTime, user, message);
+ .update(INSERT_PROJECT_EVENTS, project.getId(), type.getNumVal(), updateTime, user,
+ message);
} catch (final SQLException e) {
logger.error("post event failed,", e);
return false;
@@ -638,13 +679,15 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public List<ProjectLogEvent> getProjectEvents(final Project project, final int num, final int skip) throws ProjectManagerException {
+ public List<ProjectLogEvent> getProjectEvents(final Project project, final int num,
+ final int skip) throws ProjectManagerException {
final ProjectLogsResultHandler logHandler = new ProjectLogsResultHandler();
List<ProjectLogEvent> events = null;
try {
events = this.dbOperator
- .query(ProjectLogsResultHandler.SELECT_PROJECT_EVENTS_ORDER, logHandler, project.getId(), num,
- skip);
+ .query(ProjectLogsResultHandler.SELECT_PROJECT_EVENTS_ORDER, logHandler, project.getId(),
+ num,
+ skip);
} catch (final SQLException e) {
logger.error("Error getProjectEvents, project " + project.getName(), e);
throw new ProjectManagerException("Error getProjectEvents, project " + project.getName(), e);
@@ -654,18 +697,21 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void updateDescription(final Project project, final String description, final String user) throws ProjectManagerException {
+ public void updateDescription(final Project project, final String description, final String user)
+ throws ProjectManagerException {
final String UPDATE_PROJECT_DESCRIPTION =
"UPDATE projects SET description=?,modified_time=?,last_modified_by=? WHERE id=?";
final long updateTime = System.currentTimeMillis();
try {
- this.dbOperator.update(UPDATE_PROJECT_DESCRIPTION, description, updateTime, user, project.getId());
+ this.dbOperator
+ .update(UPDATE_PROJECT_DESCRIPTION, description, updateTime, user, project.getId());
project.setDescription(description);
project.setLastModifiedTimestamp(updateTime);
project.setLastModifiedUser(user);
} catch (final SQLException e) {
logger.error(e);
- throw new ProjectManagerException("Error update Description, project " + project.getName(), e);
+ throw new ProjectManagerException("Error update Description, project " + project.getName(),
+ e);
}
}
@@ -676,12 +722,14 @@ public class JdbcProjectImpl implements ProjectLoader {
return this.dbOperator.query(IntHandler.SELECT_LATEST_VERSION, handler, project.getId());
} catch (final SQLException e) {
logger.error(e);
- throw new ProjectManagerException("Error marking project " + project.getName() + " as inactive", e);
+ throw new ProjectManagerException(
+ "Error marking project " + project.getName() + " as inactive", e);
}
}
@Override
- public void uploadFlows(final Project project, final int version, final Collection<Flow> flows) throws ProjectManagerException {
+ public void uploadFlows(final Project project, final int version, final Collection<Flow> flows)
+ throws ProjectManagerException {
// We do one at a time instead of batch... because well, the batch could be
// large.
logger.info("Uploading flows");
@@ -695,7 +743,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void uploadFlow(final Project project, final int version, final Flow flow) throws ProjectManagerException {
+ public void uploadFlow(final Project project, final int version, final Flow flow)
+ throws ProjectManagerException {
logger.info("Uploading flow " + flow.getId());
try {
uploadFlow(project, version, flow, this.defaultEncodingType);
@@ -705,7 +754,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void updateFlow(final Project project, final int version, final Flow flow) throws ProjectManagerException {
+ public void updateFlow(final Project project, final int version, final Flow flow)
+ throws ProjectManagerException {
logger.info("Uploading flow " + flow.getId());
try {
final String json = JSONUtils.toJSON(flow.toObject());
@@ -715,7 +765,8 @@ public class JdbcProjectImpl implements ProjectLoader {
"UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
try {
this.dbOperator
- .update(UPDATE_FLOW, this.defaultEncodingType.getNumVal(), data, project.getId(), version, flow.getId());
+ .update(UPDATE_FLOW, this.defaultEncodingType.getNumVal(), data, project.getId(),
+ version, flow.getId());
} catch (final SQLException e) {
logger.error("Error inserting flow", e);
throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
@@ -725,7 +776,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
}
- private void uploadFlow(final Project project, final int version, final Flow flow, final EncodingType encType)
+ private void uploadFlow(final Project project, final int version, final Flow flow,
+ final EncodingType encType)
throws ProjectManagerException, IOException {
final String json = JSONUtils.toJSON(flow.toObject());
final byte[] data = convertJsonToBytes(encType, json);
@@ -734,8 +786,9 @@ public class JdbcProjectImpl implements ProjectLoader {
final String INSERT_FLOW =
"INSERT INTO project_flows (project_id, version, flow_id, modified_time, encoding_type, json) values (?,?,?,?,?,?)";
try {
- this.dbOperator.update(INSERT_FLOW, project.getId(), version, flow.getId(), System.currentTimeMillis(),
- encType.getNumVal(), data);
+ this.dbOperator
+ .update(INSERT_FLOW, project.getId(), version, flow.getId(), System.currentTimeMillis(),
+ encType.getNumVal(), data);
} catch (final SQLException e) {
logger.error("Error inserting flow", e);
throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
@@ -754,16 +807,18 @@ public class JdbcProjectImpl implements ProjectLoader {
try {
flows = this.dbOperator
.query(ProjectFlowsResultHandler.SELECT_ALL_PROJECT_FLOWS, handler, project.getId(),
- project.getVersion());
+ project.getVersion());
} catch (final SQLException e) {
throw new ProjectManagerException(
- "Error fetching flows from project " + project.getName() + " version " + project.getVersion(), e);
+ "Error fetching flows from project " + project.getName() + " version " + project
+ .getVersion(), e);
}
return flows;
}
@Override
- public void uploadProjectProperties(final Project project, final List<Props> properties) throws ProjectManagerException {
+ public void uploadProjectProperties(final Project project, final List<Props> properties)
+ throws ProjectManagerException {
for (final Props props : properties) {
try {
uploadProjectProperty(project, props.getSource(), props);
@@ -774,7 +829,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void uploadProjectProperty(final Project project, final Props props) throws ProjectManagerException {
+ public void uploadProjectProperty(final Project project, final Props props)
+ throws ProjectManagerException {
try {
uploadProjectProperty(project, props.getSource(), props);
} catch (final IOException e) {
@@ -783,7 +839,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void updateProjectProperty(final Project project, final Props props) throws ProjectManagerException {
+ public void updateProjectProperty(final Project project, final Props props)
+ throws ProjectManagerException {
try {
updateProjectProperty(project, props.getSource(), props);
} catch (final IOException e) {
@@ -813,7 +870,8 @@ public class JdbcProjectImpl implements ProjectLoader {
final byte[] propsData = getBytes(props);
try {
- this.dbOperator.update(INSERT_PROPERTIES, project.getId(), project.getVersion(), name, System.currentTimeMillis(),
+ this.dbOperator.update(INSERT_PROPERTIES, project.getId(), project.getVersion(), name,
+ System.currentTimeMillis(),
this.defaultEncodingType.getNumVal(), propsData);
} catch (final SQLException e) {
throw new ProjectManagerException(
@@ -832,38 +890,45 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public Props fetchProjectProperty(final int projectId, final int projectVer, final String propsName) throws ProjectManagerException {
+ public Props fetchProjectProperty(final int projectId, final int projectVer,
+ final String propsName) throws ProjectManagerException {
final ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
try {
final List<Pair<String, Props>> properties =
this.dbOperator
- .query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY, handler, projectId, projectVer,
- propsName);
+ .query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY, handler, projectId,
+ projectVer,
+ propsName);
if (properties == null || properties.isEmpty()) {
- logger.warn("Project " + projectId + " version " + projectVer + " property " + propsName + " is empty.");
+ logger.warn("Project " + projectId + " version " + projectVer + " property " + propsName
+ + " is empty.");
return null;
}
return properties.get(0).getSecond();
} catch (final SQLException e) {
- logger.error("Error fetching property " + propsName + " Project " + projectId + " version " + projectVer, e);
+ logger.error("Error fetching property " + propsName + " Project " + projectId + " version "
+ + projectVer, e);
throw new ProjectManagerException("Error fetching property " + propsName, e);
}
}
@Override
- public Props fetchProjectProperty(final Project project, final String propsName) throws ProjectManagerException {
+ public Props fetchProjectProperty(final Project project, final String propsName)
+ throws ProjectManagerException {
return fetchProjectProperty(project.getId(), project.getVersion(), propsName);
}
@Override
- public Map<String, Props> fetchProjectProperties(final int projectId, final int version) throws ProjectManagerException {
+ public Map<String, Props> fetchProjectProperties(final int projectId, final int version)
+ throws ProjectManagerException {
try {
- final List<Pair<String, Props>> properties = this.dbOperator.query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTIES,
- new ProjectPropertiesResultsHandler(), projectId, version);
+ final List<Pair<String, Props>> properties = this.dbOperator
+ .query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTIES,
+ new ProjectPropertiesResultsHandler(), projectId, version);
if (properties == null || properties.isEmpty()) {
return null;
}
@@ -879,7 +944,8 @@ public class JdbcProjectImpl implements ProjectLoader {
}
@Override
- public void cleanOlderProjectVersion(final int projectId, final int version) throws ProjectManagerException {
+ public void cleanOlderProjectVersion(final int projectId, final int version)
+ throws ProjectManagerException {
final String DELETE_FLOW = "DELETE FROM project_flows WHERE project_id=? AND version<?";
final String DELETE_PROPERTIES = "DELETE FROM project_properties WHERE project_id=? AND version<?";
final String DELETE_PROJECT_FILES = "DELETE FROM project_files WHERE project_id=? AND version<?";
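The data-access pattern repeated throughout JdbcProjectImpl is a SQL constant plus a commons-dbutils ResultSetHandler passed to DatabaseOperator.query/update. A minimal sketch of that pattern, assuming a hypothetical handler and query (only the projects table and the query/update call shapes are taken from the hunks above):

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.dbutils.ResultSetHandler;

// Hypothetical handler in the style of ProjectFileChunkResultHandler above.
class ActiveProjectNamesHandler implements ResultSetHandler<List<String>> {

  static final String SELECT_ACTIVE_PROJECT_NAMES =
      "SELECT name FROM projects WHERE active=?";

  @Override
  public List<String> handle(final ResultSet rs) throws SQLException {
    final List<String> names = new ArrayList<>();
    while (rs.next()) {
      names.add(rs.getString(1));
    }
    return names;
  }
}

// Call site, mirroring the this.dbOperator.query(...) calls in this class:
//   List<String> names = this.dbOperator.query(
//       ActiveProjectNamesHandler.SELECT_ACTIVE_PROJECT_NAMES,
//       new ActiveProjectNamesHandler(), true);
```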
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 9245e06..59806c1 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -45,13 +45,11 @@ public interface ProjectLoader {
Project fetchProjectByName(String name) throws ProjectManagerException;
/**
- * Should create an empty project with the given name and user and adds it to
- * the data store. It will auto assign a unique id for this project if
- * successful.
+ * Should create an empty project with the given name and user and add it to the data store. It
+ * will auto assign a unique id for this project if successful.
*
- * If an active project of the same name exists, it will throw an exception.
- * If the name and description of the project exceeds the store's constraints,
- * it will throw an exception.
+ * If an active project of the same name exists, it will throw an exception. If the name and
+ * description of the project exceed the store's constraints, it will throw an exception.
*
* @throws ProjectManagerException if an active project of the same name exists.
*/
@@ -65,9 +63,8 @@ public interface ProjectLoader {
throws ProjectManagerException;
/**
- * Adds and updates the user permissions. Does not check if the user is valid.
- * If the permission doesn't exist, it adds. If the permission exists, it
- * updates.
+ * Adds and updates the user permissions. Does not check if the user is valid. If the permission
+ * doesn't exist, it is added. If the permission exists, it is updated.
*/
void updatePermission(Project project, String name, Permission perm,
boolean isGroup) throws ProjectManagerException;
@@ -82,8 +79,7 @@ public interface ProjectLoader {
throws ProjectManagerException;
/**
- * Stores logs for a particular project. Will soft fail rather than throw
- * exception.
+ * Stores logs for a particular project. Will soft fail rather than throw an exception.
*
* @param message return true if the posting was success.
*/
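A caller-side sketch of the two members documented above. The Permission constructor and the EventType constant are assumptions; only the updatePermission and postEvent signatures come from this interface.

```java
// Grant a user (not a group) read permission; Permission.Type.READ is assumed.
final Permission perm = new Permission(Permission.Type.READ);
loader.updatePermission(project, "some_user", perm, false /* isGroup */);

// postEvent soft-fails: per the javadoc it returns false rather than throwing.
final boolean posted = loader.postEvent(project, EventType.USER_PERMISSION,
    "admin", "Granted READ to some_user");
```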
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 51852a3..1b8c986 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -190,8 +190,6 @@ public class ProjectManager {
/**
* Checks if a project is active using project_name
- *
- * @param name
*/
public Boolean isActiveProject(final String name) {
return this.projectsByName.containsKey(name);
@@ -199,19 +197,13 @@ public class ProjectManager {
/**
* Checks if a project is active using project_id
- *
- * @param name
*/
public Boolean isActiveProject(final int id) {
return this.projectsById.containsKey(id);
}
/**
- * fetch active project from cache and inactive projects from db by
- * project_name
- *
- * @param name
- * @return
+ * fetch active project from cache and inactive projects from db by project_name
*/
public Project getProject(final String name) {
Project fetchedProject = null;
@@ -228,11 +220,7 @@ public class ProjectManager {
}
/**
- * fetch active project from cache and inactive projects from db by
- * project_id
- *
- * @param id
- * @return
+ * fetch active project from cache and inactive projects from db by project_id
*/
public Project getProject(final int id) {
Project fetchedProject = null;
@@ -294,13 +282,8 @@ public class ProjectManager {
}
/**
- * Permanently delete all project files and properties data for all versions
- * of a project and log event in project_events table
- *
- * @param project
- * @param deleter
- * @return
- * @throws ProjectManagerException
+ * Permanently delete all project files and properties data for all versions of a project and log
+ * event in project_events table
*/
public synchronized Project purgeProject(final Project project, final User deleter)
throws ProjectManagerException {
@@ -429,18 +412,14 @@ public class ProjectManager {
}
/**
- * This method retrieves the uploaded project zip file from DB. A temporary
- * file is created to hold the content of the uploaded zip file. This
- * temporary file is provided in the ProjectFileHandler instance and the
- * caller of this method should call method
- * {@ProjectFileHandler.deleteLocalFile}
- * to delete the temporary file.
+ * This method retrieves the uploaded project zip file from DB. A temporary file is created to
+ * hold the content of the uploaded zip file. This temporary file is provided in the
+ * ProjectFileHandler instance and the caller of this method should call method
+ * {@link ProjectFileHandler#deleteLocalFile} to delete the temporary file.
*
- * @param project
* @param version - latest version is used if value is -1
- * @return ProjectFileHandler - null if can't find project zip file based on
- * project name and version
- * @throws ProjectManagerException
+ * @return ProjectFileHandler - null if the project zip file can't be found based on project name and
+ * version
*/
public ProjectFileHandler getProjectFileHandler(final Project project, final int version)
throws ProjectManagerException {
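A usage sketch of the contract spelled out in the javadoc above: -1 requests the latest version, null means no zip was found, and the caller must clean up the temp file via deleteLocalFile. The getLocalFile() accessor is an assumption.

```java
final ProjectFileHandler handler =
    projectManager.getProjectFileHandler(project, -1); // -1 == latest version
if (handler == null) {
  throw new ProjectManagerException("No zip found for " + project.getName());
}
try {
  final File zip = handler.getLocalFile(); // assumed accessor for the temp file
  // ... read the project archive ...
} finally {
  handler.deleteLocalFile(); // cleanup contract from the javadoc above
}
```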
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
index f8c82e1..7790b1a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -20,15 +20,15 @@ import org.xml.sax.SAXException;
/**
* @author wkang
*
- * This class manages project whitelist defined in xml config file. An single xml config
- * file contains different types of whitelisted projects. For additional type of whitelist,
- * modify WhitelistType enum.
+ * This class manages the project whitelist defined in an xml config file. A single xml config file
+ * contains different types of whitelisted projects. For additional type of whitelist, modify
+ * WhitelistType enum.
*
- * The xml config file should in the following format. Please note the tag <MemoryCheck> is
- * same as the defined enum MemoryCheck
+ * The xml config file should be in the following format. Please note the tag <MemoryCheck> is the same as
+ * the defined enum MemoryCheck
*
- * <ProjectWhitelist> <MemoryCheck> <project projectname="project1" /> <project
- * projectname="project2" /> </MemoryCheck> <ProjectWhitelist>
+ * <ProjectWhitelist> <MemoryCheck> <project projectname="project1" /> <project
+ * projectname="project2" /> </MemoryCheck> </ProjectWhitelist>
*/
public class ProjectWhitelist {
@@ -127,11 +127,10 @@ public class ProjectWhitelist {
}
/**
- * The tag in the project whitelist xml config file should be same as
- * the defined enums.
+ * The tag in the project whitelist xml config file should be same as the defined enums.
*/
public static enum WhitelistType {
MemoryCheck,
NumJobPerFlow
}
-}
\ No newline at end of file
+}
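For reference, the whitelist file that the (corrected) javadoc above describes, with the tag matching a WhitelistType constant, followed by a hypothetical lookup; the static accessor name is an assumption.

```java
// conf/project-whitelist.xml, shape reconstructed from the javadoc:
//
//   <ProjectWhitelist>
//     <MemoryCheck>
//       <project projectname="project1" />
//       <project projectname="project2" />
//     </MemoryCheck>
//   </ProjectWhitelist>

// Hypothetical check: may this project skip the memory check?
final boolean whitelisted = ProjectWhitelist.isProjectWhitelisted(
    project.getId(), ProjectWhitelist.WhitelistType.MemoryCheck);
```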
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
index 6c3e6d4..38b5ae9 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
@@ -5,8 +5,8 @@ import azkaban.utils.Props;
import java.io.File;
/**
- * Interface to be implemented by plugins which are to be registered with Azkaban
- * as project validators that validate a project before uploaded into Azkaban.
+ * Interface to be implemented by plugins which are to be registered with Azkaban as project
+ * validators that validate a project before uploaded into Azkaban.
*/
public interface ProjectValidator {
@@ -21,9 +21,8 @@ public interface ProjectValidator {
String getValidatorName();
/**
- * Validate the project inside the given directory. The validator, using its own
- * validation logic, will generate a {@link ValidationReport} representing the result of
- * the validation.
+ * Validate the project inside the given directory. The validator, using its own validation logic,
+ * will generate a {@link ValidationReport} representing the result of the validation.
*/
ValidationReport validateProject(Project project, File projectDir);
}
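A minimal validator plugin sketched against the two methods visible in this hunk. The initialize hook and the addErrorMsgs mutator are assumptions about parts of the API this diff does not show.

```java
import azkaban.project.Project;
import azkaban.utils.Props;
import java.io.File;
import java.util.Collections;

public class NonEmptyDirValidator implements ProjectValidator {

  // Assumed lifecycle method, not visible in this hunk.
  public boolean initialize(final Props configuration) {
    return true;
  }

  @Override
  public String getValidatorName() {
    return "NonEmptyDirValidator";
  }

  @Override
  public ValidationReport validateProject(final Project project, final File projectDir) {
    final ValidationReport report = new ValidationReport();
    final File[] files = projectDir.listFiles();
    if (files == null || files.length == 0) {
      // addErrorMsgs(Set<String>) is an assumed mutator on ValidationReport.
      report.addErrorMsgs(Collections.singleton("Project directory is empty."));
    }
    return report;
  }
}
```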
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
index 321976b..830fa26 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
@@ -4,12 +4,11 @@ import java.util.HashSet;
import java.util.Set;
/**
- * The result of a project validation generated by a {@link ProjectValidator}. It contains
- * an enum of type {@link ValidationStatus} representing whether the validation passes,
- * generates warnings, or generates errors. Accordingly, three sets of String are also
- * maintained, storing the messages generated by the {@link ProjectValidator} at both
- * {@link ValidationStatus#WARN} and {@link ValidationStatus#ERROR} level, as well as
- * information messages associated with both levels.
+ * The result of a project validation generated by a {@link ProjectValidator}. It contains an enum
+ * of type {@link ValidationStatus} representing whether the validation passes, generates warnings,
+ * or generates errors. Accordingly, three sets of String are also maintained, storing the messages
+ * generated by the {@link ProjectValidator} at both {@link ValidationStatus#WARN} and {@link
+ * ValidationStatus#ERROR} level, as well as information messages associated with both levels.
*/
public class ValidationReport {
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidationStatus.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidationStatus.java
index 6aaed24..c9337b9 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidationStatus.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidationStatus.java
@@ -1,8 +1,8 @@
package azkaban.project.validator;
/**
- * Status of the ValidationReport. It also represents the severity of each rule.
- * The order of severity for the status is PASS < WARN < ERROR.
+ * Status of the ValidationReport. It also represents the severity of each rule. The order of
+ * severity for the status is PASS < WARN < ERROR.
*/
public enum ValidationStatus {
PASS("PASS"),
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorClassLoader.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorClassLoader.java
index 2f5ea85..4b39ab6 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorClassLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorClassLoader.java
@@ -14,8 +14,8 @@ import java.util.jar.JarFile;
import sun.net.www.protocol.jar.JarURLConnection;
/**
- * Workaround for jdk 6 disgrace with open jar files & native libs,
- * which is a reason of unrefreshable classloader.
+ * Workaround for jdk 6 disgrace with open jar files & native libs, which makes the classloader
+ * unrefreshable.
*/
public class ValidatorClassLoader extends URLClassLoader {
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
index f7d89c7..c34a841 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
@@ -8,9 +8,9 @@ import java.util.Map;
import org.apache.log4j.Logger;
/**
- * ValidatorManager is responsible for loading the list of validators specified in the
- * Azkaban validator configuration file. Once these validators are loaded, the ValidatorManager
- * will use the registered validators to verify each uploaded project before persisting it.
+ * ValidatorManager is responsible for loading the list of validators specified in the Azkaban
+ * validator configuration file. Once these validators are loaded, the ValidatorManager will use the
+ * registered validators to verify each uploaded project before persisting it.
*/
public interface ValidatorManager {
@@ -21,16 +21,16 @@ public interface ValidatorManager {
void loadValidators(Props props, Logger logger);
/**
- * Validate the given project using the registered list of validators. This method returns a
- * map of {@link ValidationReport} with the key being the validator's name and the value being
- * the {@link ValidationReport} generated by that validator.
+ * Validate the given project using the registered list of validators. This method returns a map
+ * of {@link ValidationReport} with the key being the validator's name and the value being the
+ * {@link ValidationReport} generated by that validator.
*/
Map<String, ValidationReport> validate(Project project, File projectDir);
/**
* The ValidatorManager should have a default validator which checks for the most essential
- * components of a project. The ValidatorManager should always load the default validator.
- * This method returns the default validator of this ValidatorManager.
+ * components of a project. The ValidatorManager should always load the default validator. This
+ * method returns the default validator of this ValidatorManager.
*/
ProjectValidator getDefaultValidator();
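Consumer-side sketch of the validate() contract described above, rejecting the upload when any report reaches ERROR (severity order PASS < WARN < ERROR, per the ValidationStatus javadoc in this diff); the getStatus() getter and the exception type are assumptions.

```java
final Map<String, ValidationReport> reports =
    validatorManager.validate(project, projectDir);
for (final Map.Entry<String, ValidationReport> entry : reports.entrySet()) {
  if (entry.getValue().getStatus() == ValidationStatus.ERROR) { // assumed getter
    throw new ValidatorManagerException(
        "Validator " + entry.getKey() + " rejected the project.");
  }
}
```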
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index bf625fe..cc6d1ab 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -25,17 +25,12 @@ import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
/**
- * Xml implementation of the ValidatorManager. Looks for the property
- * project.validators.xml.file in the azkaban properties.
+ * Xml implementation of the ValidatorManager. Looks for the property project.validators.xml.file in
+ * the azkaban properties.
*
- * The xml to be in the following form:
- * <azkaban-validators>
- * <validator classname="validator class name">
- * <!-- optional configurations for each individual validator -->
- * <property key="validator property key" value="validator property value" />
- * ...
- * </validator>
- * </azkaban-validators>
+ * The xml should be in the following form: <azkaban-validators> <validator classname="validator class
+ * name"> <!-- optional configurations for each individual validator --> <property key="validator
+ * property key" value="validator property value" /> ... </validator> </azkaban-validators>
*/
public class XmlValidatorManager implements ValidatorManager {
@@ -51,8 +46,8 @@ public class XmlValidatorManager implements ValidatorManager {
private Map<String, ProjectValidator> validators;
/**
- * Load the validator plugins from the validator directory (default being validators/) into
- * the validator ClassLoader. This enables creating instances of these validators in the
+ * Load the validator plugins from the validator directory (default being validators/) into the
+ * validator ClassLoader. This enables creating instances of these validators in the
* loadValidators() method.
*/
public XmlValidatorManager(final Props props) {
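The wiring implied by the javadoc: point the documented property at an xml file of the documented shape, then construct the manager. The file path and validator class name below are placeholders.

```java
// conf/azkaban-validators.xml, shape from the javadoc above:
//
//   <azkaban-validators>
//     <validator classname="com.example.MyValidator">
//       <property key="my.validator.key" value="my-value" />
//     </validator>
//   </azkaban-validators>

final Props props = new Props();
props.put("project.validators.xml.file", "conf/azkaban-validators.xml");
final ValidatorManager validatorManager = new XmlValidatorManager(props);
```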
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
index 60c96c3..ab1ad1e 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
@@ -34,10 +34,9 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
- * The ScheduleManager stores and executes the schedule. It uses a single thread
- * instead and waits until correct loading time for the flow. It will not remove
- * the flow from the schedule when it is run, which can potentially allow the
- * flow to and overlap each other.
+ * The ScheduleManager stores and executes the schedule. It uses a single thread and waits until
+ * the correct loading time for the flow. It will not remove the flow from the schedule when it is
+ * run, which can potentially allow runs of the flow to overlap each other.
*
* TODO kunkun-tang: When new AZ quartz Scheduler comes, we will remove this class.
*/
@@ -55,9 +54,7 @@ public class ScheduleManager implements TriggerAgent {
new LinkedHashMap<>();
/**
- * Give the schedule manager a loader class that will properly load the
- * schedule.
- *
+ * Give the schedule manager a loader class that will properly load the schedule.
*/
@Inject
public ScheduleManager(final ScheduleLoader loader) {
@@ -88,8 +85,7 @@ public class ScheduleManager implements TriggerAgent {
}
/**
- * Shutdowns the scheduler thread. After shutdown, it may not be safe to use
- * it again.
+ * Shuts down the scheduler thread. After shutdown, it may not be safe to use it again.
*/
@Override
public void shutdown() {
@@ -98,7 +94,6 @@ public class ScheduleManager implements TriggerAgent {
/**
* Retrieves a copy of the list of schedules.
- *
*/
public synchronized List<Schedule> getSchedules()
throws ScheduleManagerException {
@@ -109,7 +104,6 @@ public class ScheduleManager implements TriggerAgent {
/**
* Returns the scheduled flow for the flow name
- *
*/
public Schedule getSchedule(final int projectId, final String flowId)
throws ScheduleManagerException {
@@ -131,7 +125,6 @@ public class ScheduleManager implements TriggerAgent {
/**
* Removes the flow from the schedule if it exists.
- *
*/
public synchronized void removeSchedule(final Schedule sched) {
final Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
@@ -151,26 +144,27 @@ public class ScheduleManager implements TriggerAgent {
}
public Schedule scheduleFlow(final int scheduleId,
- final int projectId,
- final String projectName,
- final String flowName,
- final String status,
- final long firstSchedTime,
- final long endSchedTime,
- final DateTimeZone timezone,
- final ReadablePeriod period,
- final long lastModifyTime,
- final long nextExecTime,
- final long submitTime,
- final String submitUser,
- final ExecutionOptions execOptions,
- final List<SlaOption> slaOptions) {
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final long endSchedTime,
+ final DateTimeZone timezone,
+ final ReadablePeriod period,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser,
+ final ExecutionOptions execOptions,
+ final List<SlaOption> slaOptions) {
final Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status,
firstSchedTime, endSchedTime, timezone, period, lastModifyTime, nextExecTime,
submitTime, submitUser, execOptions, slaOptions, null);
logger
.info("Scheduling flow '" + sched.getScheduleName() + "' for "
- + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null ? "(non-recurring)"
+ + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null
+ ? "(non-recurring)"
: period));
insertSchedule(sched);
@@ -178,20 +172,20 @@ public class ScheduleManager implements TriggerAgent {
}
public Schedule cronScheduleFlow(final int scheduleId,
- final int projectId,
- final String projectName,
- final String flowName,
- final String status,
- final long firstSchedTime,
- final long endSchedTime,
- final DateTimeZone timezone,
- final long lastModifyTime,
- final long nextExecTime,
- final long submitTime,
- final String submitUser,
- final ExecutionOptions execOptions,
- final List<SlaOption> slaOptions,
- final String cronExpression) {
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final long endSchedTime,
+ final DateTimeZone timezone,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser,
+ final ExecutionOptions execOptions,
+ final List<SlaOption> slaOptions,
+ final String cronExpression) {
final Schedule sched =
new Schedule(scheduleId, projectId, projectName, flowName, status,
firstSchedTime, endSchedTime, timezone, null, lastModifyTime, nextExecTime,
@@ -203,6 +197,7 @@ public class ScheduleManager implements TriggerAgent {
insertSchedule(sched);
return sched;
}
+
/**
* Schedules the flow, but doesn't save the schedule afterwards.
*/
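An illustrative call against the cronScheduleFlow signature above. Every value is a placeholder; in particular, -1 as a "not yet persisted" schedule id and the no-arg ExecutionOptions constructor are assumptions.

```java
final long now = System.currentTimeMillis();
final Schedule sched = scheduleManager.cronScheduleFlow(
    -1,                         // scheduleId (assumed convention for a new schedule)
    project.getId(),
    project.getName(),
    "daily_etl",                // flowName (placeholder)
    "ready",                    // status (placeholder)
    now,                        // firstSchedTime
    Long.MAX_VALUE,             // endSchedTime, i.e. no end date (placeholder)
    DateTimeZone.getDefault(),  // timezone
    now,                        // lastModifyTime
    now,                        // nextExecTime
    now,                        // submitTime
    "azkaban",                  // submitUser
    new ExecutionOptions(),     // execOptions (default ctor assumed)
    new ArrayList<SlaOption>(), // slaOptions
    "0 0 3 ? * *");             // Quartz cron: every day at 03:00
```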
diff --git a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
index cff70c7..f2f6c87 100644
--- a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
+++ b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
@@ -143,8 +143,7 @@ public class HttpRequestUtils {
}
/**
- * parse a string as number and throws exception if parsed value is not a
- * valid integer
+ * Parses a string as a number and throws an exception if the parsed value is not a valid integer
*
* @throws ExecutorManagerException if paramName is not a valid integer
*/
@@ -180,8 +179,7 @@ public class HttpRequestUtils {
}
/**
- * Retrieves the param from the http servlet request. Will throw an exception
- * if not found
+ * Retrieves the param from the http servlet request. Will throw an exception if not found
*/
public static String getParam(final HttpServletRequest request, final String name)
throws ServletException {
@@ -206,8 +204,8 @@ public class HttpRequestUtils {
}
/**
- * Returns the param and parses it into an int. Will throw an exception if not
- * found, or a parse error if the type is incorrect.
+ * Returns the param and parses it into an int. Will throw an exception if not found, or a parse
+ * error if the type is incorrect.
*/
public static int getIntParam(final HttpServletRequest request, final String name)
throws ServletException {
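A servlet-side fragment showing the helpers documented above; the handler method and parameter names are illustrative.

```java
protected void handleExecutionRequest(final HttpServletRequest req)
    throws ServletException {
  // Throws ServletException if the parameter is missing.
  final String action = HttpRequestUtils.getParam(req, "action");
  // Throws if the parameter is missing or not parseable as an int.
  final int execId = HttpRequestUtils.getIntParam(req, "execid");
  // ... dispatch on action / execId ...
}
```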
diff --git a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
index a2147b3..81d11b9 100644
--- a/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
+++ b/azkaban-common/src/main/java/azkaban/server/session/SessionCache.java
@@ -25,10 +25,9 @@ import java.util.concurrent.TimeUnit;
/**
* Cache for web session.
*
- * The following global azkaban properties can be used: max.num.sessions - used
- * to determine the number of live sessions that azkaban will handle. Default is
- * 10000 session.time.to.live -Number of seconds before session expires. Default
- * set to 10 hours.
+ * The following global azkaban properties can be used: max.num.sessions - used to determine the
+ * number of live sessions that azkaban will handle (default is 10000); session.time.to.live -
+ * number of seconds before a session expires (default is 10 hours).
*/
public class SessionCache {
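The two knobs named in the javadoc, set to their documented defaults; that SessionCache is constructed from a Props is an assumption.

```java
final Props props = new Props();
props.put("max.num.sessions", "10000");                          // documented default
props.put("session.time.to.live", String.valueOf(10 * 60 * 60)); // 10 hours, in seconds
final SessionCache cache = new SessionCache(props);              // constructor assumed
```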
diff --git a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
index 3e3872b..fbf785e 100644
--- a/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
+++ b/azkaban-common/src/main/java/azkaban/sla/SlaOption.java
@@ -49,7 +49,7 @@ public class SlaOption {
public static final String ACTION_KILL_JOB = "SlaKillJob";
private static final DateTimeFormatter fmt = DateTimeFormat
.forPattern("MM/dd, YYYY HH:mm");
-
+
private String type;
private Map<String, Object> info;
private List<String> actions;
diff --git a/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java b/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
index b9a666d..331627c 100644
--- a/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
+++ b/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
@@ -58,9 +58,8 @@ public class HdfsAuth {
}
/**
- * API to authorize HDFS access.
- * This logins in the configured user via the keytab.
- * If the user is already logged in then it renews the TGT.
+ * API to authorize HDFS access. This logs in the configured user via the keytab. If the user is
+ * already logged in then it renews the TGT.
*/
public void authorize() {
if (this.isSecurityEnabled) {
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java b/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
index 969e15a..9cd518d 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageCleaner.java
@@ -87,10 +87,9 @@ public class StorageCleaner {
* From the storage perspective, cleanup just needs the {@link Storage#delete(String)} API to
* work.
*
- * Failure cases:
- * - If the storage cleanup fails, the cleanup will be attempted again on the next upload
- * - If the storage cleanup succeeds and the DB cleanup fails, the DB will be cleaned up in the
- * next attempt.
+ * Failure cases: if the storage cleanup fails, the cleanup will be attempted again on the next
+ * upload; if the storage cleanup succeeds and the DB cleanup fails, the DB will be cleaned up in
+ * the next attempt.
*
* @param projectId project ID
*/
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index 1373649..d8aae5b 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -122,8 +122,7 @@ public class StorageManager {
}
/**
- * Clean up project artifacts based on project ID.
- * See {@link StorageCleaner#cleanupProjectArtifacts(int)}
+ * Clean up project artifacts based on project ID. See {@link StorageCleaner#cleanupProjectArtifacts(int)}
*/
public void cleanupProjectArtifacts(final int projectId) {
try {
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 85808aa..4112517 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -41,8 +41,8 @@ public class BasicTimeChecker implements ConditionChecker {
private boolean skipPastChecks = true;
public BasicTimeChecker(final String id, final long firstCheckTime,
- final DateTimeZone timezone, final boolean isRecurring, final boolean skipPastChecks,
- final ReadablePeriod period, final String cronExpression) {
+ final DateTimeZone timezone, final boolean isRecurring, final boolean skipPastChecks,
+ final ReadablePeriod period, final String cronExpression) {
this.id = id;
this.firstCheckTime = firstCheckTime;
this.timezone = timezone;
@@ -56,8 +56,8 @@ public class BasicTimeChecker implements ConditionChecker {
}
public BasicTimeChecker(final String id, final long firstCheckTime,
- final DateTimeZone timezone, final long nextCheckTime, final boolean isRecurring,
- final boolean skipPastChecks, final ReadablePeriod period, final String cronExpression) {
+ final DateTimeZone timezone, final long nextCheckTime, final boolean isRecurring,
+ final boolean skipPastChecks, final ReadablePeriod period, final String cronExpression) {
this.id = id;
this.firstCheckTime = firstCheckTime;
this.timezone = timezone;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index beb9820..92f72c1 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -45,7 +45,7 @@ public class Condition {
}
public Condition(final Map<String, ConditionChecker> checkers, final String expr,
- final long nextCheckTime) {
+ final long nextCheckTime) {
this.nextCheckTime = nextCheckTime;
setCheckers(checkers);
this.expression = jexl.createExpression(expr);
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
index b783628..d882a38 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
@@ -53,10 +53,10 @@ public class Trigger {
}
private Trigger(final int triggerId, final long lastModifyTime, final long submitTime,
- final String submitUser, final String source, final Condition triggerCondition,
- final Condition expireCondition, final List<TriggerAction> actions,
- final List<TriggerAction> expireActions, final Map<String, Object> info,
- final Map<String, Object> context) {
+ final String submitUser, final String source, final Condition triggerCondition,
+ final Condition expireCondition, final List<TriggerAction> actions,
+ final List<TriggerAction> expireActions, final Map<String, Object> info,
+ final Map<String, Object> context) {
requireNonNull(submitUser);
requireNonNull(source);
requireNonNull(triggerCondition);
@@ -216,9 +216,9 @@ public class Trigger {
return this.expireCondition;
}
- public void setExpireCondition(final Condition expireCondition) {
+ public void setExpireCondition(final Condition expireCondition) {
this.expireCondition = expireCondition;
- }
+ }
public List<TriggerAction> getActions() {
return this.actions;
@@ -362,6 +362,7 @@ public class Trigger {
}
public static class TriggerBuilder {
+
private final String submitUser;
private final String source;
private final TriggerStatus status = TriggerStatus.READY;
@@ -377,10 +378,10 @@ public class Trigger {
private Map<String, Object> context = new HashMap<>();
public TriggerBuilder(final String submitUser,
- final String source,
- final Condition triggerCondition,
- final Condition expireCondition,
- final List<TriggerAction> actions) {
+ final String source,
+ final Condition triggerCondition,
+ final Condition expireCondition,
+ final List<TriggerAction> actions) {
this.submitUser = submitUser;
this.source = source;
this.triggerCondition = triggerCondition;
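For orientation, the pieces in this diff assemble roughly as follows: a BasicTimeChecker wrapped in a Condition (constructor shown earlier) feeding TriggerBuilder. This is a sketch; the "<id>.eval()" JEXL expression form and the build() call are assumptions.

```java
final ConditionChecker timeChecker = new BasicTimeChecker(
    "timeChecker_1",            // id
    System.currentTimeMillis(), // firstCheckTime
    DateTimeZone.getDefault(),  // timezone
    true,                       // isRecurring
    true,                       // skipPastChecks
    null,                       // period (null when a cron expression is used)
    "0 0 3 ? * *");             // cronExpression

final Map<String, ConditionChecker> checkers = new HashMap<>();
checkers.put("timeChecker_1", timeChecker);

// Condition(checkers, expr, nextCheckTime), as shown earlier in this diff.
final Condition triggerCond =
    new Condition(checkers, "timeChecker_1.eval()", System.currentTimeMillis());
final Condition expireCond =
    new Condition(new HashMap<>(), "false", System.currentTimeMillis());

final Trigger trigger = new Trigger.TriggerBuilder(
    "azkaban", "SimpleTimeTrigger", triggerCond, expireCond,
    new ArrayList<TriggerAction>())
    .build(); // build() assumed
```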
diff --git a/azkaban-common/src/main/java/azkaban/user/UserManager.java b/azkaban-common/src/main/java/azkaban/user/UserManager.java
index 5556f0a..b583e94 100644
--- a/azkaban-common/src/main/java/azkaban/user/UserManager.java
+++ b/azkaban-common/src/main/java/azkaban/user/UserManager.java
@@ -17,12 +17,11 @@
package azkaban.user;
/**
- * Interface for the UserManager. Implementors will have to handle the retrieval
- * of the User object given the username and password.
+ * Interface for the UserManager. Implementors will have to handle the retrieval of the User object
+ * given the username and password.
*
- * The constructor will be called with a azkaban.utils.Props object passed as
- * the only parameter. If such a constructor doesn't exist, than the UserManager
- * instantiation may fail.
+ * The constructor will be called with an azkaban.utils.Props object passed as the only parameter. If
+ * such a constructor doesn't exist, then the UserManager instantiation may fail.
*/
public interface UserManager {
@@ -35,14 +34,12 @@ public interface UserManager {
throws UserManagerException;
/**
- * Returns true if the user is valid. This is used when adding permissions for
- * users
+ * Returns true if the user is valid. This is used when adding permissions for users
*/
public boolean validateUser(String username);
/**
- * Returns true if the group is valid. This is used when adding permissions
- * for groups.
+ * Returns true if the group is valid. This is used when adding permissions for groups.
*/
public boolean validateGroup(String group);
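The constructor convention in the javadoc, made concrete: a reflective loader that requires a (Props) constructor. The user.manager.class property name is an assumption.

```java
static UserManager loadUserManager(final Props props) throws Exception {
  final String className =
      props.getString("user.manager.class", "azkaban.user.XmlUserManager");
  // Per the javadoc: a (Props) constructor must exist, or instantiation fails.
  return (UserManager) Class.forName(className)
      .getConstructor(Props.class)
      .newInstance(props);
}
```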
diff --git a/azkaban-common/src/main/java/azkaban/user/UserUtils.java b/azkaban-common/src/main/java/azkaban/user/UserUtils.java
index 687bcda..b196221 100644
--- a/azkaban-common/src/main/java/azkaban/user/UserUtils.java
+++ b/azkaban-common/src/main/java/azkaban/user/UserUtils.java
@@ -1,6 +1,7 @@
package azkaban.user;
public final class UserUtils {
+
private UserUtils() {
}
diff --git a/azkaban-common/src/main/java/azkaban/user/XmlUserManager.java b/azkaban-common/src/main/java/azkaban/user/XmlUserManager.java
index 007865d..9f99251 100644
--- a/azkaban-common/src/main/java/azkaban/user/XmlUserManager.java
+++ b/azkaban-common/src/main/java/azkaban/user/XmlUserManager.java
@@ -34,12 +34,11 @@ import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
/**
- * Xml implementation of the UserManager. Looks for the property
- * user.manager.xml.file in the azkaban properties.
+ * Xml implementation of the UserManager. Looks for the property user.manager.xml.file in the
+ * azkaban properties.
*
- * The xml to be in the following form: <azkaban-users> <user
- * username="username" password="azkaban" roles="admin" groups="azkaban"/>
- * </azkaban-users>
+ * The xml should be in the following form: <azkaban-users> <user username="username" password="azkaban"
+ * roles="admin" groups="azkaban"/> </azkaban-users>
*/
public class XmlUserManager implements UserManager {
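The user file described by the javadoc, plus the documented property that points at it; the path is a placeholder.

```java
// conf/azkaban-users.xml, shape from the javadoc above:
//
//   <azkaban-users>
//     <user username="alice" password="azkaban" roles="admin" groups="azkaban"/>
//   </azkaban-users>

final Props props = new Props();
props.put("user.manager.xml.file", "conf/azkaban-users.xml");
final UserManager userManager = new XmlUserManager(props);
```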
diff --git a/azkaban-common/src/main/java/azkaban/utils/CircularBuffer.java b/azkaban-common/src/main/java/azkaban/utils/CircularBuffer.java
index f0a2f80..6f426ea 100644
--- a/azkaban-common/src/main/java/azkaban/utils/CircularBuffer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/CircularBuffer.java
@@ -23,8 +23,8 @@ import java.util.Iterator;
import java.util.List;
/**
- * A circular buffer of items of a given length. It will grow up to the give
- * size as items are appended, then it will begin to overwrite older items.
+ * A circular buffer of items of a given length. It will grow up to the given size as items are
+ * appended, then it will begin to overwrite older items.
*
* @param <T> The type of the item contained.
*/
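A behavior sketch for the buffer described above; the size-taking constructor, the append method, and iteration support are assumptions consistent with the javadoc.

```java
final CircularBuffer<String> lastLines = new CircularBuffer<>(3); // capacity 3
for (final String line : new String[]{"l1", "l2", "l3", "l4", "l5"}) {
  lastLines.append(line); // once full, the oldest entry is overwritten
}
// Iteration should now yield only the three most recent items: l3, l4, l5.
for (final String line : lastLines) {
  System.out.println(line);
}
```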
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 8deb690..68d64f1 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -36,8 +36,7 @@ import org.apache.log4j.Logger;
/**
- * Runs a few unix commands. Created this so that I can move to JNI in the
- * future.
+ * Runs a few unix commands. Created this so that I can move to JNI in the future.
*/
public class FileIOUtils {
diff --git a/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java b/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
index f0b5fc6..1091aeb 100644
--- a/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
+++ b/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
@@ -13,8 +13,7 @@ import org.slf4j.LoggerFactory;
/**
* Utility class for getting system memory information
*
- * Note:
- * This check is designed for Linux only.
+ * Note: This check is designed for Linux only.
*/
class OsMemoryUtil {
@@ -84,16 +83,10 @@ class OsMemoryUtil {
}
/**
- * Example file:
- * $ cat /proc/meminfo
- * MemTotal: 65894008 kB
- * MemFree: 59400536 kB
- * Buffers: 409348 kB
- * Cached: 4290236 kB
- * SwapCached: 0 kB
+ * Example file: $ cat /proc/meminfo MemTotal: 65894008 kB MemFree: 59400536 kB
+ * Buffers: 409348 kB Cached: 4290236 kB SwapCached: 0 kB
*
- * Make the method package private to make unit testing easier.
- * Otherwise it can be made private.
+ * Make the method package private to make unit testing easier. Otherwise it can be made private.
*
* @param line the text for a memory usage statistics we are interested in
* @return size of the memory. unit kB. 0 if there is an error.
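For illustration, a plain-Java re-implementation of the parsing rule the javadoc states ("MemTotal: 65894008 kB" yields 65894008; 0 on any error); this is a sketch, not the class's actual code.

```java
// Parse one /proc/meminfo line, e.g. "MemTotal:       65894008 kB".
// Returns the size in kB, or 0 if the line does not match expectations.
static long parseMemKb(final String line) {
  final String[] parts = line.trim().split("\\s+");
  if (parts.length < 2) {
    return 0;
  }
  try {
    return Long.parseLong(parts[1]);
  } catch (final NumberFormatException e) {
    return 0;
  }
}
```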
diff --git a/azkaban-common/src/main/java/azkaban/utils/Props.java b/azkaban-common/src/main/java/azkaban/utils/Props.java
index d0fc84c..fd0cfd1 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Props.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Props.java
@@ -38,8 +38,8 @@ import java.util.TreeMap;
import org.apache.log4j.Logger;
/**
- * Hashmap implementation of a hierarchitical properties with helpful converter
- * functions and Exception throwing. This class is not threadsafe.
+ * Hashmap implementation of hierarchical properties with helpful converter functions and
+ * Exception throwing. This class is not threadsafe.
*/
public class Props {
@@ -125,16 +125,15 @@ public class Props {
}
/**
- * Create a Props with a null parent from a list of key value pairing. i.e.
- * [key1, value1, key2, value2 ...]
+ * Create a Props with a null parent from a list of key value pairing. i.e. [key1, value1, key2,
+ * value2 ...]
*/
public static Props of(final String... args) {
return of((Props) null, args);
}
/**
- * Create a Props from a list of key value pairing. i.e. [key1, value1, key2,
- * value2 ...]
+ * Create a Props from a list of key value pairing. i.e. [key1, value1, key2, value2 ...]
*/
public static Props of(final Props parent, final String... args) {
if (args.length % 2 != 0) {
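The two behaviors documented above in one fragment: of() consumes alternating key/value arguments, and put() performs ${name} substitution when the value is stored.

```java
// Alternating key/value pairs; an odd-length argument list is rejected.
final Props props = Props.of("host", "example.com", "port", "8081");

// ${host} and ${port} are resolved against get(...) at put() time.
props.put("endpoint", "http://${host}:${port}/status");
// props.getString("endpoint") -> "http://example.com:8081/status"
```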
@@ -254,9 +253,8 @@ public class Props {
}
/**
- * Put the given string value for the string key. This method performs any
- * variable substitution in the value replacing any occurance of ${name} with
- * the value of get("name").
+ * Put the given string value for the string key. This method performs any variable substitution
+ * in the value replacing any occurrence of ${name} with the value of get("name").
*
* @param key The key to put the value to
* @param value The value to do substitution on and store
@@ -268,10 +266,9 @@ public class Props {
}
/**
- * Put the given Properties into the Props. This method performs any variable
- * substitution in the value replacing any occurrence of ${name} with the
- * value of get("name"). get() is called first on the Props and next on the
- * Properties object.
+ * Put the given Properties into the Props. This method performs any variable substitution in the
+ * value replacing any occurrence of ${name} with the value of get("name"). get() is called first
+ * on the Props and next on the Properties object.
*
* @param properties The properties to put
* @throws IllegalArgumentException If the variable given for substitution is not a valid key in
@@ -354,16 +351,16 @@ public class Props {
}
/**
- * The number of unique keys defined by this Props (keys defined only in
- * parent Props are not counted)
+ * The number of unique keys defined by this Props (keys defined only in parent Props are not
+ * counted)
*/
public int localSize() {
return this._current.size();
}
/**
- * Attempts to return the Class that corresponds to the Props value. If the
- * class doesn't exit, an IllegalArgumentException will be thrown.
+ * Attempts to return the Class that corresponds to the Props value. If the class doesn't exist, an
+ * IllegalArgumentException will be thrown.
*/
public Class<?> getClass(final String key) {
try {
@@ -392,8 +389,7 @@ public class Props {
}
/**
- * Gets the class from the Props. If it doesn't exist, it will return the
- * defaultClass
+ * Gets the class from the Props. If it doesn't exist, it will return the defaultClass
*/
public Class<?> getClass(final String key, final Class<?> defaultClass) {
if (containsKey(key)) {
@@ -404,8 +400,7 @@ public class Props {
}
/**
- * Gets the string from the Props. If it doesn't exist, it will return the
- * defaultValue
+ * Gets the string from the Props. If it doesn't exist, it will return the defaultValue
*/
public String getString(final String key, final String defaultValue) {
if (containsKey(key)) {
@@ -416,8 +411,7 @@ public class Props {
}
/**
- * Gets the string from the Props. If it doesn't exist, throw and
- * UndefinedPropertiesException
+ * Gets the string from the Props. If it doesn't exist, throw an UndefinedPropertyException
*/
public String getString(final String key) {
if (containsKey(key)) {
@@ -453,8 +447,8 @@ public class Props {
}
/**
- * Returns a list of strings with the comma as the separator of the value. If
- * the value is null, it'll return the defaultValue.
+ * Returns a list of strings with the comma as the separator of the value. If the value is null,
+ * it'll return the defaultValue.
*/
public List<String> getStringList(final String key, final List<String> defaultValue) {
if (containsKey(key)) {
@@ -465,8 +459,8 @@ public class Props {
}
/**
- * Returns a list of strings with the sep as the separator of the value. If
- * the value is null, it'll return the defaultValue.
+ * Returns a list of strings with the sep as the separator of the value. If the value is null,
+ * it'll return the defaultValue.
*/
public List<String> getStringList(final String key, final List<String> defaultValue,
final String sep) {
@@ -478,8 +472,8 @@ public class Props {
}
/**
- * Returns true if the value equals "true". If the value is null, then the
- * default value is returned.
+ * Returns true if the value equals "true". If the value is null, then the default value is
+ * returned.
*/
public boolean getBoolean(final String key, final boolean defaultValue) {
if (containsKey(key)) {
@@ -503,9 +497,8 @@ public class Props {
}
/**
- * Returns the long representation of the value. If the value is null, then
- * the default value is returned. If the value isn't a long, then a parse
- * exception will be thrown.
+ * Returns the long representation of the value. If the value is null, then the default value is
+ * returned. If the value isn't a long, then a parse exception will be thrown.
*/
public long getLong(final String name, final long defaultValue) {
if (containsKey(name)) {
@@ -517,8 +510,8 @@ public class Props {
/**
* Returns the long representation of the value. If the value is null, then a
- * UndefinedPropertyException will be thrown. If the value isn't a long, then
- * a parse exception will be thrown.
+ * UndefinedPropertyException will be thrown. If the value isn't a long, then a parse exception
+ * will be thrown.
*/
public long getLong(final String name) {
if (containsKey(name)) {
@@ -530,9 +523,8 @@ public class Props {
}
/**
- * Returns the int representation of the value. If the value is null, then the
- * default value is returned. If the value isn't a int, then a parse exception
- * will be thrown.
+ * Returns the int representation of the value. If the value is null, then the default value is
+ * returned. If the value isn't an int, then a parse exception will be thrown.
*/
public int getInt(final String name, final int defaultValue) {
if (containsKey(name)) {
@@ -544,8 +536,8 @@ public class Props {
/**
* Returns the int representation of the value. If the value is null, then a
- * UndefinedPropertyException will be thrown. If the value isn't a int, then a
- * parse exception will be thrown.
+ * UndefinedPropertyException will be thrown. If the value isn't a int, then a parse exception
+ * will be thrown.
*/
public int getInt(final String name) {
if (containsKey(name)) {
@@ -557,9 +549,8 @@ public class Props {
}
/**
- * Returns the double representation of the value. If the value is null, then
- * the default value is returned. If the value isn't a double, then a parse
- * exception will be thrown.
+ * Returns the double representation of the value. If the value is null, then the default value is
+ * returned. If the value isn't a double, then a parse exception will be thrown.
*/
public double getDouble(final String name, final double defaultValue) {
if (containsKey(name)) {
@@ -570,9 +561,9 @@ public class Props {
}
/**
- * Returns the double representation of the value. If the value is null, then
- * a UndefinedPropertyException will be thrown. If the value isn't a double,
- * then a parse exception will be thrown.
+ * Returns the double representation of the value. If the value is null, then an
+ * UndefinedPropertyException will be thrown. If the value isn't a double, then a parse exception
+ * will be thrown.
*/
public double getDouble(final String name) {
if (containsKey(name)) {
@@ -584,9 +575,8 @@ public class Props {
}
/**
- * Returns the uri representation of the value. If the value is null, then the
- * default value is returned. If the value isn't a uri, then a
- * IllegalArgumentException will be thrown.
+ * Returns the URI representation of the value. If the value is null, an UndefinedPropertyException
+ * will be thrown. If the value isn't a URI, then an IllegalArgumentException will be thrown.
*/
public URI getUri(final String name) {
if (containsKey(name)) {
@@ -602,9 +592,8 @@ public class Props {
}
/**
- * Returns the double representation of the value. If the value is null, then
- * the default value is returned. If the value isn't a uri, then a
- * IllegalArgumentException will be thrown.
+ * Returns the URI representation of the value. If the value is null, then the default value is
+ * returned. If the value isn't a URI, then an IllegalArgumentException will be thrown.
*/
public URI getUri(final String name, final URI defaultValue) {
if (containsKey(name)) {
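Taken together, the Props hunks above document a consistent convention: every typed getter has a defaulting overload and a throwing overload. A minimal usage sketch of the two forms (the keys are made up for illustration):

    Props props = new Props();
    props.put("retries", "3");
    props.put("hosts", "hostA,hostB");

    // Defaulting overloads: the fallback is returned when the key is absent.
    int retries = props.getInt("retries", 1);                // -> 3
    long timeoutMs = props.getLong("timeout.ms", 5000L);     // key missing -> 5000
    List<String> hosts = props.getStringList("hosts", null); // comma-separated -> [hostA, hostB]

    // Throwing overloads: an absent key raises UndefinedPropertyException,
    // and an unparseable value raises a parse exception.
    double ratio = props.getDouble("ratio");                 // throws: "ratio" is undefined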
diff --git a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
index 89242a8..251e764 100644
--- a/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/utils/RestfulApiClient.java
@@ -39,8 +39,7 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.log4j.Logger;
/**
- * class handles the communication between the application and
- * a Restful API based web server.
+ * This class handles the communication between the application and a RESTful API based web server.
*
* @param T : type of the returning response object. Note: the idea of this abstract class is to
* provide a wrapper for the logic around HTTP layer communication so development work can take this
@@ -152,12 +151,10 @@ public abstract class RestfulApiClient<T> {
}
/**
- * Method to transform the response returned by the httpClient into the
- * type specified.
- * Note: Method need to handle case such as failed request.
- * Also method is not supposed to pass the response object out
- * via the returning value as the response will be closed after the
- * execution steps out of the method context.
+ * Method to transform the response returned by the httpClient into the type specified. Note:
+ * the method needs to handle cases such as a failed request. Also, the method must not pass the
+ * response object out via the return value, as the response will be closed after execution
+ * steps out of the method context.
**/
protected abstract T parseResponse(HttpResponse response)
throws HttpResponseException, IOException;
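To make the parseResponse contract concrete, a hypothetical RestfulApiClient&lt;String&gt; subclass could fail on error statuses and drain the body into a String before the caller closes the response (a sketch assuming parseResponse is the only abstract member):

    import java.io.IOException;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.HttpResponseException;
    import org.apache.http.util.EntityUtils;

    public class StringApiClient extends RestfulApiClient<String> {

      @Override
      protected String parseResponse(final HttpResponse response)
          throws HttpResponseException, IOException {
        final int status = response.getStatusLine().getStatusCode();
        if (status >= 300) {
          // Handle the failed-request case the javadoc calls out.
          throw new HttpResponseException(status,
              response.getStatusLine().getReasonPhrase());
        }
        // Copy the body out rather than passing the response object along;
        // the response is closed once execution leaves this method.
        return EntityUtils.toString(response.getEntity());
      }
    }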
diff --git a/azkaban-common/src/main/java/azkaban/utils/StdOutErrRedirect.java b/azkaban-common/src/main/java/azkaban/utils/StdOutErrRedirect.java
index 00b9967..e135383 100644
--- a/azkaban-common/src/main/java/azkaban/utils/StdOutErrRedirect.java
+++ b/azkaban-common/src/main/java/azkaban/utils/StdOutErrRedirect.java
@@ -22,9 +22,8 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
- * A class to encapsulate the redirection of stdout and stderr to log4j
- * This allows us to catch messages written to the console (although we should
- * not be using System.out to write out).
+ * A class to encapsulate the redirection of stdout and stderr to log4j. This allows us to catch
+ * messages written to the console (although we should not be using System.out to write out).
*/
public class StdOutErrRedirect {
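For context, redirecting stdout to log4j generally means installing a PrintStream that forwards each completed line to a Logger; a rough sketch of the idea (not this class's actual implementation):

    import java.io.OutputStream;
    import java.io.PrintStream;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    static void redirectStdOut(final Logger logger) {
      System.setOut(new PrintStream(new OutputStream() {
        private final StringBuilder line = new StringBuilder();

        @Override
        public void write(final int b) {
          if (b == '\n') {
            // Forward each completed console line to log4j.
            logger.log(Level.INFO, this.line.toString());
            this.line.setLength(0);
          } else {
            this.line.append((char) b);
          }
        }
      }, true));
    }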
diff --git a/azkaban-common/src/main/java/azkaban/utils/SwapQueue.java b/azkaban-common/src/main/java/azkaban/utils/SwapQueue.java
index a13b5f3..8a16539 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SwapQueue.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SwapQueue.java
@@ -21,8 +21,8 @@ import java.util.Collection;
import java.util.Iterator;
/**
- * Queue that swaps its lists. Allows for non-blocking writes when reading. Swap
- * should be called before every read.
+ * Queue that swaps its lists. Allows for non-blocking writes when reading. Swap should be called
+ * before every read.
*/
public class SwapQueue<T> implements Iterable<T> {
@@ -35,8 +35,7 @@ public class SwapQueue<T> implements Iterable<T> {
}
/**
- * Swaps primaryQueue with secondary queue. The previous primary queue will be
- * released.
+ * Swaps primaryQueue with the secondary queue. The previous primary queue will be released.
*/
public synchronized void swap() {
this.primaryQueue = this.secondaryQueue;
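In use, the swap-before-read discipline from the javadoc looks roughly like this (a sketch; append is assumed to be the queue's write method):

    final SwapQueue<String> queue = new SwapQueue<>();
    queue.append("event-1"); // writers add to the secondary list without blocking readers
    queue.append("event-2");

    queue.swap();            // the reader flips the lists before every read pass
    for (final String event : queue) {
      System.out.println(event); // iterates what was collected before the swap
    }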
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
index 5886bc9..8d0af8c 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -5,12 +5,12 @@ import org.slf4j.LoggerFactory;
/**
- * This class is used to maintain system memory information. Processes utilizing
- * large amount of memory should consult this class to see if the system has enough
- * memory to proceed the operation.
+ * This class is used to maintain system memory information. Processes utilizing large amounts of
+ * memory should consult this class to see if the system has enough memory to proceed with the
+ * operation.
*
- * Memory information is obtained from /proc/meminfo, so only Unix/Linux like system
- * will support this class.
+ * Memory information is obtained from /proc/meminfo, so only Unix/Linux-like systems will support
+ * this class.
*
* All the memory size used in this function is in KB.
*/
diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
index d7d377b..392dac3 100644
--- a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -16,8 +16,7 @@
package azkaban.utils;
/**
- * Interface for listener to get notified before and after a task has been
- * executed.
+ * Interface for a listener to get notified before and after a task has been executed.
*
* @author hluu
*/
@@ -26,4 +25,4 @@ public interface ThreadPoolExecutingListener {
public void beforeExecute(Runnable r);
public void afterExecute(Runnable r);
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
index 6fc918f..2b71961 100644
--- a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -25,11 +25,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
/**
- * A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
- * tasks as well as other interesting statistics.
+ * A simple subclass of {@link ThreadPoolExecutor} to keep track of in-progress tasks as well as
+ * other interesting statistics.
*
- * The content of this class is copied from article "Java theory and practice:
- * Instrumenting applications with JMX"
+ * The content of this class is copied from article "Java theory and practice: Instrumenting
+ * applications with JMX".
*
* @author hluu
*/
@@ -109,4 +109,4 @@ public class TrackingThreadPool extends ThreadPoolExecutor {
public void afterExecute(final Runnable r) {
}
}
-}
\ No newline at end of file
+}
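The listener interface from the previous file plugs into a pool like this one through ThreadPoolExecutor's beforeExecute/afterExecute hooks; a self-contained sketch of the pattern (the class name is hypothetical, not TrackingThreadPool's real constructor):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class NotifyingPool extends ThreadPoolExecutor {

      private final ThreadPoolExecutingListener listener;

      public NotifyingPool(final ThreadPoolExecutingListener listener) {
        super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.listener = listener;
      }

      @Override
      protected void beforeExecute(final Thread t, final Runnable r) {
        this.listener.beforeExecute(r); // notify before the task runs
        super.beforeExecute(t, r);
      }

      @Override
      protected void afterExecute(final Runnable r, final Throwable t) {
        super.afterExecute(r, t);
        this.listener.afterExecute(r);  // notify after the task completes
      }
    }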
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index d024bd2..30680d0 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -67,8 +67,8 @@ public class Utils {
}
/**
- * Equivalent to Object.equals except that it handles nulls. If a and b are
- * both null, true is returned.
+ * Equivalent to Object.equals except that it handles nulls. If a and b are both null, true is
+ * returned.
*/
public static boolean equals(final Object a, final Object b) {
if (a == null || b == null) {
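The null-handling contract is easiest to see spelled out (a quick illustration):

    Utils.equals(null, null); // true: both sides null
    Utils.equals(null, "a");  // false: exactly one side is null
    Utils.equals("a", "a");   // true: falls through to Object.equals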
diff --git a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
index 0fe3ae1..2ff8e5a 100644
--- a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
@@ -167,12 +167,11 @@ public class WebUtils {
}
/**
- * Gets the actual client IP address inspecting the X-Forwarded-For
- * HTTP header or using the provided 'remote IP address' from the
- * low level TCP connection from the client.
+ * Gets the actual client IP address inspecting the X-Forwarded-For HTTP header or using the
+ * provided 'remote IP address' from the low level TCP connection from the client.
*
- * If multiple IP addresses are provided in the X-Forwarded-For header
- * then the first one (first hop) is used
+ * If multiple IP addresses are provided in the X-Forwarded-For header then the first one (first
+ * hop) is used.
*
* @param httpHeaders List of HTTP headers for the current request
* @param remoteAddr The client IP address and port from the current request's TCP connection
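The first-hop rule described above amounts to splitting X-Forwarded-For on commas and keeping the left-most entry; a minimal sketch (hypothetical helper, not WebUtils' actual signature):

    import java.util.Map;

    /** Returns the first hop from X-Forwarded-For, else the TCP peer address. */
    static String clientIp(final Map<String, String> httpHeaders, final String remoteAddr) {
      final String forwarded = httpHeaders.get("X-Forwarded-For");
      if (forwarded == null || forwarded.isEmpty()) {
        return remoteAddr;
      }
      return forwarded.split(",")[0].trim(); // the first entry is the originating client
    }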
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index badcb18..ea4cc79 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -124,11 +124,11 @@ public class ExecutionFlowDaoTest {
public void fetchFlowHistory() throws Exception {
final ExecutableFlow flow = createTestFlow();
this.executionFlowDao.uploadExecutableFlow(flow);
- final List<ExecutableFlow> flowList1 = this.executionFlowDao.fetchFlowHistory(0,2 );
+ final List<ExecutableFlow> flowList1 = this.executionFlowDao.fetchFlowHistory(0, 2);
assertThat(flowList1.size()).isEqualTo(1);
final List<ExecutableFlow> flowList2 = this.executionFlowDao
- .fetchFlowHistory(flow.getProjectId(), flow.getId(),0,2 );
+ .fetchFlowHistory(flow.getProjectId(), flow.getId(), 0, 2);
assertThat(flowList2.size()).isEqualTo(1);
final ExecutableFlow fetchFlow =
@@ -182,7 +182,8 @@ public class ExecutionFlowDaoTest {
flow2.setStatus(Status.PREPARING);
this.executionFlowDao.uploadExecutableFlow(flow2);
- final List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = this.executionFlowDao.fetchQueuedFlows();
+ final List<Pair<ExecutionReference, ExecutableFlow>> fetchedQueuedFlows = this.executionFlowDao
+ .fetchQueuedFlows();
assertThat(fetchedQueuedFlows.size()).isEqualTo(2);
final Pair<ExecutionReference, ExecutableFlow> fetchedFlow1 = fetchedQueuedFlows.get(0);
final Pair<ExecutionReference, ExecutableFlow> fetchedFlow2 = fetchedQueuedFlows.get(1);
@@ -200,7 +201,8 @@ public class ExecutionFlowDaoTest {
this.executionFlowDao.uploadExecutableFlow(flow);
this.assignExecutor.assignExecutor(executor.getId(), flow.getExecutionId());
- final Executor fetchExecutor = this.executorDao.fetchExecutorByExecutionId(flow.getExecutionId());
+ final Executor fetchExecutor = this.executorDao
+ .fetchExecutorByExecutionId(flow.getExecutionId());
assertThat(fetchExecutor).isEqualTo(executor);
this.assignExecutor.unassignExecutor(flow.getExecutionId());
@@ -216,13 +218,13 @@ public class ExecutionFlowDaoTest {
// Since we haven't inserted any executors, 1 should be non-existent executor id.
assertThatThrownBy(
() -> this.assignExecutor.assignExecutor(1, flow.getExecutionId()))
- .isInstanceOf(ExecutorManagerException.class)
- .hasMessageContaining("non-existent executor");
+ .isInstanceOf(ExecutorManagerException.class)
+ .hasMessageContaining("non-existent executor");
}
/* Test exception when assigning an executor to a non-existent flow execution */
@Test
- public void testAssignExecutorInvalidExecution() throws Exception{
+ public void testAssignExecutorInvalidExecution() throws Exception {
final String host = "localhost";
final int port = 12345;
final Executor executor = this.executorDao.addExecutor(host, port);
@@ -287,13 +289,15 @@ public class ExecutionFlowDaoTest {
assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isFalse();
}
- @Test @Ignore
+ @Test
+ @Ignore
// TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
// test methods as well.
public void testFetchActiveFlowsReferenceChanged() throws Exception {
}
- @Test @Ignore
+ @Test
+ @Ignore
// TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
// test methods as well.
public void testFetchActiveFlowByExecId() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
index f22a597..b4b8436 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
@@ -69,8 +69,8 @@ public class ExecutionLogsDaoTest {
public void testSmallUploadLog() throws ExecutorManagerException {
final File logDir = ExecutionsTestUtil.getFlowDir(LOG_TEST_DIR_NAME);
final File[] smalllog =
- { new File(logDir, "log1.log"), new File(logDir, "log2.log"),
- new File(logDir, "log3.log") };
+ {new File(logDir, "log1.log"), new File(logDir, "log2.log"),
+ new File(logDir, "log3.log")};
this.executionLogsDao.uploadLogFile(1, "smallFiles", 0, smalllog);
@@ -92,8 +92,8 @@ public class ExecutionLogsDaoTest {
// Multiple of 255 for Henry the Eigth
final File[] largelog =
- { new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
- new File(logDir, "largeLog3.log") };
+ {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
+ new File(logDir, "largeLog3.log")};
this.executionLogsDao.uploadLogFile(1, "largeFiles", 0, largelog);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 8152f4c..ce79de2 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -68,7 +68,7 @@ public class ExecutorManagerTest {
/* Helper method to create a ExecutorManager Instance */
private ExecutorManager createMultiExecutorManagerInstance()
- throws ExecutorManagerException {
+ throws ExecutorManagerException {
return createMultiExecutorManagerInstance(new MockExecutorLoader());
}
@@ -107,14 +107,14 @@ public class ExecutorManagerTest {
final ExecutorManager manager =
new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
final Set<Executor> activeExecutors =
- new HashSet(manager.getAllActiveExecutors());
+ new HashSet(manager.getAllActiveExecutors());
Assert.assertEquals(activeExecutors.size(), 1);
final Executor executor = activeExecutors.iterator().next();
Assert.assertEquals(executor.getHost(), "localhost");
Assert.assertEquals(executor.getPort(), 12345);
Assert.assertArrayEquals(activeExecutors.toArray(), loader
- .fetchActiveExecutors().toArray());
+ .fetchActiveExecutors().toArray());
}
/*
@@ -130,9 +130,9 @@ public class ExecutorManagerTest {
final ExecutorManager manager =
new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
final Set<Executor> activeExecutors =
- new HashSet(manager.getAllActiveExecutors());
- Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
- executor1, executor2 });
+ new HashSet(manager.getAllActiveExecutors());
+ Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[]{
+ executor1, executor2});
}
/*
@@ -146,7 +146,7 @@ public class ExecutorManagerTest {
final ExecutorManager manager =
new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
- new Executor[] { executor1 });
+ new Executor[]{executor1});
// mark older executor as inactive
executor1.setActive(false);
@@ -156,7 +156,7 @@ public class ExecutorManagerTest {
manager.setupExecutors();
Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
- new Executor[] { executor2, executor3 });
+ new Executor[]{executor2, executor3});
}
/*
@@ -171,9 +171,9 @@ public class ExecutorManagerTest {
final ExecutorManager manager =
new ExecutorManager(this.props, loader, this.alertHolder, this.commonMetrics);
final Set<Executor> activeExecutors =
- new HashSet(manager.getAllActiveExecutors());
+ new HashSet(manager.getAllActiveExecutors());
Assert.assertArrayEquals(activeExecutors.toArray(),
- new Executor[] { executor1 });
+ new Executor[]{executor1});
// mark older executor as inactive
executor1.setActive(false);
@@ -217,7 +217,7 @@ public class ExecutorManagerTest {
final List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());
final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
- loader.fetchQueuedFlows();
+ loader.fetchQueuedFlows();
Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
// Verify things are correctly setup in db
for (final Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
@@ -229,7 +229,7 @@ public class ExecutorManagerTest {
final List<Integer> managerActiveFlows = manager.getRunningFlows()
.stream().map(ExecutableFlow::getExecutionId).collect(Collectors.toList());
Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
- && testFlows.containsAll(managerActiveFlows));
+ && testFlows.containsAll(managerActiveFlows));
// Verify getQueuedFlowIds method
Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
@@ -238,11 +238,11 @@ public class ExecutorManagerTest {
/* Test submit duplicate flow when previous instance is not dispatched */
@Test(expected = ExecutorManagerException.class)
public void testDuplicateQueuedFlows() throws ExecutorManagerException,
- IOException {
+ IOException {
final ExecutorManager manager = createMultiExecutorManagerInstance();
final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
flow1.getExecutionOptions().setConcurrentOption(
- ExecutionOptions.CONCURRENT_OPTION_SKIP);
+ ExecutionOptions.CONCURRENT_OPTION_SKIP);
final User testUser = TestUtils.getTestUser();
manager.submitExecutableFlow(flow1, testUser.getUserId());
@@ -263,7 +263,7 @@ public class ExecutorManagerTest {
manager.cancelFlow(flow1, testUser.getUserId());
final ExecutableFlow fetchedFlow =
- loader.fetchExecutableFlow(flow1.getExecutionId());
+ loader.fetchExecutableFlow(flow1.getExecutionId());
Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
Assert.assertFalse(manager.getRunningFlows().contains(flow1));
@@ -282,7 +282,8 @@ public class ExecutorManagerTest {
verify(this.loader).addActiveExecutableReference(any());
}
- @Ignore @Test
+ @Ignore
+ @Test
public void testFetchAllActiveFlows() throws ExecutorManagerException, IOException {
testSetUpForRunningFlows();
final List<ExecutableFlow> flows = this.manager.getRunningFlows();
@@ -291,7 +292,8 @@ public class ExecutorManagerTest {
}
}
- @Ignore @Test
+ @Ignore
+ @Test
public void testFetchActiveFlowByProject() throws ExecutorManagerException, IOException {
testSetUpForRunningFlows();
final List<Integer> executions = this.manager.getRunningFlows(this.flow1.getProjectId(),
@@ -301,7 +303,8 @@ public class ExecutorManagerTest {
.assertTrue(this.manager.isFlowRunning(this.flow1.getProjectId(), this.flow1.getFlowId()));
}
- @Ignore @Test
+ @Ignore
+ @Test
public void testFetchActiveFlowWithExecutor() throws ExecutorManagerException, IOException {
testSetUpForRunningFlows();
final List<Pair<ExecutableFlow, Executor>> activeFlowsWithExecutor =
@@ -318,8 +321,10 @@ public class ExecutorManagerTest {
final Set<String> activeExecutorServerHosts = this.manager.getAllActiveExecutorServerHosts();
final Executor executor1 = this.manager.fetchExecutor(this.flow1.getExecutionId());
final Executor executor2 = this.manager.fetchExecutor(this.flow2.getExecutionId());
- Assert.assertTrue(activeExecutorServerHosts.contains(executor1.getHost() + ":" + executor1.getPort()));
- Assert.assertTrue(activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
+ Assert.assertTrue(
+ activeExecutorServerHosts.contains(executor1.getHost() + ":" + executor1.getPort()));
+ Assert.assertTrue(
+ activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
}
/*
diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index 5ca9993..9fb5da2 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -63,6 +63,12 @@ public class InteractiveTestJob extends AbstractProcessJob {
testJobs.clear();
}
+ public static void clearTestJobs(final String... names) {
+ for (final String name : names) {
+ assertNotNull(testJobs.remove(name));
+ }
+ }
+
@Override
public void run() throws Exception {
final String nestedFlowPath =
@@ -148,10 +154,4 @@ public class InteractiveTestJob extends AbstractProcessJob {
info("Killing job");
failJob();
}
-
- public static void clearTestJobs(final String... names) {
- for (String name : names) {
- assertNotNull(testJobs.remove(name));
- }
- }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index adadb2d..6b4db57 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -22,11 +22,11 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
@@ -413,7 +413,7 @@ public class MockExecutorLoader implements ExecutorLoader {
}
@Override
- public List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge)
+ public List<ExecutableFlow> fetchRecentlyFinishedFlows(final Duration maxAge)
throws ExecutorManagerException {
return new ArrayList<>();
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
index d3791e7..f00a807 100644
--- a/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/NumExecutionsDaoTest.java
@@ -70,7 +70,6 @@ public class NumExecutionsDaoTest {
flow1.setStatus(Status.PREPARING);
this.executionFlowDao.uploadExecutableFlow(flow1);
-
final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
flow2.setStatus(Status.RUNNING);
this.executionFlowDao.uploadExecutableFlow(flow2);
@@ -82,7 +81,8 @@ public class NumExecutionsDaoTest {
final int count = this.numExecutionsDao.fetchNumExecutableFlows();
assertThat(count).isEqualTo(3);
- final int flow2Count = this.numExecutionsDao.fetchNumExecutableFlows(1, "derived-member-data-2");
+ final int flow2Count = this.numExecutionsDao
+ .fetchNumExecutableFlows(1, "derived-member-data-2");
assertThat(flow2Count).isEqualTo(2);
}
diff --git a/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java b/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
index 4ebea5e..181b8b0 100644
--- a/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobtype/JobTypeManagerTest.java
@@ -37,8 +37,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
- * Test the flow run, especially with embedded flows. Files are in
- * unit/plugins/jobtypes
+ * Test the flow run, especially with embedded flows. Files are in unit/plugins/jobtypes
*/
public class JobTypeManagerTest {
@@ -92,8 +91,8 @@ public class JobTypeManagerTest {
}
/**
- * Tests that the proper classes were loaded and that the common and the load
- * properties are properly loaded.
+ * Tests that the proper classes were loaded and that the common and the load properties are
+ * properly loaded.
*/
@Test
public void testLoadedClasses() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index c90e149..2478868 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -155,7 +155,8 @@ public class JdbcProjectImplTest {
final Project project = this.loader.fetchProjectByName("mytestProject");
final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
- this.loader.addProjectVersion(project.getId(), newVersion, testFile, "uploadUser1", computeHash(testFile), "resourceId1");
+ this.loader.addProjectVersion(project.getId(), newVersion, testFile, "uploadUser1",
+ computeHash(testFile), "resourceId1");
final int currVersion = this.loader.getLatestProjectVersion(project);
Assert.assertEquals(currVersion, newVersion);
}
@@ -177,7 +178,7 @@ public class JdbcProjectImplTest {
final Project project = this.loader.fetchProjectByName("mytestProject");
final int newVersion = this.loader.getLatestProjectVersion(project) + 7;
this.loader.changeProjectVersion(project, newVersion, "uploadUser1");
- final Project sameProject= this.loader.fetchProjectById(project.getId());
+ final Project sameProject = this.loader.fetchProjectById(project.getId());
Assert.assertEquals(sameProject.getVersion(), newVersion);
}
@@ -185,9 +186,11 @@ public class JdbcProjectImplTest {
public void testUpdatePermission() throws Exception {
createThreeProjects();
final Project project = this.loader.fetchProjectByName("mytestProject");
- this.loader.updatePermission(project, project.getLastModifiedUser(), new Permission(Permission.Type.ADMIN), false);
+ this.loader.updatePermission(project, project.getLastModifiedUser(),
+ new Permission(Permission.Type.ADMIN), false);
- final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader.getProjectPermissions(project);
+ final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader
+ .getProjectPermissions(project);
Assert.assertEquals(permissionsTriple.size(), 1);
Assert.assertEquals(permissionsTriple.get(0).getFirst(), "testUser1");
Assert.assertEquals(permissionsTriple.get(0).getThird().toString(), "ADMIN");
@@ -208,9 +211,11 @@ public class JdbcProjectImplTest {
public void testRemovePermission() throws Exception {
createThreeProjects();
final Project project = this.loader.fetchProjectByName("mytestProject");
- this.loader.updatePermission(project, project.getLastModifiedUser(), new Permission(Permission.Type.ADMIN), false);
+ this.loader.updatePermission(project, project.getLastModifiedUser(),
+ new Permission(Permission.Type.ADMIN), false);
this.loader.removePermission(project, project.getLastModifiedUser(), false);
- final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader.getProjectPermissions(project);
+ final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader
+ .getProjectPermissions(project);
Assert.assertEquals(permissionsTriple.size(), 0);
}
@@ -313,7 +318,8 @@ public class JdbcProjectImplTest {
final Project project = this.loader.fetchProjectByName("mytestProject");
this.loader.uploadProjectProperties(project, list);
- final Map<String, Props> propsMap = this.loader.fetchProjectProperties(project.getId(), project.getVersion());
+ final Map<String, Props> propsMap = this.loader
+ .fetchProjectProperties(project.getId(), project.getVersion());
Assert.assertEquals(propsMap.get("source1").get("key2"), "value2");
Assert.assertEquals(propsMap.get("source2").get("keyaaa"), "valueaaa");
}
@@ -329,7 +335,7 @@ public class JdbcProjectImplTest {
final ProjectFileHandler fileHandler = this.loader.getUploadedFile(project.getId(), newVersion);
Assert.assertEquals(fileHandler.getNumChunks(), 1);
- this.loader.cleanOlderProjectVersion(project.getId(), newVersion+1);
+ this.loader.cleanOlderProjectVersion(project.getId(), newVersion + 1);
final ProjectFileHandler fileHandler2 = this.loader
.fetchProjectMetaData(project.getId(), newVersion);
diff --git a/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java b/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
index fc1da74..b5a3baa 100644
--- a/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/validator/XmlValidatorManagerTest.java
@@ -12,8 +12,8 @@ public class XmlValidatorManagerTest {
private final Props baseProps = new Props();
/**
- * Test that if the validator directory does not exist, XmlValidatorManager
- * should still load the default validator.
+ * Test that if the validator directory does not exist, XmlValidatorManager should still load the
+ * default validator.
*/
@Test
public void testNoValidatorsDir() {
diff --git a/azkaban-common/src/test/java/azkaban/test/Utils.java b/azkaban-common/src/test/java/azkaban/test/Utils.java
index 2dac04e..b19248f 100644
--- a/azkaban-common/src/test/java/azkaban/test/Utils.java
+++ b/azkaban-common/src/test/java/azkaban/test/Utils.java
@@ -30,7 +30,7 @@ public class Utils {
SERVICE_PROVIDER.setInjector(injector);
}
- public static DatabaseOperator initTestDB() throws Exception{
+ public static DatabaseOperator initTestDB() throws Exception {
final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
diff --git a/azkaban-common/src/test/java/azkaban/trigger/BasicTimeCheckerTest.java b/azkaban-common/src/test/java/azkaban/trigger/BasicTimeCheckerTest.java
index 4ba6e32..904a558 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/BasicTimeCheckerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/BasicTimeCheckerTest.java
@@ -43,8 +43,8 @@ public class BasicTimeCheckerTest {
/**
- * This test manipulates global states (time) in org.joda.time.DateTimeUtils . Thus this test
- * can run in parallel with tests that do the same.
+ * This test manipulates global states (time) in org.joda.time.DateTimeUtils. Thus this test
+ * cannot run in parallel with tests that do the same.
*/
@Test
public void periodTimerTest() {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index 5d3dc22..d45992a 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -16,7 +16,6 @@
package azkaban.trigger;
-import azkaban.utils.AbstractMailerTest;
import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
@@ -25,6 +24,7 @@ import azkaban.executor.MockExecutorLoader;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.utils.AbstractMailerTest;
import azkaban.utils.Emailer;
import azkaban.utils.Props;
import com.codahale.metrics.MetricRegistry;
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index 8fbcbc3..de874f6 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -122,7 +122,7 @@ public class TriggerManagerTest {
assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
}
- private void sleep (final long millis) {
+ private void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException e) {
@@ -135,7 +135,7 @@ public class TriggerManagerTest {
final Map<String, ConditionChecker> expireCheckers = new HashMap<>();
final ConditionChecker triggerChecker = new ThresholdChecker(ThresholdChecker.type, threshold);
final ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeCheck_1", 111L,
- DateTimeZone.UTC, 2536871155000L,false, false,
+ DateTimeZone.UTC, 2536871155000L, false, false,
null, null);
triggerCheckers.put(triggerChecker.getId(), triggerChecker);
expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
@@ -169,7 +169,7 @@ public class TriggerManagerTest {
// End time is 3 seconds past now.
final ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeChecker_1", 111L,
- DateTimeZone.UTC, currMillis + 3000L,false, false,
+ DateTimeZone.UTC, currMillis + 3000L, false, false,
null, null);
triggerCheckers.put(triggerChecker.getId(), triggerChecker);
expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
@@ -199,6 +199,7 @@ public class TriggerManagerTest {
}
public static class MockTriggerLoader implements TriggerLoader {
+
private final Map<Integer, Trigger> triggers = new HashMap<>();
private int idIndex = 0;
diff --git a/azkaban-common/src/test/java/azkaban/user/PermissionTest.java b/azkaban-common/src/test/java/azkaban/user/PermissionTest.java
index 0ee4cab..8b55538 100644
--- a/azkaban-common/src/test/java/azkaban/user/PermissionTest.java
+++ b/azkaban-common/src/test/java/azkaban/user/PermissionTest.java
@@ -112,8 +112,8 @@ public class PermissionTest {
}
/**
- * Verify that the binary bit for UPLOADPROJECTS is not turned on
- * by setting the other permissions.
+ * Verify that the binary bit for UPLOADPROJECTS is not turned on by setting the other
+ * permissions.
*/
@Test
public void testUploadProjectFlag() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/user/UserUtilsTest.java b/azkaban-common/src/test/java/azkaban/user/UserUtilsTest.java
index c087a88..002925a 100644
--- a/azkaban-common/src/test/java/azkaban/user/UserUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/user/UserUtilsTest.java
@@ -8,6 +8,7 @@ import org.junit.Test;
public class UserUtilsTest {
+
@Test
public void testAdminUserCanUploadProject() throws UserManagerException {
final UserManager userManager = TestUtils.createTestXmlUserManager();
diff --git a/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
index 371b670..8ff3e27 100644
--- a/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/ExternalLinkUtilsTest.java
@@ -59,8 +59,8 @@ public class ExternalLinkUtilsTest {
}
/**
- * Test validates the happy path when an external analyzer is configured
- * with '${url}' as the format in 'azkaban.properties'.
+ * Test validates the happy path when an external analyzer is configured with '${url}' as the
+ * format in 'azkaban.properties'.
*/
@Test
public void testGetExternalAnalyzerValidFormat() {
@@ -80,8 +80,8 @@ public class ExternalLinkUtilsTest {
}
/**
- * Test validates the happy path when an log viewer is configured
- * with '${execid}' and '${jobid} as the format in 'azkaban.properties'.
+ * Test validates the happy path when a log viewer is configured with '${execid}' and '${jobid}'
+ * as the format in 'azkaban.properties'.
*/
@Test
public void testGetExternalLogViewerValidFormat() {
@@ -98,8 +98,8 @@ public class ExternalLinkUtilsTest {
}
/**
- * Test validates the condition when an external analyzer is not configured
- * in 'azkaban.properties'.
+ * Test validates the condition when an external analyzer is not configured in
+ * 'azkaban.properties'.
*/
@Test
public void testGetExternalAnalyzerNotConfigured() {
@@ -109,8 +109,8 @@ public class ExternalLinkUtilsTest {
}
/**
- * Test validates the condition when an external log viewer is not configured
- * in 'azkaban.properties'.
+ * Test validates the condition when an external log viewer is not configured in
+ * 'azkaban.properties'.
*/
@Test
public void testGetLogViewerNotConfigured() {
diff --git a/azkaban-common/src/test/java/azkaban/utils/PatternLayoutEscapedTest.java b/azkaban-common/src/test/java/azkaban/utils/PatternLayoutEscapedTest.java
index e854e21..68c2056 100644
--- a/azkaban-common/src/test/java/azkaban/utils/PatternLayoutEscapedTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/PatternLayoutEscapedTest.java
@@ -26,9 +26,9 @@ import org.junit.Before;
import org.junit.Test;
/**
- * Test output of PatternLayoutEscapedTest
- * It should be appending stack traces, escaping new lines, quotes, tabs and backslashes
- * This is necessary when we are logging these messages out as JSON objects
+ * Test output of PatternLayoutEscaped. It should append stack traces and escape new lines,
+ * quotes, tabs and backslashes. This is necessary when we are logging these messages out as JSON
+ * objects.
*/
public class PatternLayoutEscapedTest {
diff --git a/azkaban-common/src/test/resources/sql/create.execution_flows.sql b/azkaban-common/src/test/resources/sql/create.execution_flows.sql
index b2f7625..043ed90 100644
--- a/azkaban-common/src/test/resources/sql/create.execution_flows.sql
+++ b/azkaban-common/src/test/resources/sql/create.execution_flows.sql
@@ -1,20 +1,24 @@
CREATE TABLE execution_flows (
- exec_id INT NOT NULL AUTO_INCREMENT,
- project_id INT NOT NULL,
- version INT NOT NULL,
- flow_id VARCHAR(128) NOT NULL,
- status TINYINT,
- submit_user VARCHAR(64),
- submit_time BIGINT,
- update_time BIGINT,
- start_time BIGINT,
- end_time BIGINT,
- enc_type TINYINT,
- flow_data LONGBLOB,
- PRIMARY KEY (exec_id)
+ exec_id INT NOT NULL AUTO_INCREMENT,
+ project_id INT NOT NULL,
+ version INT NOT NULL,
+ flow_id VARCHAR(128) NOT NULL,
+ status TINYINT,
+ submit_user VARCHAR(64),
+ submit_time BIGINT,
+ update_time BIGINT,
+ start_time BIGINT,
+ end_time BIGINT,
+ enc_type TINYINT,
+ flow_data LONGBLOB,
+ PRIMARY KEY (exec_id)
);
-CREATE INDEX ex_flows_start_time ON execution_flows(start_time);
-CREATE INDEX ex_flows_end_time ON execution_flows(end_time);
-CREATE INDEX ex_flows_time_range ON execution_flows(start_time, end_time);
-CREATE INDEX ex_flows_flows ON execution_flows(project_id, flow_id);
+CREATE INDEX ex_flows_start_time
+ ON execution_flows (start_time);
+CREATE INDEX ex_flows_end_time
+ ON execution_flows (end_time);
+CREATE INDEX ex_flows_time_range
+ ON execution_flows (start_time, end_time);
+CREATE INDEX ex_flows_flows
+ ON execution_flows (project_id, flow_id);
diff --git a/azkaban-common/src/test/resources/sql/create.execution_jobs.sql b/azkaban-common/src/test/resources/sql/create.execution_jobs.sql
index a62d3a9..d2379f2 100644
--- a/azkaban-common/src/test/resources/sql/create.execution_jobs.sql
+++ b/azkaban-common/src/test/resources/sql/create.execution_jobs.sql
@@ -1,19 +1,22 @@
CREATE TABLE execution_jobs (
- exec_id INT NOT NULL,
- project_id INT NOT NULL,
- version INT NOT NULL,
- flow_id VARCHAR(128) NOT NULL,
- job_id VARCHAR(128) NOT NULL,
- attempt INT,
- start_time BIGINT,
- end_time BIGINT,
- status TINYINT,
- input_params LONGBLOB,
- output_params LONGBLOB,
- attachments LONGBLOB,
- PRIMARY KEY (exec_id, job_id, attempt)
+ exec_id INT NOT NULL,
+ project_id INT NOT NULL,
+ version INT NOT NULL,
+ flow_id VARCHAR(128) NOT NULL,
+ job_id VARCHAR(128) NOT NULL,
+ attempt INT,
+ start_time BIGINT,
+ end_time BIGINT,
+ status TINYINT,
+ input_params LONGBLOB,
+ output_params LONGBLOB,
+ attachments LONGBLOB,
+ PRIMARY KEY (exec_id, job_id, attempt)
);
-CREATE INDEX exec_job ON execution_jobs(exec_id, job_id);
-CREATE INDEX exec_id ON execution_jobs(exec_id);
-CREATE INDEX ex_job_id ON execution_jobs(project_id, job_id);
+CREATE INDEX exec_job
+ ON execution_jobs (exec_id, job_id);
+CREATE INDEX exec_id
+ ON execution_jobs (exec_id);
+CREATE INDEX ex_job_id
+ ON execution_jobs (project_id, job_id);
diff --git a/azkaban-common/src/test/resources/sql/create.execution_logs.sql b/azkaban-common/src/test/resources/sql/create.execution_logs.sql
index 0aa6a36..80a9777 100644
--- a/azkaban-common/src/test/resources/sql/create.execution_logs.sql
+++ b/azkaban-common/src/test/resources/sql/create.execution_logs.sql
@@ -1,14 +1,16 @@
CREATE TABLE execution_logs (
- exec_id INT NOT NULL,
- name VARCHAR(128),
- attempt INT,
- enc_type TINYINT,
- start_byte INT,
- end_byte INT,
- log LONGBLOB,
- upload_time BIGINT,
- PRIMARY KEY (exec_id, name, attempt, start_byte)
+ exec_id INT NOT NULL,
+ name VARCHAR(128),
+ attempt INT,
+ enc_type TINYINT,
+ start_byte INT,
+ end_byte INT,
+ log LONGBLOB,
+ upload_time BIGINT,
+ PRIMARY KEY (exec_id, name, attempt, start_byte)
);
-CREATE INDEX ex_log_attempt ON execution_logs(exec_id, name, attempt);
-CREATE INDEX ex_log_index ON execution_logs(exec_id, name);
\ No newline at end of file
+CREATE INDEX ex_log_attempt
+ ON execution_logs (exec_id, name, attempt);
+CREATE INDEX ex_log_index
+ ON execution_logs (exec_id, name);
diff --git a/azkaban-common/src/test/resources/sql/create.project_events.sql b/azkaban-common/src/test/resources/sql/create.project_events.sql
index dd24d5f..dda35a8 100644
--- a/azkaban-common/src/test/resources/sql/create.project_events.sql
+++ b/azkaban-common/src/test/resources/sql/create.project_events.sql
@@ -1,9 +1,10 @@
CREATE TABLE project_events (
- project_id INT NOT NULL,
- event_type TINYINT NOT NULL,
- event_time BIGINT NOT NULL,
- username VARCHAR(64),
- message VARCHAR(512)
+ project_id INT NOT NULL,
+ event_type TINYINT NOT NULL,
+ event_time BIGINT NOT NULL,
+ username VARCHAR(64),
+ message VARCHAR(512)
);
-CREATE INDEX log ON project_events(project_id, event_time);
+CREATE INDEX log
+ ON project_events (project_id, event_time);
diff --git a/azkaban-common/src/test/resources/sql/create.properties.sql b/azkaban-common/src/test/resources/sql/create.properties.sql
index aaa37ec..27694fd 100644
--- a/azkaban-common/src/test/resources/sql/create.properties.sql
+++ b/azkaban-common/src/test/resources/sql/create.properties.sql
@@ -1,7 +1,7 @@
CREATE TABLE properties (
- name VARCHAR(64) NOT NULL,
- type INT NOT NULL,
- modified_time BIGINT NOT NULL,
- value VARCHAR(256),
- PRIMARY KEY (name, type)
-);
\ No newline at end of file
+ name VARCHAR(64) NOT NULL,
+ type INT NOT NULL,
+ modified_time BIGINT NOT NULL,
+ value VARCHAR(256),
+ PRIMARY KEY (name, type)
+);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_jobs.2.1.sql b/azkaban-common/src/test/resources/sql/update.execution_jobs.2.1.sql
index b7313ad..5f7cf07 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_jobs.2.1.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_jobs.2.1.sql
@@ -1,4 +1,8 @@
-ALTER TABLE execution_jobs ADD COLUMN attempt INT DEFAULT 0;
-ALTER TABLE execution_jobs DROP PRIMARY KEY;
-ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
-ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
+ALTER TABLE execution_jobs
+ ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_jobs
+ DROP PRIMARY KEY;
+ALTER TABLE execution_jobs
+ ADD PRIMARY KEY (exec_id, job_id, attempt);
+ALTER TABLE execution_jobs
+ ADD INDEX exec_job (exec_id, job_id);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_jobs.2.3.sql b/azkaban-common/src/test/resources/sql/update.execution_jobs.2.3.sql
index 8c43495..0da2610 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_jobs.2.3.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_jobs.2.3.sql
@@ -1 +1 @@
-INSERT INTO properties (name,value,modified_time,type) VALUES ('test4', 'value1', 0, 99);
\ No newline at end of file
+INSERT INTO properties (name, value, modified_time, type) VALUES ('test4', 'value1', 0, 99);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_logs.2.1.sql b/azkaban-common/src/test/resources/sql/update.execution_logs.2.1.sql
index 5c2dc0b..61ab60c 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_logs.2.1.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_logs.2.1.sql
@@ -1,7 +1,14 @@
-ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
-ALTER TABLE execution_logs ADD COLUMN upload_time BIGINT DEFAULT 1420099200000;
-UPDATE execution_logs SET upload_time=(UNIX_TIMESTAMP()*1000) WHERE upload_time=1420099200000;
+ALTER TABLE execution_logs
+ ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs
+ ADD COLUMN upload_time BIGINT DEFAULT 1420099200000;
+UPDATE execution_logs
+SET upload_time = (UNIX_TIMESTAMP() * 1000)
+WHERE upload_time = 1420099200000;
-ALTER TABLE execution_logs DROP PRIMARY KEY;
-ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt, start_byte);
-ALTER TABLE execution_logs ADD INDEX ex_log_attempt (exec_id, name, attempt)
+ALTER TABLE execution_logs
+ DROP PRIMARY KEY;
+ALTER TABLE execution_logs
+ ADD PRIMARY KEY (exec_id, name, attempt, start_byte);
+ALTER TABLE execution_logs
+  ADD INDEX ex_log_attempt (exec_id, name, attempt);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_logs.2.4.sql b/azkaban-common/src/test/resources/sql/update.execution_logs.2.4.sql
index f0a7aae..b9e1530 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_logs.2.4.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_logs.2.4.sql
@@ -1 +1 @@
-INSERT INTO properties (name,value,modified_time,type) VALUES ('test', 'value1', 0, 99);
\ No newline at end of file
+INSERT INTO properties (name, value, modified_time, type) VALUES ('test', 'value1', 0, 99);
diff --git a/azkaban-common/src/test/resources/sql/update.execution_logs.2.7.sql b/azkaban-common/src/test/resources/sql/update.execution_logs.2.7.sql
index 9974cc7..a474b5f 100644
--- a/azkaban-common/src/test/resources/sql/update.execution_logs.2.7.sql
+++ b/azkaban-common/src/test/resources/sql/update.execution_logs.2.7.sql
@@ -1 +1 @@
-INSERT INTO properties (name,value,modified_time,type) VALUES ('test1','value1', 0, 99);
\ No newline at end of file
+INSERT INTO properties (name, value, modified_time, type) VALUES ('test1', 'value1', 0, 99);
diff --git a/azkaban-common/src/test/resources/sql/update.project_events.2.1.sql b/azkaban-common/src/test/resources/sql/update.project_events.2.1.sql
index 14d7554..e02595c 100644
--- a/azkaban-common/src/test/resources/sql/update.project_events.2.1.sql
+++ b/azkaban-common/src/test/resources/sql/update.project_events.2.1.sql
@@ -1 +1,2 @@
-ALTER TABLE project_events MODIFY COLUMN message VARCHAR(512);
+ALTER TABLE project_events
+ MODIFY COLUMN message VARCHAR(512);