azkaban-developers
Changes
src/java/azkaban/execapp/FlowRunner.java 12(+10 -2)
src/java/azkaban/execapp/JobRunner.java 23(+18 -5)
Details
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 50be4f6..a6ee307 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -110,12 +110,6 @@ public class AzkabanExecutorServer {
configureMBeanServer();
- File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
- if (!statsDir.exists()) {
- statsDir.mkdir();
- }
- props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
try {
server.start();
}
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 277374e..60ad9b5 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.servlet.ServletConfig;
@@ -98,8 +99,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
else if (action.equals(LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
}
- else if (action.equals(STATS_ACTION)) {
- handleFetchStatsEvent(execid, req, resp, respMap);
+ else if (action.equals(ATTACHMENT_ACTION)) {
+ handleFetchAttachmentEvent(execid, req, resp, respMap);
}
else if (action.equals(EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
@@ -206,16 +207,18 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
- private void handleFetchStatsEvent(
+ private void handleFetchAttachmentEvent(
int execId,
HttpServletRequest req,
HttpServletResponse resp,
Map<String, Object> respMap) throws ServletException {
String jobId = getParam(req, "jobId");
+ int attempt = getIntParam(req, "attempt", 0);
try {
- String result = flowRunnerManager.readJobAttachment(execId, jobId);
- respMap.put("stats", result);
+ List<Object> result = flowRunnerManager.readJobAttachment(
+ execId, jobId, attempt);
+ respMap.put("attachment", result);
}
catch (Exception e) {
logger.error(e);
src/java/azkaban/execapp/FlowRunner.java 12(+10 -2)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 993d1b4..db33b46 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -839,9 +839,17 @@ public class FlowRunner extends EventHandler implements Runnable {
return logFile;
}
- public String getJobAttachment(String jobId) {
+ public File getJobAttachmentFile(String jobId, int attempt) {
ExecutableNode node = flow.getExecutableNode(jobId);
- return node.getAttachment();
+ File path = new File(execDir, node.getJobSource());
+
+ String attachmentFileName =
+ JobRunner.createAttachmentFileName(execId, jobId, attempt);
+ File attachmentFile = new File(path.getParentFile(), attachmentFileName);
+ if (!attachmentFile.exists()) {
+ return null;
+ }
+ return attachmentFile;
}
public File getJobMetaDataFile(String jobId, int attempt) {
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 9a1c44f..3c19e31 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -554,7 +556,7 @@ public class FlowRunnerManager implements EventListener {
File dir = runner.getExecutionDir();
if (dir != null && dir.exists()) {
try {
- synchronized(executionDirDeletionSync) {
+ synchronized (executionDirDeletionSync) {
if (!dir.exists()) {
throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
}
@@ -574,14 +576,38 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
- public String readJobAttachment(int execId, String jobId)
+ public List<Object> readJobAttachment(int execId, String jobId, int attempt)
throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
- throw new ExecutorManagerException("Running flow " + execId + " not found.");
+ throw new ExecutorManagerException(
+ "Running flow " + execId + " not found.");
}
- return runner.getJobAttachment(jobId);
+ File dir = runner.getExecutionDir();
+ if (dir == null || !dir.exists()) {
+ throw new ExecutorManagerException(
+ "Error reading file. Log directory doesn't exist.");
+ }
+
+ try {
+ synchronized (executionDirDeletionSync) {
+ if (!dir.exists()) {
+ throw new ExecutorManagerException(
+ "Execution dir file doesn't exist. Probably has beend deleted");
+ }
+
+ File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
+ if (attachmentFile == null || !attachmentFile.exists()) {
+ throw new ExecutorManagerException(
+ "Job attachment file doesn't exist.");
+ }
+ return (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);
+ }
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
}
public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
src/java/azkaban/execapp/JobRunner.java 23(+18 -5)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 99bd8ab..ff0c707 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -588,11 +588,24 @@ public class JobRunner extends EventHandler implements Runnable {
return props.getLong("retry.backoff", 0);
}
- public static String createLogFileName(int executionId, String jobId, int attempt) {
- return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
+ public static String createAttachmentFileName(
+ int executionId, String jobId, int attempt) {
+ return attempt > 0
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".attach"
+ : "_job." + executionId + "." + jobId + ".attach";
}
-
- public static String createMetaDataFileName(int executionId, String jobId, int attempt) {
- return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
+
+ public static String createLogFileName(
+ int executionId, String jobId, int attempt) {
+ return attempt > 0
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".log"
+ : "_job." + executionId + "." + jobId + ".log";
+ }
+
+ public static String createMetaDataFileName(
+ int executionId, String jobId, int attempt) {
+ return attempt > 0
+ ? "_job." + executionId + "." + attempt + "." + jobId + ".meta"
+ : "_job." + executionId + "." + jobId + ".meta";
}
}
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 40dd21a..f228031 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,7 +30,7 @@ public interface ConnectorParams {
public static final String RESUME_ACTION = "resume";
public static final String PING_ACTION = "ping";
public static final String LOG_ACTION = "log";
- public static final String STATS_ACTION = "stats";
+ public static final String ATTACHMENT_ACTION = "attachment";
public static final String METADATA_ACTION = "metadata";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 187d969..5fbeed2 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -62,8 +62,6 @@ public class ExecutableNode {
private Props inputProps;
private Props outputProps;
- private String attachment;
-
public static final String ATTEMPT_PARAM = "attempt";
public static final String PASTATTEMPTS_PARAM = "pastAttempts";
@@ -207,14 +205,6 @@ public class ExecutableNode {
return outputProps;
}
- public String getAttachment() {
- return attachment;
- }
-
- public void setAttachment(String attachment) {
- this.attachment = attachment;
- }
-
public long getDelayedExecution() {
return delayExecution;
}
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index bb39046..c59def3 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,7 +47,7 @@ public interface ExecutorLoader {
public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
- public String fetchStats(int execId, String name) throws ExecutorManagerException;
+ public List<Object> fetchAttachment(int execId, String name, int attempt) throws ExecutorManagerException;
public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 7b86331..37b858e 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -305,22 +305,26 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
}
@Override
- public String getExecutionJobStats(ExecutableFlow exFlow, String jobId)
+ public List<Object> getExecutionJobStats(
+ ExecutableFlow exFlow, String jobId, int attempt)
throws ExecutorManagerException {
Pair<ExecutionReference, ExecutableFlow> pair =
runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
- return executorLoader.fetchStats(exFlow.getExecutionId(), jobId);
+ return executorLoader.fetchAttachment(
+ exFlow.getExecutionId(), jobId, attempt);
}
Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
Map<String, Object> result = callExecutorServer(
pair.getFirst(),
- ConnectorParams.STATS_ACTION,
- jobIdParam);
- return (String) result.get("stats");
+ ConnectorParams.ATTACHMENT_ACTION,
+ jobIdParam,
+ attemptParam);
+ return (List<Object>) result.get("attachment");
}
@Override
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index e40c97b..af64ec4 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -91,7 +91,7 @@ public interface ExecutorManagerAdapter{
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
- public String getExecutionJobStats(ExecutableFlow exflow, String jobId) throws ExecutorManagerException;
+ public List<Object> getExecutionJobStats(ExecutableFlow exflow, String jobId, int attempt) throws ExecutorManagerException;
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 3fc12db..0a3d0b5 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -709,20 +709,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
}
@Override
- public String fetchStats(int execId, String jobId)
+ public List<Object> fetchAttachment(int execId, String jobId, int attempt)
throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
try {
- String stats = runner.query(
- FetchExecutableJobStatsHandler.FETCH_ATTACHMENT_EXECUTABLE_NODE,
- new FetchExecutableJobStatsHandler(),
+ String attachment = runner.query(
+ FetchExecutableJobAttachmentHandler.FETCH_ATTACHMENT_EXECUTABLE_NODE,
+ new FetchExecutableJobAttachmentHandler(),
execId,
jobId);
- return stats;
+ return (List<Object>) JSONUtils.parseJSONFromString(attachment);
}
+ catch (IOException e) {
+ throw new ExecutorManagerException(
+ "Error converting job attachment to JSON " + jobId, e);
+ }
catch (SQLException e) {
- throw new ExecutorManagerException("Error query job stats " + jobId, e);
+ throw new ExecutorManagerException(
+ "Error query job attachment " + jobId, e);
}
}
@@ -996,7 +1001,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
}
}
- private static class FetchExecutableJobStatsHandler
+ private static class FetchExecutableJobAttachmentHandler
implements ResultSetHandler<String> {
private static String FETCH_ATTACHMENT_EXECUTABLE_NODE =
"SELECT attachment FROM execution_jobs WHERE exec_id=? AND job_id=?";
@@ -1004,17 +1009,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
@SuppressWarnings("unchecked")
@Override
public String handle(ResultSet rs) throws SQLException {
- String statsJson = "";
+ String attachmentJson = "";
if (rs.next()) {
try {
- byte[] stats = rs.getBytes(1);
- statsJson = GZIPUtils.unGzipString(stats, "UTF-8");
+ byte[] attachment = rs.getBytes(1);
+ attachmentJson = GZIPUtils.unGzipString(attachment, "UTF-8");
}
catch (IOException e) {
- throw new SQLException("Error decoding job stats", e);
+ throw new SQLException("Error decoding job attachment", e);
}
}
- return statsJson;
+ return attachmentJson;
}
}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 52e8d7c..25a81c7 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -197,12 +197,6 @@ public class AzkabanWebServer extends AzkabanServer {
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
- File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
- if (!statsDir.exists()) {
- statsDir.mkdir();
- }
- props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
// Setup time zone
if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
String timezone = props.getString(DEFAULT_TIMEZONE_ID);
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 9781043..f330d53 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -58,8 +58,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private ScheduleManager scheduleManager;
private ExecutorVelocityHelper velocityHelper;
- private String statsDir;
-
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
@@ -68,7 +66,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
velocityHelper = new ExecutorVelocityHelper();
- statsDir = server.getServerProps().getString("azkaban.stats.dir");
}
@Override
@@ -476,19 +473,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return;
}
- String statsJson = executorManager.getExecutionJobStats(exFlow, jobId);
- List<Object> jsonObj =
- (ArrayList<Object>) JSONUtils.parseJSONFromString(statsJson);
+ int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+ List<Object> jsonObj = executorManager.getExecutionJobStats(
+ exFlow, jobId, attempt);
ret.put("jobStats", jsonObj);
}
catch (ExecutorManagerException e) {
ret.put("error", "Error retrieving stats for job " + jobId);
return;
}
- catch (IOException e) {
- ret.put("error", "Cannot write JSON for stats for job " + jobId);
- return;
- }
}
private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index 585e208..b54b0f5 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -8,8 +8,6 @@ azkaban.jobtype.plugin.dir=plugins/jobtypes
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
-azkaban.stats.dir=stats
-
database.type=mysql
mysql.port=3306
mysql.host=localhost
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 9ca591e..7524a14 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -20,9 +20,6 @@ database.type=h2
h2.path=data/azkaban
h2.create.tables=true
-# Stats
-azkaban.stats.dir=stats
-
# Velocity dev mode
velocity.dev.mode=false
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 97e40ae..3ccb2f3 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -22,8 +22,6 @@ mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
-azkaban.stats.dir=stats
-
# Velocity dev mode
velocity.dev.mode=false