azkaban-aplcache
Changes
.classpath 2(+2 -0)
conf/azkaban.properties 2(+1 -1)
lib/commons-logging-1.1.1.jar 0(+0 -0)
lib/httpcore-4.2.1.jar 0(+0 -0)
src/java/azkaban/executor/ExecutorManager.java 112(+79 -33)
src/java/azkaban/executor/FlowRunner.java 16(+16 -0)
src/java/event/Event.java 6(+6 -0)
src/java/event/EventHandler.java 20(+20 -0)
src/java/event/EventListener.java 5(+5 -0)
Details
.classpath 2(+2 -0)
diff --git a/.classpath b/.classpath
index df1b923..fc93185 100644
--- a/.classpath
+++ b/.classpath
@@ -21,5 +21,7 @@
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="lib/httpclient-4.2.1.jar"/>
<classpathentry kind="lib" path="lib/commons-io-2.4.jar"/>
+ <classpathentry kind="lib" path="lib/httpcore-4.2.1.jar"/>
+ <classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
<classpathentry kind="output" path="dist"/>
</classpath>
conf/azkaban.properties 2(+1 -1)
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index b4b4881..ee78616 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -36,5 +36,5 @@ jetty.trustpassword=password
executor.maxThreads=50
executor.port=12321
executor.shared.token=abcdefg
-
+executor.flow.threads=30
lib/commons-logging-1.1.1.jar 0(+0 -0)
diff --git a/lib/commons-logging-1.1.1.jar b/lib/commons-logging-1.1.1.jar
new file mode 100644
index 0000000..1deef14
Binary files /dev/null and b/lib/commons-logging-1.1.1.jar differ
lib/httpcore-4.2.1.jar 0(+0 -0)
diff --git a/lib/httpcore-4.2.1.jar b/lib/httpcore-4.2.1.jar
new file mode 100644
index 0000000..16d75e1
Binary files /dev/null and b/lib/httpcore-4.2.1.jar differ
src/java/azkaban/executor/ExecutorManager.java 112(+79 -33)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index ed0de8d..a38f5c9 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -5,6 +5,8 @@ import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -15,20 +17,31 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import azkaban.flow.Flow;
-import azkaban.project.Project;
+import azkaban.utils.ExecutableFlowLoader;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
+import azkaban.webapp.AzkabanExecutorServer;
+/**
+ * Executor manager used to manage the client side job.
+ *
+ */
public class ExecutorManager {
- private static String FLOW_PATH = "flows";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private File basePath;
private AtomicInteger counter = new AtomicInteger();
private String token;
+ private int portNumber;
+ private String url = "localhost";
private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
@@ -44,6 +57,7 @@ public class ExecutorManager {
}
}
+ portNumber = props.getInt("executor.port", AzkabanExecutorServer.DEFAULT_PORT_NUMBER);
token = props.getString("executor.shared.token", "");
counter.set(0);
loadActiveExecutions();
@@ -68,7 +82,7 @@ public class ExecutorManager {
int index = (executionFiles.length - from - 1);
for (int count = 0; count < maxResults && index >= 0; ++count, --index) {
File exDir = executionFiles[index];
- ExecutableFlow flow = loadExecutableFlowFromDir(exDir);
+ ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
if (flow != null) {
executionFlows.add(flow);
@@ -98,7 +112,7 @@ public class ExecutorManager {
int count = 0;
for (int index = executionFiles.length - from - 1; count < maxResults && index>=0; --index ) {
File exDir = executionFiles[index];
- ExecutableFlow flow = loadExecutableFlowFromDir(exDir);
+ ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
if (flow != null) {
results.add(flow);
@@ -111,33 +125,33 @@ public class ExecutorManager {
return executionFiles.length;
}
-
- private ExecutableFlow loadExecutableFlowFromDir(File exDir) {
- logger.info("Loading execution " + exDir.getName());
- String exFlowName = exDir.getName();
-
- String flowFileName = "_" + exFlowName + ".flow";
- File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
- Arrays.sort(exFlowFiles);
-
- if (exFlowFiles.length <= 0) {
- logger.error("Execution flow " + exFlowName + " missing flow file.");
- return null;
- }
- File lastExFlow = exFlowFiles[exFlowFiles.length-1];
-
- Object exFlowObj = null;
- try {
- exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
- } catch (IOException e) {
- logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
- return null;
- }
-
- ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
- return flow;
- }
-
+//
+// private ExecutableFlow loadExecutableFlowFromDir(File exDir) {
+// logger.info("Loading execution " + exDir.getName());
+// String exFlowName = exDir.getName();
+//
+// String flowFileName = "_" + exFlowName + ".flow";
+// File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
+// Arrays.sort(exFlowFiles);
+//
+// if (exFlowFiles.length <= 0) {
+// logger.error("Execution flow " + exFlowName + " missing flow file.");
+// return null;
+// }
+// File lastExFlow = exFlowFiles[exFlowFiles.length-1];
+//
+// Object exFlowObj = null;
+// try {
+// exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
+// } catch (IOException e) {
+// logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
+// return null;
+// }
+//
+// ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+// return flow;
+// }
+//
private void loadActiveExecutions() {
File[] executingProjects = basePath.listFiles();
for (File project: executingProjects) {
@@ -148,7 +162,7 @@ public class ExecutorManager {
for (File exflow: activeFlows.listFiles()) {
logger.info("Loading execution " + exflow.getName());
- ExecutableFlow flow = loadExecutableFlowFromDir(exflow);
+ ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exflow);
if (flow != null) {
logger.info("Adding active execution flow " + flow.getExecutionId());
@@ -201,9 +215,41 @@ public class ExecutorManager {
public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
String executionPath = flow.getExecutionPath();
File executionDir = new File(executionPath);
-
+ flow.setSubmitTime(System.currentTimeMillis());
+
File resourceFile = writeResourceFile(executionDir, flow);
File executableFlowFile = writeExecutableFlowFile(executionDir, flow);
+ logger.info("Setting up " + flow.getExecutionId() + " for execution.");
+
+ URIBuilder builder = new URIBuilder();
+ builder.setScheme("http")
+ .setHost(url)
+ .setPort(portNumber)
+ .setPath("/submit")
+ .setParameter("sharedToken", token)
+ .setParameter("execid", flow.getExecutionId())
+ .setParameter("execpath", flow.getExecutionPath());
+
+ URI uri = null;
+ try {
+ uri = builder.build();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ flow.setStatus(ExecutableFlow.Status.FAILED);
+ return;
+ }
+
+ logger.info("Submitting flow " + flow.getExecutionId() + " for execution.");
+ HttpClient httpclient = new DefaultHttpClient();
+ HttpGet httpget = new HttpGet(uri);
+ HttpResponse response = null;
+ try {
+ response = httpclient.execute(httpget);
+ } catch (IOException e) {
+ flow.setStatus(ExecutableFlow.Status.FAILED);
+ e.printStackTrace();
+ return;
+ }
}
public void cleanupAll(ExecutableFlow exflow) throws ExecutorManagerException{
src/java/azkaban/executor/FlowRunner.java 16(+16 -0)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
new file mode 100644
index 0000000..f63adc5
--- /dev/null
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -0,0 +1,16 @@
+package azkaban.executor;
+
+public class FlowRunner implements Runnable {
+ private ExecutableFlow flow;
+ private FlowRunnerManager manager;
+
+ public FlowRunner(ExecutableFlow flow) {
+ this.flow = flow;
+ this.manager = manager;
+ }
+
+ @Override
+ public void run() {
+
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
new file mode 100644
index 0000000..f8731d6
--- /dev/null
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -0,0 +1,73 @@
+package azkaban.executor;
+
+import java.io.File;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.ExecutableFlowLoader;
+import azkaban.utils.Props;
+
+/**
+ * Execution manager for the server side execution client.
+ *
+ */
+public class FlowRunnerManager {
+ private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
+ private File basePath;
+
+ private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
+ private ConcurrentHashMap<String, FlowRunner> runningFlows = new ConcurrentHashMap<String, FlowRunner>();
+ private LinkedBlockingQueue<FlowRunner> queue = new LinkedBlockingQueue<FlowRunner>();
+ private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+
+ private ExecutorService executorService;
+ private SubmitterThread submitterThread;
+
+ public FlowRunnerManager(Props props) {
+ basePath = new File(props.getString("execution.directory"));
+ numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
+ executorService = Executors.newFixedThreadPool(numThreads);
+
+ submitterThread = new SubmitterThread(queue);
+ submitterThread.start();
+ }
+
+ public void submitFlow(String id, String path) throws ExecutorManagerException {
+ // Load file and submit
+ File dir = new File(path);
+ ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
+ FlowRunner runner = new FlowRunner(flow);
+ }
+
+ //
+ private class SubmitterThread extends Thread {
+ private BlockingQueue<FlowRunner> queue;
+ private boolean shutdown = false;
+
+ public SubmitterThread(BlockingQueue<FlowRunner> queue) {
+ this.queue = queue;
+ }
+
+ public void shutdown() {
+ shutdown = true;
+ this.interrupt();
+ }
+
+ public void run() {
+ while(!shutdown) {
+ try {
+ FlowRunner flowRunner = queue.take();
+ executorService.submit(flowRunner);
+ }
+ catch (InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
+ }
+ }
+ }
+ }
+}
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
new file mode 100644
index 0000000..8afd7a6
--- /dev/null
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -0,0 +1,70 @@
+package azkaban.utils;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+
+public class ExecutableFlowLoader {
+ private static final Logger logger = Logger.getLogger(ExecutableFlowLoader.class.getName());
+
+ public static ExecutableFlow loadExecutableFlowFromDir(File exDir) {
+ String exFlowName = exDir.getName();
+
+ String flowFileName = "_" + exFlowName + ".flow";
+ File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
+ Arrays.sort(exFlowFiles);
+
+ if (exFlowFiles.length <= 0) {
+ logger.error("Execution flow " + exFlowName + " missing flow file.");
+ return null;
+ }
+ File lastExFlow = exFlowFiles[exFlowFiles.length-1];
+
+ Object exFlowObj = null;
+ try {
+ exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
+ } catch (IOException e) {
+ logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
+ return null;
+ }
+
+ ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+ return flow;
+ }
+
+ private static class PrefixFilter implements FileFilter {
+ private String prefix;
+
+ public PrefixFilter(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+
+ return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
+ }
+ }
+
+
+ private static class SuffixFilter implements FileFilter {
+ private String suffix;
+ private boolean filesOnly = false;
+
+ public SuffixFilter(String suffix, boolean filesOnly) {
+ this.suffix = suffix;
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+ return (pathname.isFile() || !filesOnly) && !pathname.isHidden() && name.length() >= suffix.length() && name.endsWith(suffix);
+ }
+ }
+}
src/java/event/Event.java 6(+6 -0)
diff --git a/src/java/event/Event.java b/src/java/event/Event.java
new file mode 100644
index 0000000..4f100d0
--- /dev/null
+++ b/src/java/event/Event.java
@@ -0,0 +1,6 @@
+package event;
+
+
+public class Event {
+
+}
src/java/event/EventHandler.java 20(+20 -0)
diff --git a/src/java/event/EventHandler.java b/src/java/event/EventHandler.java
new file mode 100644
index 0000000..8125e03
--- /dev/null
+++ b/src/java/event/EventHandler.java
@@ -0,0 +1,20 @@
+package event;
+
+import java.util.HashSet;
+
+public class EventHandler {
+ private HashSet<EventListener> listeners = new HashSet<EventListener>();
+
+ public EventHandler() {
+ }
+
+ public void addListener(EventListener listener) {
+ listeners.add(listener);
+ }
+
+ public void fireEventListeners(Event event) {
+ for (EventListener listener: listeners) {
+ listener.handleEvent(event);
+ }
+ }
+}
src/java/event/EventListener.java 5(+5 -0)
diff --git a/src/java/event/EventListener.java b/src/java/event/EventListener.java
new file mode 100644
index 0000000..22dbded
--- /dev/null
+++ b/src/java/event/EventListener.java
@@ -0,0 +1,5 @@
+package event;
+
+public interface EventListener {
+ public void handleEvent(Event event);
+}