azkaban-aplcache
Changes
.classpath 13(+4 -9)
build.properties 2(+2 -0)
build.xml 52(+52 -0)
conf/azkaban.properties 1(+0 -1)
lib/guava-13.0.1.jar 0(+0 -0)
lib/hadoop-core-1.0.2-p1.jar 0(+0 -0)
lib/pig-0.9.1-withouthadoop.jar 0(+0 -0)
src/java/azkaban/executor/JobRunner.java 20(+17 -3)
src/java/azkaban/utils/JSONUtils.java 176(+9 -167)
Details
.classpath 13(+4 -9)
diff --git a/.classpath b/.classpath
index ef4e777..d82bc10 100644
--- a/.classpath
+++ b/.classpath
@@ -23,13 +23,8 @@
<classpathentry kind="lib" path="lib/commons-io-2.4.jar"/>
<classpathentry kind="lib" path="lib/httpcore-4.2.1.jar"/>
<classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
- <classpathentry kind="lib" path="/azkaban/lib/cglib-nodep-2.2.jar"/>
- <classpathentry kind="lib" path="/azkaban/lib/easymock-2.5.2.jar"/>
- <classpathentry kind="lib" path="/azkaban/lib/easymockclassextension-2.5.2.jar"/>
- <classpathentry kind="lib" path="/azkaban/lib/google-collect-1.0-rc2.jar"/>
- <classpathentry kind="lib" path="/azkaban/lib/hadoop-core-1.0.2-p1.jar"/>
- <classpathentry kind="lib" path="/azkaban/lib/objenesis-1.2.jar"/>
- <classpathentry kind="lib" path="/azkaban/lib/org-json-2010-02-26.jar"/>
- <classpathentry kind="lib" path="/azkaban/lib/pig-0.9.1-withouthadoop.jar"/>
- <classpathentry kind="output" path="dist"/>
+ <classpathentry kind="lib" path="lib/hadoop-core-1.0.2-p1.jar"/>
+ <classpathentry kind="lib" path="lib/guava-13.0.1.jar"/>
+ <classpathentry kind="lib" path="lib/pig-0.9.1-withouthadoop.jar"/>
+ <classpathentry kind="output" path="dist/classes"/>
</classpath>
build.properties 2(+2 -0)
diff --git a/build.properties b/build.properties
new file mode 100644
index 0000000..6bb299a
--- /dev/null
+++ b/build.properties
@@ -0,0 +1,2 @@
+name=azkaban
+version=2.0
\ No newline at end of file
build.xml 52(+52 -0)
diff --git a/build.xml b/build.xml
new file mode 100644
index 0000000..df62df6
--- /dev/null
+++ b/build.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+
+<project name="azkaban" basedir="." default="all">
+ <property name="base.dir" value="${basedir}" />
+ <property name="dist.jar.dir" value="${basedir}/dist/jars" />
+ <property name="dist.classes.dir" value="${basedir}/dist/classes" />
+ <property name="lib.dir" value="${basedir}/lib" />
+ <property name="java.src.dir" value="${basedir}/src/java" />
+ <property file="build.properties" />
+
+ <property environment="env" />
+
+ <path id="main.classpath">
+ <fileset dir="${lib.dir}">
+ <include name="*.jar" />
+ </fileset>
+
+ <pathelement path="${dist.classes.dir}" />
+ </path>
+
+ <!-- set the build number based on environment variable, otherwise blank -->
+ <property environment="env" description="System environment variables (including those set by Hudson)" />
+
+ <target name="all" depends="clean, jars" description="Builds all jars" />
+
+ <target name="clean" description="Delete generated files.">
+ <echo message="Deleting generated files in dist" />
+ <delete dir="${dist.jar.dir}" />
+ <delete dir="${dist.classes.dir}" />
+ </target>
+
+ <target name="build" description="Compile main source tree java files">
+ <delete dir="${dist.classes.dir}" />
+ <mkdir dir="${dist.classes.dir}" />
+
+ <javac fork="true" destdir="${dist.classes.dir}"
+ target="1.6" debug="true" deprecation="false" failonerror="true">
+ <src path="${java.src.dir}" />
+ <classpath refid="main.classpath" />
+ </javac>
+ </target>
+
+ <target name="jars" depends="build" description="Create azkaban jar">
+ <mkdir dir="${dist.jar.dir}" />
+ <jar destfile="${dist.jar.dir}/${name}-${version}.jar">
+ <fileset dir="${dist.classes.dir}">
+ <include name="**/*.*" />
+ </fileset>
+ </jar>
+ </target>
+
+</project>
conf/azkaban.properties 1(+0 -1)
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index 40d3d0b..683b4c6 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -17,7 +17,6 @@ project.global.properties=conf/global.properties
#Execution directory
execution.directory=execution
-execution.use.symlink=true
#Schedule files
schedule.directory=schedule
lib/guava-13.0.1.jar 0(+0 -0)
diff --git a/lib/guava-13.0.1.jar b/lib/guava-13.0.1.jar
new file mode 100644
index 0000000..09c5449
Binary files /dev/null and b/lib/guava-13.0.1.jar differ
lib/hadoop-core-1.0.2-p1.jar 0(+0 -0)
diff --git a/lib/hadoop-core-1.0.2-p1.jar b/lib/hadoop-core-1.0.2-p1.jar
new file mode 100644
index 0000000..861c03c
Binary files /dev/null and b/lib/hadoop-core-1.0.2-p1.jar differ
lib/pig-0.9.1-withouthadoop.jar 0(+0 -0)
diff --git a/lib/pig-0.9.1-withouthadoop.jar b/lib/pig-0.9.1-withouthadoop.jar
new file mode 100644
index 0000000..dd2fcc2
Binary files /dev/null and b/lib/pig-0.9.1-withouthadoop.jar differ
src/java/azkaban/executor/JobRunner.java 20(+17 -3)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 3a5c99c..11acd34 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -89,7 +89,19 @@ public class JobRunner extends EventHandler implements Runnable {
logger.info("Starting job " + node.getId() + " at " + node.getStartTime());
node.setStatus(Status.RUNNING);
this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
+
+ boolean succeeded = true;
+ synchronized(this) {
+ try {
+ wait(5000);
+ }
+ catch (InterruptedException e) {
+ logger.info("Job cancelled.");
+ succeeded = false;
+ }
+ }
+/*
// Run Job
boolean succeeded = true;
@@ -104,11 +116,13 @@ public class JobRunner extends EventHandler implements Runnable {
//logger.error("job run failed!");
e.printStackTrace();
}
-
+ */
node.setEndTime(System.currentTimeMillis());
if (succeeded) {
node.setStatus(Status.SUCCEEDED);
- outputProps = job.getJobGeneratedProperties();
+ if (job != null) {
+ outputProps = job.getJobGeneratedProperties();
+ }
this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
} else {
node.setStatus(Status.FAILED);
@@ -121,7 +135,7 @@ public class JobRunner extends EventHandler implements Runnable {
public synchronized void cancel() {
// Cancel code here
if(job == null) {
- logger.error("Job doesn't exist!");
+ logger.error("Job doesn't exisit!");
return;
}
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index dd860a1..26ff388 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -26,10 +26,9 @@ import java.util.Map;
import org.apache.commons.fileupload.util.Streams;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
-import org.json.JSONObject;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
-import azkaban.jobExecutor.utils.JSONToJava;
import azkaban.jobExecutor.utils.PropsUtils;
/*
@@ -51,8 +50,6 @@ public abstract class AbstractProcessJob extends AbstractJob {
public static final String JOB_NAME_ENV = "JOB_NAME";
public static final String JOB_OUTPUT_PROP_FILE = "JOB_OUTPUT_PROP_FILE";
- private static final JSONToJava jsonToJava = new JSONToJava();
-
protected final String _jobPath;
// protected final Props props;
@@ -138,8 +135,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
final String content = Streams.asString(reader).trim();
if (!content.isEmpty()) {
- Map<String, Object> propMap = jsonToJava.apply(new JSONObject(
- content));
+ Map<String, Object> propMap = (Map<String, Object>)JSONUtils.parseJSONFromString(content);
for (Map.Entry<String, Object> entry : propMap.entrySet()) {
outputProps
diff --git a/src/java/azkaban/jobExecutor/SecurePigWrapper.java b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
index 3010089..c210b40 100644
--- a/src/java/azkaban/jobExecutor/SecurePigWrapper.java
+++ b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
-import org.apache.pig.Main;
import java.io.DataOutputStream;
import java.io.File;
@@ -48,7 +47,7 @@ public class SecurePigWrapper {
@Override
public Void run() throws Exception {
prefetchToken();
- Main.main(args);
+ org.apache.pig.Main.main(args);
return null;
}
diff --git a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
index b61d8a7..ffc6aad 100644
--- a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
+++ b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
@@ -150,7 +150,7 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
try {
FileWriter writer = new FileWriter(scheduleFile);
- writer.write(JSONUtils.toJSONString(obj, 4));
+ writer.write(JSONUtils.toJSON(obj, true));
writer.flush();
} catch (Exception e) {
throw new RuntimeException("Error saving flow file", e);
@@ -174,7 +174,7 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
HashMap<String, Object> schedule;
try {
//TODO handle first time empty schedule file
- schedule = (HashMap<String,Object>)JSONUtils.fromJSONStream(reader);
+ schedule = (HashMap<String,Object>)JSONUtils.parseJSONFromReader(reader);
} catch (Exception e) {
//schedule = loadLegacyFile(schedulefile);
logger.error("Error parsing the schedule file", e);
src/java/azkaban/utils/JSONUtils.java 176(+9 -167)
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index cae5958..5d2c199 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -1,9 +1,7 @@
package azkaban.utils;
-import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -11,186 +9,21 @@ import java.io.Reader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectWriter;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
public class JSONUtils {
-
/**
* The constructor. Cannot construct this class.
*/
private JSONUtils() {
}
- /**
- * Takes a reader to stream the JSON string. The reader is not wrapped in a BufferReader
- * so it is up to the user to employ such optimizations if so desired.
- *
- * The results will be Maps, Lists and other Java mapping of Json types (i.e. String, Number, Boolean).
- *
- * @param reader
- * @return
- * @throws Exception
- */
- public static Map<String, Object> fromJSONStream(Reader reader) throws Exception {
- JSONObject jsonObj = new JSONObject(new JSONTokener(reader));
- Map<String, Object> results = createObject(jsonObj);
-
- return results;
- }
-
- /**
- * Converts a json string to Objects.
- *
- * The results will be Maps, Lists and other Java mapping of Json types (i.e. String, Number, Boolean).
- *
- * @param str
- * @return
- * @throws Exception
- */
- public static Map<String, Object> fromJSONString(String str) throws Exception {
- JSONObject jsonObj = new JSONObject(str);
- Map<String, Object> results = createObject(jsonObj);
- return results;
- }
-
- /**
- * Recurses through the json object to create a Map/List/Object equivalent.
- *
- * @param obj
- * @return
- */
- @SuppressWarnings("unchecked")
- private static Map<String, Object> createObject(JSONObject obj) {
- LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
-
- Iterator<String> iterator = obj.keys();
- while(iterator.hasNext()) {
- String key = iterator.next();
- Object value = null;
- try {
- value = obj.get(key);
- } catch (JSONException e) {
- // Since we get the value from the key given by the JSONObject,
- // this exception shouldn't be thrown.
- }
-
- if (value instanceof JSONArray) {
- value = createArray((JSONArray)value);
- }
- else if (value instanceof JSONObject) {
- value = createObject((JSONObject)value);
- }
-
- map.put(key, value);
- }
-
- return map;
- }
-
- /**
- * Recurses through the json object to create a Map/List/Object equivalent.
- *
- * @param obj
- * @return
- */
- private static List<Object> createArray(JSONArray array) {
- ArrayList<Object> list = new ArrayList<Object>();
- for (int i = 0; i < array.length(); ++i) {
- Object value = null;
- try {
- value = array.get(i);
- } catch (JSONException e) {
- // Ugh... JSON's over done exception throwing.
- }
-
- if (value instanceof JSONArray) {
- value = createArray((JSONArray)value);
- }
- else if (value instanceof JSONObject) {
- value = createObject((JSONObject)value);
- }
-
- list.add(value);
- }
-
- return list;
- }
-
- /**
- * Creates a json string from Map/List/Primitive object.
- *
- * @param obj
- * @return
- */
- public static String toJSONString(List<?> list) {
- JSONArray jsonList = new JSONArray(list);
- try {
- return jsonList.toString();
- } catch (Exception e) {
- return "";
- }
- }
-
- /**
- * Creates a json string from Map/List/Primitive object.
- *
- * @param obj
- * @parm indent
- * @return
- */
- public static String toJSONString(List<?> list, int indent) {
- JSONArray jsonList = new JSONArray(list);
- try {
- return jsonList.toString(indent);
- } catch (Exception e) {
- return "";
- }
- }
-
- /**
- * Creates a json string from Map/List/Primitive object.
- *
- * @param obj
- * @return
- */
- public static String toJSONString(Map<String, Object> obj) {
- JSONObject jsonObj = new JSONObject(obj);
- try {
- return jsonObj.toString();
- } catch (Exception e) {
- return "";
- }
- }
-
- /**
- * Creates a json pretty string from Map/List/Primitive object
- *
- * @param obj
- * @param indent
- * @return
- */
- public static String toJSONString(Map<String, Object> obj, int indent) {
- JSONObject jsonObj = new JSONObject(obj);
- try {
- return jsonObj.toString(indent);
- } catch (Exception e) {
- return "";
- }
- }
-
public static String toJSON(Object obj) {
return toJSON(obj, false);
}
@@ -254,6 +87,15 @@ public class JSONUtils {
return toObjectFromJSONNode(node);
}
+
+ public static Object parseJSONFromReader(Reader reader) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonFactory factory = new JsonFactory();
+ JsonParser parser = factory.createJsonParser(reader);
+ JsonNode node = mapper.readTree(parser);
+
+ return toObjectFromJSONNode(node);
+ }
private static Object toObjectFromJSONNode(JsonNode node) {
if (node.isObject()) {
diff --git a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
index 27a7881..d9edc3d 100644
--- a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
@@ -5,7 +5,6 @@ import java.util.Date;
import java.util.Properties;
import org.apache.log4j.Logger;
-import org.easymock.classextension.EasyMock;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
index d859e02..861601c 100644
--- a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -4,7 +4,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.log4j.Logger;
-import org.easymock.classextension.EasyMock;
import azkaban.utils.Props;
import azkaban.jobExecutor.AbstractProcessJob;
diff --git a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
index 48cfd42..f86ed90 100644
--- a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
@@ -4,7 +4,6 @@ import java.io.IOException;
import java.util.Date;
import org.apache.log4j.Logger;
-import org.easymock.classextension.EasyMock;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;