azkaban-aplcache

Remove code for stats.dir property. Plumb retrieving job attachment

1/16/2014 7:58:07 AM

Details

diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 50be4f6..a6ee307 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -110,12 +110,6 @@ public class AzkabanExecutorServer {
 		
 		configureMBeanServer();
 
-    File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
-    if (!statsDir.exists()) {
-      statsDir.mkdir();
-    }
-    props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-		
 		try {
 			server.start();
 		} 
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 277374e..60ad9b5 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import javax.servlet.ServletConfig;
@@ -98,8 +99,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 					else if (action.equals(LOG_ACTION)) { 
 						handleFetchLogEvent(execid, req, resp, respMap);
 					}
-					else if (action.equals(STATS_ACTION)) { 
-						handleFetchStatsEvent(execid, req, resp, respMap);
+					else if (action.equals(ATTACHMENT_ACTION)) { 
+						handleFetchAttachmentEvent(execid, req, resp, respMap);
 					}
 					else if (action.equals(EXECUTE_ACTION)) {
 						handleAjaxExecute(req, respMap, execid);
@@ -206,16 +207,18 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 
-	private void handleFetchStatsEvent(
+	private void handleFetchAttachmentEvent(
 			int execId, 
 			HttpServletRequest req, 
 			HttpServletResponse resp, 
 			Map<String, Object> respMap) throws ServletException {
 
 		String jobId = getParam(req, "jobId");
+    int attempt = getIntParam(req, "attempt", 0);
 		try {
-			String result = flowRunnerManager.readJobAttachment(execId, jobId);
-			respMap.put("stats", result);
+			List<Object> result = flowRunnerManager.readJobAttachment(
+          execId, jobId, attempt);
+			respMap.put("attachment", result);
 		}
 		catch (Exception e) {
 			logger.error(e);
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 993d1b4..db33b46 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -839,9 +839,17 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return logFile;
 	}
 
-	public String getJobAttachment(String jobId) {
+	public File getJobAttachmentFile(String jobId, int attempt) {
 		ExecutableNode node = flow.getExecutableNode(jobId);
-		return node.getAttachment();
+    File path = new File(execDir, node.getJobSource());
+
+    String attachmentFileName =
+        JobRunner.createAttachmentFileName(execId, jobId, attempt);
+    File attachmentFile = new File(path.getParentFile(), attachmentFileName);
+    if (!attachmentFile.exists()) {
+      return null;
+    }
+    return attachmentFile;
 	}
 	
 	public File getJobMetaDataFile(String jobId, int attempt) {
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 9a1c44f..3c19e31 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
@@ -554,7 +556,7 @@ public class FlowRunnerManager implements EventListener {
 		File dir = runner.getExecutionDir();
 		if (dir != null && dir.exists()) {
 			try {
-				synchronized(executionDirDeletionSync) {
+				synchronized (executionDirDeletionSync) {
 					if (!dir.exists()) {
 						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
 					}
@@ -574,14 +576,38 @@ public class FlowRunnerManager implements EventListener {
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
 
-	public String readJobAttachment(int execId, String jobId)
+	public List<Object> readJobAttachment(int execId, String jobId, int attempt)
 			throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(execId);
 		if (runner == null) {
-			throw new ExecutorManagerException("Running flow " + execId + " not found.");
+			throw new ExecutorManagerException(
+          "Running flow " + execId + " not found.");
 		}
 
-		return runner.getJobAttachment(jobId);
+    File dir = runner.getExecutionDir();
+    if (dir == null || !dir.exists()) {
+      throw new ExecutorManagerException(
+          "Error reading file. Log directory doesn't exist.");
+    }
+
+    try {
+      synchronized (executionDirDeletionSync) {
+        if (!dir.exists()) {
+					throw new ExecutorManagerException(
+              "Execution dir file doesn't exist. Probably has beend deleted");
+        }
+
+        File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
+        if (attachmentFile == null || !attachmentFile.exists()) {
+          throw new ExecutorManagerException(
+              "Job attachment file doesn't exist.");
+        }
+        return (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);
+      }
+    }
+    catch (IOException e) {
+      throw new ExecutorManagerException(e);
+    }
 	}
 	
 	public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 99bd8ab..ff0c707 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -588,11 +588,24 @@ public class JobRunner extends EventHandler implements Runnable {
 		return props.getLong("retry.backoff", 0);
 	}
 	
-	public static String createLogFileName(int executionId, String jobId, int attempt) {
-		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
+	public static String createAttachmentFileName(
+      int executionId, String jobId, int attempt) {
+		return attempt > 0 
+        ? "_job." + executionId + "." + attempt + "." + jobId + ".attach" 
+        : "_job." + executionId + "." + jobId + ".attach";
 	}
-	
-	public static String createMetaDataFileName(int executionId, String jobId, int attempt) {
-		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
+
+	public static String createLogFileName(
+      int executionId, String jobId, int attempt) {
+		return attempt > 0 
+        ? "_job." + executionId + "." + attempt + "." + jobId + ".log" 
+        : "_job." + executionId + "." + jobId + ".log";
+	}
+	
+	public static String createMetaDataFileName(
+      int executionId, String jobId, int attempt) {
+		return attempt > 0 
+        ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" 
+        : "_job." + executionId + "." + jobId + ".meta";
 	}
 }
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 40dd21a..f228031 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,7 +30,7 @@ public interface ConnectorParams {
 	public static final String RESUME_ACTION = "resume";
 	public static final String PING_ACTION = "ping";
 	public static final String LOG_ACTION = "log";
-	public static final String STATS_ACTION = "stats";
+	public static final String ATTACHMENT_ACTION = "attachment";
 	public static final String METADATA_ACTION = "metadata";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 187d969..5fbeed2 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -62,8 +62,6 @@ public class ExecutableNode {
 	private Props inputProps;
 	private Props outputProps;
 
-	private String attachment;
-	
 	public static final String ATTEMPT_PARAM = "attempt";
 	public static final String PASTATTEMPTS_PARAM = "pastAttempts";
 	
@@ -207,14 +205,6 @@ public class ExecutableNode {
 		return outputProps;
 	}
 
-	public String getAttachment() {
-		return attachment;
-	}
-
-	public void setAttachment(String attachment) {
-		this.attachment = attachment;
-	}
-	
 	public long getDelayedExecution() {
 		return delayExecution;
 	}
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index bb39046..c59def3 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,7 +47,7 @@ public interface ExecutorLoader {
 
 	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
 
-	public String fetchStats(int execId, String name) throws ExecutorManagerException;
+	public List<Object> fetchAttachment(int execId, String name, int attempt) throws ExecutorManagerException;
 
 	public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
 
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 7b86331..37b858e 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -305,22 +305,26 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	}
 
 	@Override
-	public String getExecutionJobStats(ExecutableFlow exFlow, String jobId)
+	public List<Object> getExecutionJobStats(
+      ExecutableFlow exFlow, String jobId, int attempt)
 			throws ExecutorManagerException {
 		Pair<ExecutionReference, ExecutableFlow> pair = 
 				runningFlows.get(exFlow.getExecutionId());
 		if (pair == null) {
-			return executorLoader.fetchStats(exFlow.getExecutionId(), jobId);
+			return executorLoader.fetchAttachment(
+          exFlow.getExecutionId(), jobId, attempt);
 		}
 
 		Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
+    Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
 		
 		@SuppressWarnings("unchecked")
 		Map<String, Object> result = callExecutorServer(
 				pair.getFirst(),
-				ConnectorParams.STATS_ACTION,
-				jobIdParam);
-		return (String) result.get("stats");
+				ConnectorParams.ATTACHMENT_ACTION,
+				jobIdParam,
+        attemptParam);
+		return (List<Object>) result.get("attachment");
 	}
 	
 	@Override
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index e40c97b..af64ec4 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -91,7 +91,7 @@ public interface ExecutorManagerAdapter{
 	
 	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
 
-	public String getExecutionJobStats(ExecutableFlow exflow, String jobId) throws ExecutorManagerException;
+	public List<Object> getExecutionJobStats(ExecutableFlow exflow, String jobId, int attempt) throws ExecutorManagerException;
 	
 	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
 	
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 3fc12db..0a3d0b5 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -709,20 +709,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 	}
 
 	@Override
-	public String fetchStats(int execId, String jobId)
+	public List<Object> fetchAttachment(int execId, String jobId, int attempt)
 			throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 
 		try {
-			String stats = runner.query(
-					FetchExecutableJobStatsHandler.FETCH_ATTACHMENT_EXECUTABLE_NODE,
-					new FetchExecutableJobStatsHandler(),
+			String attachment = runner.query(
+					FetchExecutableJobAttachmentHandler.FETCH_ATTACHMENT_EXECUTABLE_NODE,
+					new FetchExecutableJobAttachmentHandler(),
 					execId,
 					jobId);
-			return stats;
+			return (List<Object>) JSONUtils.parseJSONFromString(attachment);
 		}
+    catch (IOException e) {
+			throw new ExecutorManagerException(
+          "Error converting job attachment to JSON " + jobId, e);
+    }
 		catch (SQLException e) {
-			throw new ExecutorManagerException("Error query job stats " + jobId, e);
+			throw new ExecutorManagerException(
+          "Error query job attachment " + jobId, e);
 		}
 	}
 	
@@ -996,7 +1001,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 		}
 	}
 
-	private static class FetchExecutableJobStatsHandler
+	private static class FetchExecutableJobAttachmentHandler
 			implements ResultSetHandler<String> {
 		private static String FETCH_ATTACHMENT_EXECUTABLE_NODE = 
 				"SELECT attachment FROM execution_jobs WHERE exec_id=? AND job_id=?";
@@ -1004,17 +1009,17 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 		@SuppressWarnings("unchecked")
 		@Override
 		public String handle(ResultSet rs) throws SQLException {
-			String statsJson = "";
+			String attachmentJson = "";
 			if (rs.next()) {
 				try {
-					byte[] stats = rs.getBytes(1);
-					statsJson = GZIPUtils.unGzipString(stats, "UTF-8");
+					byte[] attachment = rs.getBytes(1);
+					attachmentJson = GZIPUtils.unGzipString(attachment, "UTF-8");
 				}
 				catch (IOException e) {
-					throw new SQLException("Error decoding job stats", e);
+					throw new SQLException("Error decoding job attachment", e);
 				}
 			}
-			return statsJson;
+			return attachmentJson;
 		}
 	}
 	
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 52e8d7c..25a81c7 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -197,12 +197,6 @@ public class AzkabanWebServer extends AzkabanServer {
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
 
-		File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
-		if (!statsDir.exists()) {
-			statsDir.mkdir();
-		}
-    props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
 		// Setup time zone
 		if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
 			String timezone = props.getString(DEFAULT_TIMEZONE_ID);
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 9781043..f330d53 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -58,8 +58,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	private ScheduleManager scheduleManager;
 	private ExecutorVelocityHelper velocityHelper;
 
-  private String statsDir;
-
 	@Override
 	public void init(ServletConfig config) throws ServletException {
 		super.init(config);
@@ -68,7 +66,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		executorManager = server.getExecutorManager();
 		scheduleManager = server.getScheduleManager();
 		velocityHelper = new ExecutorVelocityHelper();
-    statsDir = server.getServerProps().getString("azkaban.stats.dir");
 	}
 
 	@Override
@@ -476,19 +473,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				return;
 			}
 
-			String statsJson = executorManager.getExecutionJobStats(exFlow, jobId);
-      List<Object> jsonObj = 
-          (ArrayList<Object>) JSONUtils.parseJSONFromString(statsJson);
+			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+			List<Object> jsonObj = executorManager.getExecutionJobStats(
+          exFlow, jobId, attempt);
       ret.put("jobStats", jsonObj);
     }
 		catch (ExecutorManagerException e) {
 			ret.put("error", "Error retrieving stats for job " + jobId);
 			return;
 		}
-    catch (IOException e) {
-      ret.put("error", "Cannot write JSON for stats for job " + jobId);
-      return;
-		}
   }
 
 	private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index 585e208..b54b0f5 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -8,8 +8,6 @@ azkaban.jobtype.plugin.dir=plugins/jobtypes
 executor.global.properties=conf/global.properties
 azkaban.project.dir=projects
 
-azkaban.stats.dir=stats
-
 database.type=mysql
 mysql.port=3306
 mysql.host=localhost
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 9ca591e..7524a14 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -20,9 +20,6 @@ database.type=h2
 h2.path=data/azkaban
 h2.create.tables=true
 
-# Stats
-azkaban.stats.dir=stats
-
 # Velocity dev mode
 velocity.dev.mode=false
 
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 97e40ae..3ccb2f3 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -22,8 +22,6 @@ mysql.user=azkaban
 mysql.password=azkaban
 mysql.numconnections=100
 
-azkaban.stats.dir=stats
-
 # Velocity dev mode
 velocity.dev.mode=false