azkaban-aplcache

Details

.classpath 2(+2 -0)

diff --git a/.classpath b/.classpath
index df1b923..fc93185 100644
--- a/.classpath
+++ b/.classpath
@@ -21,5 +21,7 @@
 	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
 	<classpathentry kind="lib" path="lib/httpclient-4.2.1.jar"/>
 	<classpathentry kind="lib" path="lib/commons-io-2.4.jar"/>
+	<classpathentry kind="lib" path="lib/httpcore-4.2.1.jar"/>
+	<classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
 	<classpathentry kind="output" path="dist"/>
 </classpath>
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index b4b4881..ee78616 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -36,5 +36,5 @@ jetty.trustpassword=password
 executor.maxThreads=50
 executor.port=12321
 executor.shared.token=abcdefg
-
+executor.flow.threads=30
 
diff --git a/lib/commons-logging-1.1.1.jar b/lib/commons-logging-1.1.1.jar
new file mode 100644
index 0000000..1deef14
Binary files /dev/null and b/lib/commons-logging-1.1.1.jar differ
diff --git a/lib/httpcore-4.2.1.jar b/lib/httpcore-4.2.1.jar
new file mode 100644
index 0000000..16d75e1
Binary files /dev/null and b/lib/httpcore-4.2.1.jar differ
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index ed0de8d..a38f5c9 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -5,6 +5,8 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -15,20 +17,31 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
 
 import azkaban.flow.Flow;
-import azkaban.project.Project;
+import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
+import azkaban.webapp.AzkabanExecutorServer;
 
+/**
+ * Executor manager used to manage the client side job.
+ *
+ */
 public class ExecutorManager {
-	private static String FLOW_PATH = "flows";
 	private static Logger logger = Logger.getLogger(ExecutorManager.class);
 	private File basePath;
 
 	private AtomicInteger counter = new AtomicInteger();
 	private String token;
+	private int portNumber;
+	private String url = "localhost";
 	
 	private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
 	
@@ -44,6 +57,7 @@ public class ExecutorManager {
 			}
 		}
 		
+		portNumber = props.getInt("executor.port", AzkabanExecutorServer.DEFAULT_PORT_NUMBER);
 		token = props.getString("executor.shared.token", "");
 		counter.set(0);
 		loadActiveExecutions();
@@ -68,7 +82,7 @@ public class ExecutorManager {
 		int index = (executionFiles.length - from - 1);
 		for (int count = 0; count < maxResults && index >= 0; ++count, --index) {
 			File exDir = executionFiles[index];
-			ExecutableFlow flow = loadExecutableFlowFromDir(exDir);
+			ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
 			
 			if (flow != null) {
 				executionFlows.add(flow);
@@ -98,7 +112,7 @@ public class ExecutorManager {
 		int count = 0;
 		for (int index = executionFiles.length - from - 1; count < maxResults && index>=0; --index ) {
 			File exDir = executionFiles[index];
-			ExecutableFlow flow = loadExecutableFlowFromDir(exDir);
+			ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
 			
 			if (flow != null) {
 				results.add(flow);
@@ -111,33 +125,33 @@ public class ExecutorManager {
 		
 		return executionFiles.length;
 	}
-	
-	private ExecutableFlow loadExecutableFlowFromDir(File exDir) {
-		logger.info("Loading execution " + exDir.getName());
-		String exFlowName = exDir.getName();
-		
-		String flowFileName = "_" + exFlowName + ".flow";
-		File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
-		Arrays.sort(exFlowFiles);
-		
-		if (exFlowFiles.length <= 0) {
-			logger.error("Execution flow " + exFlowName + " missing flow file.");
-			return null;
-		}
-		File lastExFlow = exFlowFiles[exFlowFiles.length-1];
-		
-		Object exFlowObj = null;
-		try {
-			exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
-		} catch (IOException e) {
-			logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
-			return null;
-		}
-		
-		ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
-		return flow;
-	}
-	
+//	
+//	private ExecutableFlow loadExecutableFlowFromDir(File exDir) {
+//		logger.info("Loading execution " + exDir.getName());
+//		String exFlowName = exDir.getName();
+//		
+//		String flowFileName = "_" + exFlowName + ".flow";
+//		File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
+//		Arrays.sort(exFlowFiles);
+//		
+//		if (exFlowFiles.length <= 0) {
+//			logger.error("Execution flow " + exFlowName + " missing flow file.");
+//			return null;
+//		}
+//		File lastExFlow = exFlowFiles[exFlowFiles.length-1];
+//		
+//		Object exFlowObj = null;
+//		try {
+//			exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
+//		} catch (IOException e) {
+//			logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
+//			return null;
+//		}
+//		
+//		ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+//		return flow;
+//	}
+//	
 	private void loadActiveExecutions() {
 		File[] executingProjects = basePath.listFiles();
 		for (File project: executingProjects) {
@@ -148,7 +162,7 @@ public class ExecutorManager {
 			
 			for (File exflow: activeFlows.listFiles()) {
 				logger.info("Loading execution " + exflow.getName());
-				ExecutableFlow flow = loadExecutableFlowFromDir(exflow);
+				ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exflow);
 				
 				if (flow != null) {
 					logger.info("Adding active execution flow " + flow.getExecutionId());
@@ -201,9 +215,41 @@ public class ExecutorManager {
 	public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
 		String executionPath = flow.getExecutionPath();
 		File executionDir = new File(executionPath);
-
+		flow.setSubmitTime(System.currentTimeMillis());
+		
 		File resourceFile = writeResourceFile(executionDir, flow);
 		File executableFlowFile = writeExecutableFlowFile(executionDir, flow);
+		logger.info("Setting up " + flow.getExecutionId() + " for execution.");
+		
+		URIBuilder builder = new URIBuilder();
+		builder.setScheme("http")
+			.setHost(url)
+			.setPort(portNumber)
+			.setPath("/submit")
+			.setParameter("sharedToken", token)
+			.setParameter("execid", flow.getExecutionId())
+			.setParameter("execpath", flow.getExecutionPath());
+		
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			e.printStackTrace();
+			flow.setStatus(ExecutableFlow.Status.FAILED);
+			return;
+		}
+
+		logger.info("Submitting flow " + flow.getExecutionId() + " for execution.");
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		HttpResponse response = null;
+		try {
+			response = httpclient.execute(httpget);
+		} catch (IOException e) {
+			flow.setStatus(ExecutableFlow.Status.FAILED);
+			e.printStackTrace();
+			return;
+		}
 	}
 	
 	public void cleanupAll(ExecutableFlow exflow) throws ExecutorManagerException{
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
new file mode 100644
index 0000000..f63adc5
--- /dev/null
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -0,0 +1,16 @@
+package azkaban.executor;
+
+public class FlowRunner implements Runnable {
+	private ExecutableFlow flow;
+	private FlowRunnerManager manager;
+	
+	public FlowRunner(ExecutableFlow flow) {
+		this.flow = flow;
+		this.manager = manager;
+	}
+	
+	@Override
+	public void run() {
+		
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
new file mode 100644
index 0000000..f8731d6
--- /dev/null
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -0,0 +1,73 @@
+package azkaban.executor;
+
+import java.io.File;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.ExecutableFlowLoader;
+import azkaban.utils.Props;
+
+/**
+ * Execution manager for the server side execution client.
+ *
+ */
+public class FlowRunnerManager {
+	private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
+	private File basePath;
+	
+	private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
+	private ConcurrentHashMap<String, FlowRunner> runningFlows = new ConcurrentHashMap<String, FlowRunner>();
+	private LinkedBlockingQueue<FlowRunner> queue = new LinkedBlockingQueue<FlowRunner>();
+	private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+
+	private ExecutorService executorService;
+	private SubmitterThread submitterThread;
+	
+	public FlowRunnerManager(Props props) {
+		basePath = new File(props.getString("execution.directory"));
+		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
+		executorService = Executors.newFixedThreadPool(numThreads);
+		
+		submitterThread = new SubmitterThread(queue);
+		submitterThread.start();
+	}
+	
+	public void submitFlow(String id, String path) throws ExecutorManagerException {
+		// Load file and submit
+		File dir = new File(path);
+		ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
+		FlowRunner runner = new FlowRunner(flow);
+	}
+	
+	//
+	private class SubmitterThread extends Thread {
+		private BlockingQueue<FlowRunner> queue;
+		private boolean shutdown = false;
+		
+		public SubmitterThread(BlockingQueue<FlowRunner> queue) {
+			this.queue = queue;
+		}
+		
+		public void shutdown() {
+			shutdown = true;
+			this.interrupt();
+		}
+		
+		public void run() {
+			while(!shutdown) {
+				try {
+					FlowRunner flowRunner = queue.take();
+					executorService.submit(flowRunner);
+				} 
+				catch (InterruptedException e) {
+					logger.info("Interrupted. Probably to shut down.");
+				}
+			}
+		}
+	}
+}
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
new file mode 100644
index 0000000..8afd7a6
--- /dev/null
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -0,0 +1,70 @@
+package azkaban.utils;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+
+public class ExecutableFlowLoader {
+	private static final Logger logger = Logger.getLogger(ExecutableFlowLoader.class.getName());
+	
+	public static ExecutableFlow loadExecutableFlowFromDir(File exDir) {
+		String exFlowName = exDir.getName();
+		
+		String flowFileName = "_" + exFlowName + ".flow";
+		File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
+		Arrays.sort(exFlowFiles);
+		
+		if (exFlowFiles.length <= 0) {
+			logger.error("Execution flow " + exFlowName + " missing flow file.");
+			return null;
+		}
+		File lastExFlow = exFlowFiles[exFlowFiles.length-1];
+		
+		Object exFlowObj = null;
+		try {
+			exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
+		} catch (IOException e) {
+			logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
+			return null;
+		}
+		
+		ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+		return flow;
+	}
+	
+	private static class PrefixFilter implements FileFilter {
+		private String prefix;
+
+		public PrefixFilter(String prefix) {
+			this.prefix = prefix;
+		}
+
+		@Override
+		public boolean accept(File pathname) {
+			String name = pathname.getName();
+
+			return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
+		}
+	}
+	
+	
+	private static class SuffixFilter implements FileFilter {
+		private String suffix;
+		private boolean filesOnly = false;
+
+		public SuffixFilter(String suffix, boolean filesOnly) {
+			this.suffix = suffix;
+		}
+
+		@Override
+		public boolean accept(File pathname) {
+			String name = pathname.getName();
+			return (pathname.isFile() || !filesOnly) && !pathname.isHidden() && name.length() >= suffix.length() && name.endsWith(suffix);
+		}
+	}
+}
diff --git a/src/java/event/Event.java b/src/java/event/Event.java
new file mode 100644
index 0000000..4f100d0
--- /dev/null
+++ b/src/java/event/Event.java
@@ -0,0 +1,6 @@
+package event;
+
+
+public class Event {
+
+}
diff --git a/src/java/event/EventHandler.java b/src/java/event/EventHandler.java
new file mode 100644
index 0000000..8125e03
--- /dev/null
+++ b/src/java/event/EventHandler.java
@@ -0,0 +1,20 @@
+package event;
+
+import java.util.HashSet;
+
+public class EventHandler {
+	private HashSet<EventListener> listeners = new HashSet<EventListener>();
+	
+	public EventHandler() {
+	}
+
+	public void addListener(EventListener listener) {
+		listeners.add(listener);
+	}
+	
+	public void fireEventListeners(Event event) {
+		for (EventListener listener: listeners) {
+			listener.handleEvent(event);
+		}
+	}
+}
diff --git a/src/java/event/EventListener.java b/src/java/event/EventListener.java
new file mode 100644
index 0000000..22dbded
--- /dev/null
+++ b/src/java/event/EventListener.java
@@ -0,0 +1,5 @@
+package event;
+
+public interface EventListener {
+	public void handleEvent(Event event);
+}