azkaban-uncached
Changes
build.xml 42(+41 -1)
src/java/azkaban/alert/Alerter.java 11(+11 -0)
src/java/azkaban/executor/ExecutorManager.java 291(+139 -152)
src/java/azkaban/jobtype/JobTypeManager.java 93(+72 -21)
src/java/azkaban/scheduler/ScheduleManager.java 41(+13 -28)
src/java/azkaban/sla/SlaOption.java 40(+38 -2)
src/java/azkaban/trigger/ActionTypeLoader.java 196(+98 -98)
src/java/azkaban/trigger/builtin/SlaChecker.java 58(+16 -42)
src/java/azkaban/trigger/CheckerTypeLoader.java 196(+98 -98)
src/java/azkaban/trigger/Condition.java 22(+16 -6)
src/java/azkaban/trigger/TriggerManager.java 71(+34 -37)
src/java/azkaban/utils/Emailer.java 16(+10 -6)
src/java/azkaban/utils/StringUtils.java 11(+11 -0)
src/java/azkaban/webapp/AzkabanWebServer.java 151(+57 -94)
Details
build.xml 42(+41 -1)
diff --git a/build.xml b/build.xml
index 86db40b..1031487 100644
--- a/build.xml
+++ b/build.xml
@@ -8,12 +8,14 @@
<property name="dist.packages.dir" value="${basedir}/dist/packages" />
<property name="dist.web.package.dir" value="${dist.packages.dir}/azkaban-web-server" />
<property name="dist.exec.package.dir" value="${dist.packages.dir}/azkaban-exec-server" />
+ <property name="dist.trigger.package.dir" value="${dist.packages.dir}/azkaban-trigger-server" />
<property name="dist.solo.package.dir" value="${dist.packages.dir}/azkaban-solo-server" />
<property name="dist.sql.package.dir" value="${dist.packages.dir}/sql" />
<property name="conf.dir" value="${basedir}/conf" />
<property name="web.package.dir" value="${basedir}/src/package/webserver" />
<property name="exec.package.dir" value="${basedir}/src/package/execserver" />
+ <property name="trigger.package.dir" value="${basedir}/src/package/triggerserver" />
<property name="solo.package.dir" value="${basedir}/src/package/soloserver" />
<property name="lib.dir" value="${basedir}/lib" />
@@ -208,6 +210,44 @@
</tar>
</target>
+ <target name="package-trigger-server" depends="jars" description="Creates a package for the trigger server">
+ <delete dir="${dist.trigger.package.dir}" />
+ <mkdir dir="${dist.trigger.package.dir}" />
+ <mkdir dir="${dist.trigger.package.dir}/conf" />
+ <mkdir dir="${dist.trigger.package.dir}/bin" />
+ <mkdir dir="${dist.trigger.package.dir}/lib" />
+ <mkdir dir="${dist.trigger.package.dir}/plugins" />
+ <mkdir dir="${dist.trigger.package.dir}/extlib" />
+
+ <!-- Copy Azkaban jars and libs-->
+ <copy file="${azkaban.jar}" todir="${dist.trigger.package.dir}/lib" />
+ <copy todir="${dist.trigger.package.dir}/lib" >
+ <fileset dir="${lib.dir}" >
+ <exclude name="hadoop-core*.jar"/>
+ </fileset>
+ </copy>
+
+ <!-- Copy bin files for trigger server only-->
+ <copy todir="${dist.trigger.package.dir}/bin" >
+ <fileset dir="${trigger.package.dir}/bin"/>
+ </copy>
+
+ <!-- Copy conf files -->
+ <copy todir="${dist.trigger.package.dir}/conf" >
+ <fileset dir="${trigger.package.dir}/conf" />
+ </copy>
+
+ <!-- Tarball it -->
+ <tar destfile="${dist.trigger.package.dir}/${name}-trigger-server-${version}.tar.gz" compression="gzip" longfile="gnu">
+ <tarfileset dir="${dist.trigger.package.dir}" prefix="azkaban-${version}" filemode="755" includes="bin/*" />
+
+ <tarfileset dir="${dist.trigger.package.dir}" prefix="azkaban-${version}" includes="**">
+ <exclude name="bin/*"/>
+ </tarfileset>
+ </tar>
+ </target>
+
+
<target name="package-solo-server" depends="jars" description="Creates a package for the solo server">
<delete dir="${dist.solo.package.dir}" />
<mkdir dir="${dist.solo.package.dir}" />
@@ -257,6 +297,6 @@
</tar>
</target>
- <target name="package-all" depends="package-exec-server, package-web-server, package-solo-server, package-sql-scripts" description="Create all packages">
+ <target name="package-all" depends="package-trigger-server, package-exec-server, package-web-server, package-solo-server, package-sql-scripts" description="Create all packages">
</target>
</project>
src/java/azkaban/alert/Alerter.java 11(+11 -0)
diff --git a/src/java/azkaban/alert/Alerter.java b/src/java/azkaban/alert/Alerter.java
new file mode 100644
index 0000000..1ba02a8
--- /dev/null
+++ b/src/java/azkaban/alert/Alerter.java
@@ -0,0 +1,11 @@
+package azkaban.alert;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.sla.SlaOption;
+
+public interface Alerter {
+ void alertOnSuccess(ExecutableFlow exflow) throws Exception;
+ void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception;
+ void alertOnFirstError(ExecutableFlow exflow) throws Exception;
+ void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
+}
src/java/azkaban/executor/ExecutorManager.java 291(+139 -152)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 7db8a04..1fa6989 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -16,15 +16,10 @@
package azkaban.executor;
-import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
-import java.lang.reflect.Constructor;
-import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -44,16 +39,14 @@ import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
+import azkaban.alert.Alerter;
import azkaban.project.Project;
import azkaban.scheduler.ScheduleStatisticManager;
-import azkaban.sla.SlaOption;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.FileIOUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
/**
* Executor manager used to manage the client side job.
@@ -80,19 +73,14 @@ public class ExecutorManager implements ExecutorManagerAdapter {
private Map<String, Alerter> alerters;
- public interface Alerter {
- void alertOnSuccess(ExecutableFlow exflow) throws Exception;
- void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception;
- void alertOnFirstError(ExecutableFlow exflow) throws Exception;
- void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
- }
-
- public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
+ public ExecutorManager(Props props, ExecutorLoader loader, Map<String, Alerter> alters) throws ExecutorManagerException {
this.executorLoader = loader;
this.loadRunningFlows();
executorHost = props.getString("executor.host", "localhost");
executorPort = props.getInt("executor.port");
+
+ alerters = alters;
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
@@ -101,145 +89,144 @@ public class ExecutorManager implements ExecutorManagerAdapter {
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
- alerters = loadAlerters(props);
}
- private Map<String, Alerter> loadAlerters(Props props) {
- Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
- // load built-in alerters
- ExecutorMailer mailAlerter = new ExecutorMailer(props);
- allAlerters.put("email", mailAlerter);
- // load all plugin alerters
- String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
- allAlerters.putAll(loadPluginAlerters(pluginDir));
- return allAlerters;
- }
-
- private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
- File alerterPluginPath = new File(pluginPath);
- if (!alerterPluginPath.exists()) {
- return Collections.<String, Alerter>emptyMap();
- }
-
- Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
- ClassLoader parentLoader = this.getClass().getClassLoader();
- File[] pluginDirs = alerterPluginPath.listFiles();
- ArrayList<String> jarPaths = new ArrayList<String>();
- for (File pluginDir: pluginDirs) {
- if (!pluginDir.isDirectory()) {
- logger.error("The plugin path " + pluginDir + " is not a directory.");
- continue;
- }
-
- // Load the conf directory
- File propertiesDir = new File(pluginDir, "conf");
- Props pluginProps = null;
- if (propertiesDir.exists() && propertiesDir.isDirectory()) {
- File propertiesFile = new File(propertiesDir, "plugin.properties");
- File propertiesOverrideFile = new File(propertiesDir, "override.properties");
-
- if (propertiesFile.exists()) {
- if (propertiesOverrideFile.exists()) {
- pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
- }
- else {
- pluginProps = PropsUtils.loadProps(null, propertiesFile);
- }
- }
- else {
- logger.error("Plugin conf file " + propertiesFile + " not found.");
- continue;
- }
- }
- else {
- logger.error("Plugin conf path " + propertiesDir + " not found.");
- continue;
- }
-
- String pluginName = pluginProps.getString("alerter.name");
- List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
-
- String pluginClass = pluginProps.getString("alerter.class");
- if (pluginClass == null) {
- logger.error("Alerter class is not set.");
- }
- else {
- logger.info("Plugin class " + pluginClass);
- }
-
- URLClassLoader urlClassLoader = null;
- File libDir = new File(pluginDir, "lib");
- if (libDir.exists() && libDir.isDirectory()) {
- File[] files = libDir.listFiles();
-
- ArrayList<URL> urls = new ArrayList<URL>();
- for (int i=0; i < files.length; ++i) {
- try {
- URL url = files[i].toURI().toURL();
- urls.add(url);
- } catch (MalformedURLException e) {
- logger.error(e);
- }
- }
- if (extLibClasspath != null) {
- for (String extLib : extLibClasspath) {
- try {
- File file = new File(pluginDir, extLib);
- URL url = file.toURI().toURL();
- urls.add(url);
- } catch (MalformedURLException e) {
- logger.error(e);
- }
- }
- }
-
- urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
- }
- else {
- logger.error("Library path " + propertiesDir + " not found.");
- continue;
- }
-
- Class<?> alerterClass = null;
- try {
- alerterClass = urlClassLoader.loadClass(pluginClass);
- }
- catch (ClassNotFoundException e) {
- logger.error("Class " + pluginClass + " not found.");
- continue;
- }
+// private Map<String, Alerter> loadAlerters(Props props) {
+// Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
+// // load built-in alerters
+// Emailer mailAlerter = new Emailer(props);
+// allAlerters.put("email", mailAlerter);
+// // load all plugin alerters
+// String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
+// allAlerters.putAll(loadPluginAlerters(pluginDir));
+// return allAlerters;
+// }
- String source = FileIOUtils.getSourcePathFromClass(alerterClass);
- logger.info("Source jar " + source);
- jarPaths.add("jar:file:" + source);
-
- Constructor<?> constructor = null;
- try {
- constructor = alerterClass.getConstructor(Props.class);
- } catch (NoSuchMethodException e) {
- logger.error("Constructor not found in " + pluginClass);
- continue;
- }
-
- Object obj = null;
- try {
- obj = constructor.newInstance(pluginProps);
- } catch (Exception e) {
- logger.error(e);
- }
-
- if (!(obj instanceof Alerter)) {
- logger.error("The object is not an Alerter");
- continue;
- }
-
- Alerter plugin = (Alerter) obj;
- installedAlerterPlugins.put(pluginName, plugin);
- }
-
- return installedAlerterPlugins;
-
- }
+// private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
+// File alerterPluginPath = new File(pluginPath);
+// if (!alerterPluginPath.exists()) {
+// return Collections.<String, Alerter>emptyMap();
+// }
+//
+// Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
+// ClassLoader parentLoader = this.getClass().getClassLoader();
+// File[] pluginDirs = alerterPluginPath.listFiles();
+// ArrayList<String> jarPaths = new ArrayList<String>();
+// for (File pluginDir: pluginDirs) {
+// if (!pluginDir.isDirectory()) {
+// logger.error("The plugin path " + pluginDir + " is not a directory.");
+// continue;
+// }
+//
+// // Load the conf directory
+// File propertiesDir = new File(pluginDir, "conf");
+// Props pluginProps = null;
+// if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+// File propertiesFile = new File(propertiesDir, "plugin.properties");
+// File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+//
+// if (propertiesFile.exists()) {
+// if (propertiesOverrideFile.exists()) {
+// pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+// }
+// else {
+// pluginProps = PropsUtils.loadProps(null, propertiesFile);
+// }
+// }
+// else {
+// logger.error("Plugin conf file " + propertiesFile + " not found.");
+// continue;
+// }
+// }
+// else {
+// logger.error("Plugin conf path " + propertiesDir + " not found.");
+// continue;
+// }
+//
+// String pluginName = pluginProps.getString("alerter.name");
+// List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
+//
+// String pluginClass = pluginProps.getString("alerter.class");
+// if (pluginClass == null) {
+// logger.error("Alerter class is not set.");
+// }
+// else {
+// logger.info("Plugin class " + pluginClass);
+// }
+//
+// URLClassLoader urlClassLoader = null;
+// File libDir = new File(pluginDir, "lib");
+// if (libDir.exists() && libDir.isDirectory()) {
+// File[] files = libDir.listFiles();
+//
+// ArrayList<URL> urls = new ArrayList<URL>();
+// for (int i=0; i < files.length; ++i) {
+// try {
+// URL url = files[i].toURI().toURL();
+// urls.add(url);
+// } catch (MalformedURLException e) {
+// logger.error(e);
+// }
+// }
+// if (extLibClasspath != null) {
+// for (String extLib : extLibClasspath) {
+// try {
+// File file = new File(pluginDir, extLib);
+// URL url = file.toURI().toURL();
+// urls.add(url);
+// } catch (MalformedURLException e) {
+// logger.error(e);
+// }
+// }
+// }
+//
+// urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+// }
+// else {
+// logger.error("Library path " + propertiesDir + " not found.");
+// continue;
+// }
+//
+// Class<?> alerterClass = null;
+// try {
+// alerterClass = urlClassLoader.loadClass(pluginClass);
+// }
+// catch (ClassNotFoundException e) {
+// logger.error("Class " + pluginClass + " not found.");
+// continue;
+// }
+//
+// String source = FileIOUtils.getSourcePathFromClass(alerterClass);
+// logger.info("Source jar " + source);
+// jarPaths.add("jar:file:" + source);
+//
+// Constructor<?> constructor = null;
+// try {
+// constructor = alerterClass.getConstructor(Props.class);
+// } catch (NoSuchMethodException e) {
+// logger.error("Constructor not found in " + pluginClass);
+// continue;
+// }
+//
+// Object obj = null;
+// try {
+// obj = constructor.newInstance(pluginProps);
+// } catch (Exception e) {
+// logger.error(e);
+// }
+//
+// if (!(obj instanceof Alerter)) {
+// logger.error("The object is not an Alerter");
+// continue;
+// }
+//
+// Alerter plugin = (Alerter) obj;
+// installedAlerterPlugins.put(pluginName, plugin);
+// }
+//
+// return installedAlerterPlugins;
+//
+// }
// private String getExecutorHost() {
// return executorHost;
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index 3ddab0c..85b55df 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -2,7 +2,6 @@ package azkaban.executor;
import java.io.IOException;
import java.lang.Thread.State;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java b/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
index 67edbc9..1b63f26 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
@@ -4,7 +4,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
public class JmxExecutorManagerAdapter implements JmxExecutorManagerAdapterMBean {
diff --git a/src/java/azkaban/jmx/JmxTriggerManager.java b/src/java/azkaban/jmx/JmxTriggerManager.java
index e183558..7537b0b 100644
--- a/src/java/azkaban/jmx/JmxTriggerManager.java
+++ b/src/java/azkaban/jmx/JmxTriggerManager.java
@@ -1,11 +1,7 @@
package azkaban.jmx;
-import java.util.ArrayList;
-import java.util.List;
-
import org.joda.time.DateTime;
-import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerManagerAdapter;
import azkaban.trigger.TriggerManagerAdapter.TriggerJMX;
diff --git a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
index ecf9c21..c87fbd0 100644
--- a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
@@ -1,7 +1,5 @@
package azkaban.jmx;
-import java.util.List;
-
public interface JmxTriggerManagerMBean {
@DisplayName("OPERATION: getLastThreadCheckTime")
src/java/azkaban/jobtype/JobTypeManager.java 93(+72 -21)
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1a0b25d..08fcb5e 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -130,19 +130,33 @@ public class JobTypeManager
throw new JobTypeManagerException("Failed to get global jobtype properties" + e.getCause());
}
- for(File dir : jobPluginsDir.listFiles()) {
- if(dir.isDirectory() && dir.canRead()) {
- // get its conf file
- try {
- loadJob(dir, globalConf, globalSysConf);
- }
- catch (Exception e) {
- logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
- throw new JobTypeManagerException(e);
+
+ synchronized (this) {
+ ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
+ try{
+ for(File dir : jobPluginsDir.listFiles()) {
+ if(dir.isDirectory() && dir.canRead()) {
+ // get its conf file
+ try {
+ loadJob(dir, globalConf, globalSysConf);
+ Thread.currentThread().setContextClassLoader(prevCl);
+ }
+ catch (Exception e) {
+ logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
+ throw new JobTypeManagerException(e);
+ }
+ }
}
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new JobTypeManagerException(e);
+ } catch(Throwable t) {
+ t.printStackTrace();
+ throw new JobTypeManagerException(t);
+ } finally {
+ Thread.currentThread().setContextClassLoader(prevCl);
}
- }
-
+ }
}
public static File findFilefromDir(File dir, String fn){
@@ -213,7 +227,6 @@ public class JobTypeManager
try {
if(confFile != null) {
conf = new Props(commonConf, confFile);
-// conf = PropsUtils.resolveProps(conf);
}
else {
conf = new Props(commonConf);
@@ -236,18 +249,56 @@ public class JobTypeManager
logger.info("Loading jobtype " + jobtypeName );
// sysconf says what jars/confs to load
- //List<String> jobtypeClasspath = sysConf.getStringList("jobtype.classpath", null, ",");
List<URL> resources = new ArrayList<URL>();
- for(File f : dir.listFiles()) {
- try {
+
+ try {
+ //first global classpath
+ logger.info("Adding global resources.");
+ List<String> typeGlobalClassPath = sysConf.getStringList("jobtype.global.classpath", null, ",");
+ if(typeGlobalClassPath != null) {
+ for(String jar : typeGlobalClassPath) {
+ URL cpItem = new File(jar).toURI().toURL();
+ if(!resources.contains(cpItem)) {
+ logger.info("adding to classpath " + cpItem);
+ resources.add(cpItem);
+ }
+ }
+ }
+
+ //type specific classpath
+ logger.info("Adding type resources.");
+ List<String> typeClassPath = sysConf.getStringList("jobtype.classpath", null, ",");
+ if(typeClassPath != null) {
+ for(String jar : typeClassPath) {
+ URL cpItem = new File(jar).toURI().toURL();
+ if(!resources.contains(cpItem)) {
+ logger.info("adding to classpath " + cpItem);
+ resources.add(cpItem);
+ }
+ }
+ }
+ List<String> jobtypeLibDirs = sysConf.getStringList("jobtype.lib.dir", null, ",");
+ if(jobtypeLibDirs != null) {
+ for(String libDir : jobtypeLibDirs) {
+ for(File f : new File(libDir).listFiles()) {
+ if(f.getName().endsWith(".jar")) {
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f.toURI().toURL());
+ }
+ }
+ }
+ }
+
+ logger.info("Adding type override resources.");
+ for(File f : dir.listFiles()) {
if(f.getName().endsWith(".jar")) {
- resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f.toURI().toURL());
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f.toURI().toURL());
}
- } catch (MalformedURLException e) {
- // TODO Auto-generated catch block
- throw new JobTypeManagerException(e);
}
+
+ } catch (MalformedURLException e) {
+ throw new JobTypeManagerException(e);
}
// each job type can have a different class loader
@@ -265,7 +316,7 @@ public class JobTypeManager
logger.info("Doing simple testing...");
try {
Props fakeSysProps = new Props(sysConf);
- fakeSysProps.put("type", jobtypeName);
+// fakeSysProps.put("type", jobtypeName);
Props fakeJobProps = new Props(conf);
@SuppressWarnings("unused")
Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index e4a4ab1..e69ed5e 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -16,15 +16,12 @@ import org.apache.log4j.Logger;
import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
-import azkaban.trigger.TriggerManager;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
import azkaban.utils.Utils;
-import azkaban.webapp.AzkabanWebServer;
-import azkaban.webapp.servlet.TriggerPlugin;
public class ProjectManager {
private static final Logger logger = Logger.getLogger(ProjectManager.class);
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 2f529a6..434edfd 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -367,8 +367,8 @@ public class Schedule{
return null;
}
+ @SuppressWarnings("unchecked")
public void createAndSetScheduleOptions(Object obj) {
- @SuppressWarnings("unchecked")
HashMap<String, Object> schedObj = (HashMap<String, Object>)obj;
if (schedObj.containsKey("executionOptions")) {
ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
src/java/azkaban/scheduler/ScheduleManager.java 41(+13 -28)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 878eb28..ed41ce4 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -16,32 +16,18 @@
package azkaban.scheduler;
-import java.lang.Thread.State;
import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManagerException;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
-import azkaban.project.ProjectManager;
import azkaban.sla.SlaOption;
import azkaban.trigger.TriggerAgent;
import azkaban.trigger.TriggerStatus;
@@ -65,10 +51,10 @@ public class ScheduleManager implements TriggerAgent {
private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
- private final ExecutorManagerAdapter executorManager;
-
- private ProjectManager projectManager = null;
-
+// private final ExecutorManagerAdapter executorManager;
+//
+// private ProjectManager projectManager = null;
+//
// Used for mbeans to query Scheduler status
//<<<<<<< HEAD
//
@@ -84,17 +70,16 @@ public class ScheduleManager implements TriggerAgent {
*
* @param loader
*/
- public ScheduleManager (ExecutorManagerAdapter executorManager,
- ScheduleLoader loader)
+ public ScheduleManager (ScheduleLoader loader)
{
- this.executorManager = executorManager;
+// this.executorManager = executorManager;
this.loader = loader;
}
- public void setProjectManager(ProjectManager projectManager) {
- this.projectManager = projectManager;
- }
+// public void setProjectManager(ProjectManager projectManager) {
+// this.projectManager = projectManager;
+// }
@Override
public void start() throws ScheduleManagerException {
@@ -318,10 +303,10 @@ public class ScheduleManager implements TriggerAgent {
*/
private synchronized void internalSchedule(Schedule s) {
//Schedule existing = scheduleIDMap.get(s.getScheduleId());
- Schedule existing = null;
- if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
- existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
- }
+// Schedule existing = null;
+// if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
+// existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
+// }
scheduleIDMap.put(s.getScheduleId(), s);
// Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 3aaf9ff..482334d 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -8,7 +8,6 @@ import java.util.List;
import java.util.Map;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 1f638d7..66bc178 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -6,15 +6,13 @@ import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
-import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.project.ProjectManager;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerAdapter;
import azkaban.trigger.TriggerManagerException;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
@@ -23,29 +21,29 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private static Logger logger = Logger.getLogger(TriggerBasedScheduleLoader.class);
- private TriggerManager triggerManager;
+ private TriggerManagerAdapter triggerManager;
private String triggerSource;
-// private Map<Integer, Trigger> triggersLocalCopy;
private long lastUpdateTime = -1;
- public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManagerAdapter executorManager, ProjectManager projectManager, String triggerSource) {
+ public TriggerBasedScheduleLoader(TriggerManager triggerManager, String triggerSource) {
this.triggerManager = triggerManager;
this.triggerSource = triggerSource;
- // need to init the action types and condition checker types
- ExecuteFlowAction.setExecutorManager(executorManager);
- ExecuteFlowAction.setProjectManager(projectManager);
+// // need to init the action types and condition checker types
+// ExecuteFlowAction.setExecutorManager(executorManager);
+// ExecuteFlowAction.setProjectManager(projectManager);
}
private Trigger scheduleToTrigger(Schedule s) {
-
- Condition triggerCondition = createTimeTriggerCondition(s);
- Condition expireCondition = createTimeExpireCondition(s);
+ Condition triggerCondition = createTriggerCondition(s);
+ Condition expireCondition = createExpireCondition(s);
List<TriggerAction> actions = createActions(s);
Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
if(s.isRecurring()) {
t.setResetOnTrigger(true);
+ } else {
+ t.setResetOnTrigger(false);
}
return t;
}
@@ -67,7 +65,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
return actions;
}
- private Condition createTimeTriggerCondition (Schedule s) {
+ private Condition createTriggerCondition (Schedule s) {
Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
checkers.put(checker.getId(), checker);
@@ -77,7 +75,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
}
// if failed to trigger, auto expire?
- private Condition createTimeExpireCondition (Schedule s) {
+ private Condition createExpireCondition (Schedule s) {
Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
checkers.put(checker.getId(), checker);
@@ -92,9 +90,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
try {
triggerManager.insertTrigger(t, t.getSubmitUser());
s.setScheduleId(t.getTriggerId());
-// triggersLocalCopy.put(t.getTriggerId(), t);
} catch (TriggerManagerException e) {
- // TODO Auto-generated catch block
throw new ScheduleManagerException("Failed to insert new schedule!", e);
}
}
@@ -104,9 +100,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
Trigger t = scheduleToTrigger(s);
try {
triggerManager.updateTrigger(t, t.getSubmitUser());
-// triggersLocalCopy.put(t.getTriggerId(), t);
} catch (TriggerManagerException e) {
- // TODO Auto-generated catch block
throw new ScheduleManagerException("Failed to update schedule!", e);
}
}
src/java/azkaban/sla/SlaOption.java 40(+38 -2)
diff --git a/src/java/azkaban/sla/SlaOption.java b/src/java/azkaban/sla/SlaOption.java
index ad5b536..264a28c 100644
--- a/src/java/azkaban/sla/SlaOption.java
+++ b/src/java/azkaban/sla/SlaOption.java
@@ -5,9 +5,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.joda.time.ReadablePeriod;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
-import azkaban.utils.Utils;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
public class SlaOption {
@@ -34,6 +37,8 @@ public class SlaOption {
private Map<String, Object> info;
private List<String> actions;
+ private static DateTimeFormatter fmt = DateTimeFormat.forPattern("MM/dd, YYYY HH:mm");
+
public SlaOption(
String type,
List<String> actions,
@@ -125,5 +130,36 @@ public class SlaOption {
public String toString() {
return "Sla of " + getType() + getInfo() + getActions();
}
+
+ public static String createSlaMessage(SlaOption slaOption, ExecutableFlow flow) {
+ String type = slaOption.getType();
+ int execId = flow.getExecutionId();
+ if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+ String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+ String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+ String basicinfo = "SLA Alert: Your flow " + flowName + " failed to FINISH within " + duration + "</br>";
+ String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " is expected to FINISH within " + duration + " from " + fmt.print(new DateTime(flow.getStartTime())) + "</br>";
+ String actual = "Actual flow status is " + flow.getStatus();
+ return basicinfo + expected + actual;
+ } else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+ String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+ String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+ String basicinfo = "SLA Alert: Your flow " + flowName + " failed to SUCCEED within " + duration + "</br>";
+ String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " expected to FINISH within " + duration + " from " + fmt.print(new DateTime(flow.getStartTime())) + "</br>";
+ String actual = "Actual flow status is " + flow.getStatus();
+ return basicinfo + expected + actual;
+ } else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+ return "SLA Alert: Your job " + jobName + " failed to FINISH within " + duration + " in execution " + execId;
+ } else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+ String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+ return "SLA Alert: Your job " + jobName + " failed to SUCCEED within " + duration + " in execution " + execId;
+ } else {
+ return "Unrecognized SLA type " + type;
+ }
+ }
+
}
src/java/azkaban/trigger/ActionTypeLoader.java 196(+98 -98)
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 4ee2ef3..1a8b665 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -30,10 +30,10 @@ public class ActionTypeLoader {
public void init(Props props) throws TriggerException {
// load built-in actions
-
- loadBuiltinActions();
-
- loadPluginActions(props);
+//
+// loadBuiltinActions();
+//
+// loadPluginActions(props);
}
@@ -44,100 +44,100 @@ public class ActionTypeLoader {
}
}
- private void loadPluginActions(Props props) throws TriggerException {
- String checkerDir = props.getString("azkaban.trigger.action.plugin.dir", DEFAULT_TRIGGER_ACTION_PLUGIN_DIR);
- File pluginDir = new File(checkerDir);
- if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
- logger.info("No trigger action plugins to load.");
- return;
- }
-
- logger.info("Loading plugin trigger actions from " + pluginDir);
- ClassLoader parentCl = this.getClass().getClassLoader();
-
- Props globalActionConf = null;
- File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
- try {
- if(confFile != null) {
- globalActionConf = new Props(null, confFile);
- } else {
- globalActionConf = new Props();
- }
- } catch (IOException e) {
- throw new TriggerException("Failed to get global properties." + e);
- }
-
- for(File dir : pluginDir.listFiles()) {
- if(dir.isDirectory() && dir.canRead()) {
- try {
- loadPluginTypes(globalActionConf, pluginDir, parentCl);
- } catch (Exception e) {
- logger.info("Plugin actions failed to load. " + e.getCause());
- throw new TriggerException("Failed to load all trigger actions!", e);
- }
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
- Props actionConf = null;
- File confFile = Utils.findFilefromDir(dir, ACTIONTYPECONFFILE);
- if(confFile == null) {
- logger.info("No action type found in " + dir.getAbsolutePath());
- return;
- }
- try {
- actionConf = new Props(globalConf, confFile);
- } catch (IOException e) {
- throw new TriggerException("Failed to load config for the action type", e);
- }
-
- String actionName = dir.getName();
- String actionClass = actionConf.getString("action.class");
-
- List<URL> resources = new ArrayList<URL>();
- for(File f : dir.listFiles()) {
- try {
- if(f.getName().endsWith(".jar")) {
- resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f.toURI().toURL());
- }
- } catch (MalformedURLException e) {
- // TODO Auto-generated catch block
- throw new TriggerException(e);
- }
- }
-
- // each job type can have a different class loader
- ClassLoader actionCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
-
- Class<? extends TriggerAction> clazz = null;
- try {
- clazz = (Class<? extends TriggerAction>)actionCl.loadClass(actionClass);
- actionToClass.put(actionName, clazz);
- }
- catch (ClassNotFoundException e) {
- throw new TriggerException(e);
- }
-
- if(actionConf.getBoolean("need.init")) {
- try {
- Utils.invokeStaticMethod(actionCl, actionClass, "init", actionConf);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("Failed to init the action type " + actionName);
- throw new TriggerException(e);
- }
- }
-
- logger.info("Loaded action type " + actionName + " " + actionClass);
- }
-
- private void loadBuiltinActions() {
- actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);
- logger.info("Loaded ExecuteFlowAction type.");
- }
+// private void loadPluginActions(Props props) throws TriggerException {
+// String checkerDir = props.getString("azkaban.trigger.action.plugin.dir", DEFAULT_TRIGGER_ACTION_PLUGIN_DIR);
+// File pluginDir = new File(checkerDir);
+// if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
+// logger.info("No trigger action plugins to load.");
+// return;
+// }
+//
+// logger.info("Loading plugin trigger actions from " + pluginDir);
+// ClassLoader parentCl = this.getClass().getClassLoader();
+//
+// Props globalActionConf = null;
+// File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
+// try {
+// if(confFile != null) {
+// globalActionConf = new Props(null, confFile);
+// } else {
+// globalActionConf = new Props();
+// }
+// } catch (IOException e) {
+// throw new TriggerException("Failed to get global properties." + e);
+// }
+//
+// for(File dir : pluginDir.listFiles()) {
+// if(dir.isDirectory() && dir.canRead()) {
+// try {
+// loadPluginTypes(globalActionConf, pluginDir, parentCl);
+// } catch (Exception e) {
+// logger.info("Plugin actions failed to load. " + e.getCause());
+// throw new TriggerException("Failed to load all trigger actions!", e);
+// }
+// }
+// }
+// }
+//
+// @SuppressWarnings("unchecked")
+// private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
+// Props actionConf = null;
+// File confFile = Utils.findFilefromDir(dir, ACTIONTYPECONFFILE);
+// if(confFile == null) {
+// logger.info("No action type found in " + dir.getAbsolutePath());
+// return;
+// }
+// try {
+// actionConf = new Props(globalConf, confFile);
+// } catch (IOException e) {
+// throw new TriggerException("Failed to load config for the action type", e);
+// }
+//
+// String actionName = dir.getName();
+// String actionClass = actionConf.getString("action.class");
+//
+// List<URL> resources = new ArrayList<URL>();
+// for(File f : dir.listFiles()) {
+// try {
+// if(f.getName().endsWith(".jar")) {
+// resources.add(f.toURI().toURL());
+// logger.info("adding to classpath " + f.toURI().toURL());
+// }
+// } catch (MalformedURLException e) {
+// // TODO Auto-generated catch block
+// throw new TriggerException(e);
+// }
+// }
+//
+// // each job type can have a different class loader
+// ClassLoader actionCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
+//
+// Class<? extends TriggerAction> clazz = null;
+// try {
+// clazz = (Class<? extends TriggerAction>)actionCl.loadClass(actionClass);
+// actionToClass.put(actionName, clazz);
+// }
+// catch (ClassNotFoundException e) {
+// throw new TriggerException(e);
+// }
+//
+// if(actionConf.getBoolean("need.init")) {
+// try {
+// Utils.invokeStaticMethod(actionCl, actionClass, "init", actionConf);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Failed to init the action type " + actionName);
+// throw new TriggerException(e);
+// }
+// }
+//
+// logger.info("Loaded action type " + actionName + " " + actionClass);
+// }
+//
+// private void loadBuiltinActions() {
+// actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);
+// logger.info("Loaded ExecuteFlowAction type.");
+// }
public static void registerBuiltinActions(Map<String, Class<? extends TriggerAction>> builtinActions) {
actionToClass.putAll(builtinActions);
diff --git a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 13eaf27..8dbc1cc 100644
--- a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -64,19 +64,6 @@ public class BasicTimeChecker implements ConditionChecker {
return nextCheckTime;
}
-// public BasicTimeChecker(
-// DateTime firstCheckTime,
-// Boolean isRecurring,
-// Boolean skipPastChecks,
-// String period) {
-// this.firstCheckTime = firstCheckTime;
-// this.isRecurring = isRecurring;
-// this.skipPastChecks = skipPastChecks;
-// this.period = parsePeriodString(period);
-// this.nextCheckTime = new DateTime(firstCheckTime);
-// this.nextCheckTime = calculateNextCheckTime();
-// }
-
public BasicTimeChecker(
String id,
long firstCheckTime,
@@ -102,23 +89,10 @@ public class BasicTimeChecker implements ConditionChecker {
@Override
public void reset() {
this.nextCheckTime = calculateNextCheckTime();
-
}
- /*
- * TimeChecker format:
- * type_first-time-in-millis_next-time-in-millis_timezone_is-recurring_skip-past-checks_period
- */
@Override
public String getId() {
-// return getType() + "$" +
-// firstCheckTime.getMillis() + "$" +
-// nextCheckTime.getMillis() + "$" +
-// firstCheckTime.getZone().getID().replace('/', '0') + "$" +
-// //"offset"+firstCheckTime.getZone().getOffset(firstCheckTime.getMillis()) + "_" +
-// (isRecurring == true ? "1" : "0") + "$" +
-// (skipPastChecks == true ? "1" : "0") + "$" +
-// createPeriodString(period);
return id;
}
@@ -129,21 +103,6 @@ public class BasicTimeChecker implements ConditionChecker {
@SuppressWarnings("unchecked")
public static BasicTimeChecker createFromJson(Object obj) throws Exception {
-// Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
-// if(!jsonObj.get("type").equals(type)) {
-// throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
-// }
-// long firstCheckTime = Long.valueOf((String)jsonObj.get("firstCheckTime"));
-// String timezoneId = (String) jsonObj.get("timezone");
-// long nextCheckTime = Long.valueOf((String)jsonObj.get("nextCheckTime"));
-// DateTimeZone timezone = DateTimeZone.forID(timezoneId);
-//// DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
-//// DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
-// boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
-// boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
-// ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
-// String id = (String) jsonObj.get("id");
-// return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
return createFromJson((HashMap<String, Object>)obj);
}
@@ -156,8 +115,6 @@ public class BasicTimeChecker implements ConditionChecker {
String timezoneId = (String) jsonObj.get("timezone");
long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
DateTimeZone timezone = DateTimeZone.forID(timezoneId);
-// DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
-// DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
@@ -175,44 +132,6 @@ public class BasicTimeChecker implements ConditionChecker {
return createFromJson(obj);
}
-// public static ConditionChecker createFromJson(String obj) {
-// String str = (String) obj;
-// String[] parts = str.split("\\$");
-//
-// if(!parts[0].equals(type)) {
-// throw new RuntimeException("Cannot create checker of " + type + " from " + parts[0]);
-// }
-//
-// long firstMillis = Long.parseLong(parts[1]);
-// long nextMillis = Long.parseLong(parts[2]);
-// //DateTimeZone timezone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(parts[3]));
-// DateTimeZone timezone = DateTimeZone.forID(parts[3].replace('0', '/'));
-// boolean isRecurring = parts[4].equals("1") ? true : false;
-// boolean skipPastChecks = parts[5].equals("1") ? true : false;
-// ReadablePeriod period = parsePeriodString(parts[6]);
-//
-// return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
-// }
-//
-// @Override
-// public ConditionChecker fromJson(Object obj) {
-// String str = (String) obj;
-// String[] parts = str.split("_");
-//
-// if(!parts[0].equals(getType())) {
-// throw new RuntimeException("Cannot create checker of " + getType() + " from " + parts[0]);
-// }
-//
-// long firstMillis = Long.parseLong(parts[1]);
-// long nextMillis = Long.parseLong(parts[2]);
-// DateTimeZone timezone = DateTimeZone.forID(parts[3]);
-// boolean isRecurring = Boolean.valueOf(parts[4]);
-// boolean skipPastChecks = Boolean.valueOf(parts[5]);
-// ReadablePeriod period = parsePeriodString(parts[6]);
-//
-// return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
-// }
-
private void updateNextCheckTime(){
nextCheckTime = calculateNextCheckTime();
}
@@ -224,7 +143,6 @@ public class BasicTimeChecker implements ConditionChecker {
if(count > 100000) {
throw new IllegalStateException("100000 increments of period did not get to present time.");
}
-
if(period == null) {
break;
}else {
@@ -240,7 +158,6 @@ public class BasicTimeChecker implements ConditionChecker {
@Override
public Object getNum() {
- // TODO Auto-generated method stub
return null;
}
@@ -266,8 +183,6 @@ public class BasicTimeChecker implements ConditionChecker {
@Override
public void setContext(Map<String, Object> context) {
- // TODO Auto-generated method stub
-
}
}
diff --git a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 6124b22..f19adfe 100644
--- a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -9,9 +9,9 @@ import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
@@ -38,7 +38,6 @@ public class ExecuteFlowAction implements TriggerAction {
private static ProjectManager projectManager;
private ExecutionOptions executionOptions = new ExecutionOptions();
private List<SlaOption> slaOptions;
- private Map<String, Object> context;
private static Logger logger = Logger.getLogger(ExecuteFlowAction.class);
@@ -64,7 +63,7 @@ public class ExecuteFlowAction implements TriggerAction {
return projectId;
}
- public void setProjectId(int projectId) {
+ protected void setProjectId(int projectId) {
this.projectId = projectId;
}
@@ -72,7 +71,7 @@ public class ExecuteFlowAction implements TriggerAction {
return flowName;
}
- public void setFlowName(String flowName) {
+ protected void setFlowName(String flowName) {
this.flowName = flowName;
}
@@ -80,7 +79,7 @@ public class ExecuteFlowAction implements TriggerAction {
return submitUser;
}
- public void setSubmitUser(String submitUser) {
+ protected void setSubmitUser(String submitUser) {
this.submitUser = submitUser;
}
@@ -88,7 +87,7 @@ public class ExecuteFlowAction implements TriggerAction {
return executionOptions;
}
- public void setExecutionOptions(ExecutionOptions executionOptions) {
+ protected void setExecutionOptions(ExecutionOptions executionOptions) {
this.executionOptions = executionOptions;
}
@@ -96,7 +95,7 @@ public class ExecuteFlowAction implements TriggerAction {
return slaOptions;
}
- public void setSlaOptions(List<SlaOption> slaOptions) {
+ protected void setSlaOptions(List<SlaOption> slaOptions) {
this.slaOptions = slaOptions;
}
@@ -216,9 +215,9 @@ public class ExecuteFlowAction implements TriggerAction {
try{
executorManager.submitExecutableFlow(exflow, submitUser);
- Map<String, Object> outputProps = new HashMap<String, Object>();
- outputProps.put(EXEC_ID, exflow.getExecutionId());
- context.put(actionId, outputProps);
+// Map<String, Object> outputProps = new HashMap<String, Object>();
+// outputProps.put(EXEC_ID, exflow.getExecutionId());
+// context.put(actionId, outputProps);
logger.info("Invoked flow " + project.getName() + "." + flowName);
} catch (ExecutorManagerException e) {
throw new RuntimeException(e);
@@ -229,14 +228,15 @@ public class ExecuteFlowAction implements TriggerAction {
int execId = exflow.getExecutionId();
for(SlaOption sla : slaOptions) {
logger.info("Adding sla trigger " + sla.toString());
- SlaChecker slaFailChecker = new SlaChecker("slaFailChecker", sla, execId, false);
- Map<String, ConditionChecker> failCheckers = new HashMap<String, ConditionChecker>();
- failCheckers.put(slaFailChecker.getId(), slaFailChecker);
- Condition triggerCond = new Condition(failCheckers, slaFailChecker.getId() + ".eval()");
- SlaChecker slaPassChecker = new SlaChecker("slaPassChecker", sla, execId, true);
- Map<String, ConditionChecker> passCheckers = new HashMap<String, ConditionChecker>();
- passCheckers.put(slaPassChecker.getId(), slaPassChecker);
- Condition expireCond = new Condition(passCheckers, slaPassChecker.getId() + ".eval()");
+ SlaChecker slaChecker = new SlaChecker("slaChecker", sla, execId);
+ Map<String, ConditionChecker> slaCheckers = new HashMap<String, ConditionChecker>();
+ slaCheckers.put(slaChecker.getId(), slaChecker);
+ Condition triggerCond = new Condition(slaCheckers, slaChecker.getId() + ".eval()");
+ // if whole flow finish before violate sla, just abort
+ ExecutionChecker execChecker = new ExecutionChecker("execChecker", execId, null, Status.SUCCEEDED);
+ Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+ expireCheckers.put(execChecker.getId(), execChecker);
+ Condition expireCond = new Condition(expireCheckers, execChecker.getId() + ".eval()");
List<TriggerAction> actions = new ArrayList<TriggerAction>();
List<String> slaActions = sla.getActions();
for(String act : slaActions) {
@@ -265,7 +265,6 @@ public class ExecuteFlowAction implements TriggerAction {
@Override
public void setContext(Map<String, Object> context) {
- this.context = context;
}
@Override
@@ -273,5 +272,4 @@ public class ExecuteFlowAction implements TriggerAction {
return actionId;
}
-
}
diff --git a/src/java/azkaban/trigger/builtin/ExecutionChecker.java b/src/java/azkaban/trigger/builtin/ExecutionChecker.java
new file mode 100644
index 0000000..2906b5b
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/ExecutionChecker.java
@@ -0,0 +1,123 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.trigger.ConditionChecker;
+
+public class ExecutionChecker implements ConditionChecker{
+
+ public static final String type = "ExecutionChecker";
+ public static ExecutorManagerAdapter executorManager;
+
+ private String checkerId;
+ private int execId;
+ private String jobName;
+ private Status wantedStatus;
+
+ public ExecutionChecker(String checkerId, int execId, String jobName, Status wantedStatus) {
+ this.checkerId = checkerId;
+ this.execId = execId;
+ this.jobName = jobName;
+ this.wantedStatus = wantedStatus;
+ }
+
+ public static void setExecutorManager(ExecutorManagerAdapter em) {
+ executorManager = em;
+ }
+
+ @Override
+ public Object eval() {
+ ExecutableFlow exflow;
+ try {
+ exflow = executorManager.getExecutableFlow(execId);
+ } catch (ExecutorManagerException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return Boolean.FALSE;
+ }
+ if(jobName != null) {
+ ExecutableNode job = exflow.getExecutableNode(jobName);
+ if(job != null) {
+ return job.getStatus().equals(wantedStatus);
+ } else {
+ return Boolean.FALSE;
+ }
+ } else {
+ return exflow.getStatus().equals(wantedStatus);
+ }
+
+ }
+
+ @Override
+ public Object getNum() {
+ return null;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public String getId() {
+ return checkerId;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ public static ExecutionChecker createFromJson(HashMap<String, Object> jsonObj) throws Exception {
+ if(!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+ }
+ int execId = Integer.valueOf((String) jsonObj.get("execId"));
+ String jobName = null;
+ if(jsonObj.containsKey("jobName")) {
+ jobName = (String) jsonObj.get("jobName");
+ }
+ String checkerId = (String) jsonObj.get("checkerId");
+ Status wantedStatus = Status.valueOf((String)jsonObj.get("wantedStatus"));
+
+ return new ExecutionChecker(checkerId, execId, jobName, wantedStatus);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ConditionChecker fromJson(Object obj) throws Exception {
+ return createFromJson((HashMap<String, Object>) obj);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("type", type);
+ jsonObj.put("execId", String.valueOf(execId));
+ if(jobName != null) {
+ jsonObj.put("jobName", jobName);
+ }
+ jsonObj.put("wantedStatus", wantedStatus.toString());
+ jsonObj.put("checkerId", checkerId);
+ return jsonObj;
+ }
+
+ @Override
+ public void stopChecker() {
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ }
+
+ @Override
+ public long getNextCheckTime() {
+ return -1;
+ }
+
+}
diff --git a/src/java/azkaban/trigger/builtin/KillExecutionAction.java b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
index 0dabe73..74633e9 100644
--- a/src/java/azkaban/trigger/builtin/KillExecutionAction.java
+++ b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -6,7 +6,6 @@ import java.util.Map;
import org.apache.log4j.Logger;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.trigger.TriggerAction;
diff --git a/src/java/azkaban/trigger/builtin/SendEmailAction.java b/src/java/azkaban/trigger/builtin/SendEmailAction.java
index ec946c2..c1b6efc 100644
--- a/src/java/azkaban/trigger/builtin/SendEmailAction.java
+++ b/src/java/azkaban/trigger/builtin/SendEmailAction.java
@@ -1,25 +1,17 @@
package azkaban.trigger.builtin;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.log4j.Logger;
-
-import azkaban.sla.SlaOption;
-import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.utils.AbstractMailer;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
public class SendEmailAction implements TriggerAction {
-
- private static final Logger logger = Logger.getLogger(SendEmailAction.class);
private String actionId;
- private Map<String, Object> context;
private static AbstractMailer mailer;
private String message;
public static final String type = "SendEmailAction";
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
index 11c6bd5..82d69c5 100644
--- a/src/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -1,29 +1,16 @@
package azkaban.trigger.builtin;
-import java.io.File;
-import java.lang.reflect.Constructor;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
+import azkaban.alert.Alerter;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutorMailer;
-import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManager.Alerter;
import azkaban.executor.ExecutorManagerException;
import azkaban.sla.SlaOption;
import azkaban.trigger.TriggerAction;
-import azkaban.utils.FileIOUtils;
-import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
public class SlaAlertAction implements TriggerAction{
@@ -35,8 +22,7 @@ public class SlaAlertAction implements TriggerAction{
private SlaOption slaOption;
private int execId;
// private List<Map<String, Object>> alerts;
- private static Map<String, Alerter> alerters;
- private Map<String, Object> context;
+ private static Map<String, azkaban.alert.Alerter> alerters;
private static ExecutorManagerAdapter executorManager;
public SlaAlertAction(String id, SlaOption slaOption, int execId) {
@@ -108,7 +94,8 @@ public class SlaAlertAction implements TriggerAction{
Alerter alerter = alerters.get(alertType);
if(alerter != null) {
try {
- alerter.alertOnSla(slaOption, createSlaMessage());
+ ExecutableFlow flow = executorManager.getExecutableFlow(execId);
+ alerter.alertOnSla(slaOption, SlaOption.createSlaMessage(slaOption, flow));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -122,45 +109,44 @@ public class SlaAlertAction implements TriggerAction{
// }
}
- private String createSlaMessage() {
- ExecutableFlow flow = null;
- try {
- flow = executorManager.getExecutableFlow(execId);
- } catch (ExecutorManagerException e) {
- e.printStackTrace();
- logger.error("Failed to get executable flow.");
- }
- String type = slaOption.getType();
- if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
- String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
- String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
- String basicinfo = "SLA Alert: Your flow " + flowName + " failed to FINISH within " + duration + "</br>";
- String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " is expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>";
- String actual = "Actual flow status is " + flow.getStatus();
- return basicinfo + expected + actual;
- } else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
- String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
- String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
- String basicinfo = "SLA Alert: Your flow " + flowName + " failed to SUCCEED within " + duration + "</br>";
- String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>";
- String actual = "Actual flow status is " + flow.getStatus();
- return basicinfo + expected + actual;
- } else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
- String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
- String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
- return "SLA Alert: Your job " + jobName + " failed to FINISH within " + duration + " in execution " + execId;
- } else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
- String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
- String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
- return "SLA Alert: Your job " + jobName + " failed to SUCCEED within " + duration + " in execution " + execId;
- } else {
- return "Unrecognized SLA type " + type;
- }
- }
+// private String createSlaMessage() {
+// ExecutableFlow flow = null;
+// try {
+// flow = executorManager.getExecutableFlow(execId);
+// } catch (ExecutorManagerException e) {
+// e.printStackTrace();
+// logger.error("Failed to get executable flow.");
+// }
+// String type = slaOption.getType();
+// if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+// String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+// String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+// String basicinfo = "SLA Alert: Your flow " + flowName + " failed to FINISH within " + duration + "</br>";
+// String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " is expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>";
+// String actual = "Actual flow status is " + flow.getStatus();
+// return basicinfo + expected + actual;
+// } else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+// String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+// String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+// String basicinfo = "SLA Alert: Your flow " + flowName + " failed to SUCCEED within " + duration + "</br>";
+// String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>";
+// String actual = "Actual flow status is " + flow.getStatus();
+// return basicinfo + expected + actual;
+// } else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+// String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+// String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+// return "SLA Alert: Your job " + jobName + " failed to FINISH within " + duration + " in execution " + execId;
+// } else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+// String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+// String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+// return "SLA Alert: Your job " + jobName + " failed to SUCCEED within " + duration + " in execution " + execId;
+// } else {
+// return "Unrecognized SLA type " + type;
+// }
+// }
@Override
public void setContext(Map<String, Object> context) {
- this.context = context;
}
@Override
src/java/azkaban/trigger/builtin/SlaChecker.java 58(+16 -42)
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
index 25ca9b3..0219eab 100644
--- a/src/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -24,38 +24,26 @@ public class SlaChecker implements ConditionChecker{
private String id;
private SlaOption slaOption;
private int execId;
- private Map<String, Object> context;
- private boolean passChecker = true;
private long checkTime = -1;
private static ExecutorManagerAdapter executorManager;
- public SlaChecker(String id, SlaOption slaOption, int execId, boolean passChecker) {
+ public SlaChecker(String id, SlaOption slaOption, int execId) {
this.id = id;
this.slaOption = slaOption;
this.execId = execId;
- this.passChecker = passChecker;
- }
-
- public SlaChecker(String id, SlaOption sla, String executionActionId, boolean passChecker) {
- Map<String, Object> executeActionProps = (Map<String, Object>) context.get(executionActionId);
- int execId = Integer.valueOf((String) executeActionProps.get(ExecuteFlowAction.EXEC_ID));
- this.id = id;
- this.slaOption = sla;
- this.execId = execId;
- this.passChecker = passChecker;
}
public static void setExecutorManager(ExecutorManagerAdapter em) {
executorManager = em;
}
- private Boolean metSla(ExecutableFlow flow) {
+ private Boolean violateSla(ExecutableFlow flow) {
String type = slaOption.getType();
logger.info("Checking for " + flow.getExecutionId() + " with sla " + type);
logger.info("flow is " + flow.getStatus());
if(flow.getStartTime() < 0) {
- return null;
+ return Boolean.FALSE;
}
if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
@@ -65,9 +53,9 @@ public class SlaChecker implements ConditionChecker{
if(checkTime.isBeforeNow()) {
Status status = flow.getStatus();
if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
- return Boolean.TRUE;
- } else {
return Boolean.FALSE;
+ } else {
+ return Boolean.TRUE;
}
}
} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
@@ -78,9 +66,9 @@ public class SlaChecker implements ConditionChecker{
if(checkTime.isBeforeNow()) {
Status status = flow.getStatus();
if(status.equals(Status.SUCCEEDED)) {
- return Boolean.TRUE;
- } else {
return Boolean.FALSE;
+ } else {
+ return Boolean.TRUE;
}
}
} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
@@ -94,9 +82,9 @@ public class SlaChecker implements ConditionChecker{
if(checkTime.isBeforeNow()) {
Status status = node.getStatus();
if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
- return Boolean.TRUE;
- } else {
return Boolean.FALSE;
+ } else {
+ return Boolean.TRUE;
}
}
}
@@ -111,9 +99,9 @@ public class SlaChecker implements ConditionChecker{
if(checkTime.isBeforeNow()) {
Status status = node.getStatus();
if(status.equals(Status.SUCCEEDED)) {
- return Boolean.TRUE;
- } else {
return Boolean.FALSE;
+ } else {
+ return Boolean.TRUE;
}
}
}
@@ -137,10 +125,10 @@ public class SlaChecker implements ConditionChecker{
// return Boolean.FALSE;
// }
// }
- return null;
+ return Boolean.FALSE;
}
- // return true for should do sla actions
+ // return true to trigger sla action
@Override
public Object eval() {
ExecutableFlow flow;
@@ -149,30 +137,19 @@ public class SlaChecker implements ConditionChecker{
} catch (ExecutorManagerException e) {
logger.error("Can't get executable flow.", e);
e.printStackTrace();
+ // something wrong, send out alerts
return Boolean.TRUE;
}
- Boolean metSla = metSla(flow);
- if(metSla == null) {
- return Boolean.FALSE;
- } else {
- if(passChecker) {
- return metSla;
- } else {
- return !metSla;
- }
- }
+ return violateSla(flow);
}
@Override
public Object getNum() {
- // TODO Auto-generated method stub
return null;
}
@Override
public void reset() {
- // TODO Auto-generated method stub
-
}
@Override
@@ -204,8 +181,7 @@ public class SlaChecker implements ConditionChecker{
String id = (String) jsonObj.get("id");
SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
int execId = Integer.valueOf((String) jsonObj.get("execId"));
- boolean passChecker = Boolean.valueOf((Boolean) jsonObj.get("passChecker"));
- return new SlaChecker(id, slaOption, execId, passChecker);
+ return new SlaChecker(id, slaOption, execId);
}
@Override
@@ -215,7 +191,6 @@ public class SlaChecker implements ConditionChecker{
jsonObj.put("id", id);
jsonObj.put("slaOption", slaOption.toObject());
jsonObj.put("execId", String.valueOf(execId));
- jsonObj.put("passChecker", passChecker);
return jsonObj;
}
@@ -227,7 +202,6 @@ public class SlaChecker implements ConditionChecker{
@Override
public void setContext(Map<String, Object> context) {
- this.context = context;
}
@Override
src/java/azkaban/trigger/CheckerTypeLoader.java 196(+98 -98)
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index 45fc8ff..bc5c06a 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -31,10 +31,10 @@ public class CheckerTypeLoader {
// load built-in checkers
-
- loadBuiltinCheckers();
-
- loadPluginCheckers(props);
+//
+// loadBuiltinCheckers();
+//
+// loadPluginCheckers(props);
}
@@ -45,96 +45,96 @@ public class CheckerTypeLoader {
}
}
- private void loadPluginCheckers(Props props) throws TriggerException {
-
- String checkerDir = props.getString("azkaban.condition.checker.plugin.dir", DEFAULT_CONDITION_CHECKER_PLUGIN_DIR);
- File pluginDir = new File(checkerDir);
- if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
- logger.info("No conditon checker plugins to load.");
- return;
- }
-
- logger.info("Loading plugin condition checkers from " + pluginDir);
- ClassLoader parentCl = this.getClass().getClassLoader();
-
- Props globalCheckerConf = null;
- File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
- try {
- if(confFile != null) {
- globalCheckerConf = new Props(null, confFile);
- } else {
- globalCheckerConf = new Props();
- }
- } catch (IOException e) {
- throw new TriggerException("Failed to get global properties." + e);
- }
-
- for(File dir : pluginDir.listFiles()) {
- if(dir.isDirectory() && dir.canRead()) {
- try {
- loadPluginTypes(globalCheckerConf, pluginDir, parentCl);
- } catch (Exception e) {
- logger.info("Plugin checkers failed to load. " + e.getCause());
- throw new TriggerException("Failed to load all condition checkers!", e);
- }
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
- Props checkerConf = null;
- File confFile = Utils.findFilefromDir(dir, CHECKERTYPECONFFILE);
- if(confFile == null) {
- logger.info("No checker type found in " + dir.getAbsolutePath());
- return;
- }
- try {
- checkerConf = new Props(globalConf, confFile);
- } catch (IOException e) {
- throw new TriggerException("Failed to load config for the checker type", e);
- }
-
- String checkerName = dir.getName();
- String checkerClass = checkerConf.getString("checker.class");
-
- List<URL> resources = new ArrayList<URL>();
- for(File f : dir.listFiles()) {
- try {
- if(f.getName().endsWith(".jar")) {
- resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f.toURI().toURL());
- }
- } catch (MalformedURLException e) {
- // TODO Auto-generated catch block
- throw new TriggerException(e);
- }
- }
-
- // each job type can have a different class loader
- ClassLoader checkerCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
-
- Class<? extends ConditionChecker> clazz = null;
- try {
- clazz = (Class<? extends ConditionChecker>)checkerCl.loadClass(checkerClass);
- checkerToClass.put(checkerName, clazz);
- }
- catch (ClassNotFoundException e) {
- throw new TriggerException(e);
- }
-
- if(checkerConf.getBoolean("need.init")) {
- try {
- Utils.invokeStaticMethod(checkerCl, checkerClass, "init", checkerConf);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("Failed to init the checker type " + checkerName);
- throw new TriggerException(e);
- }
- }
-
- logger.info("Loaded checker type " + checkerName + " " + checkerClass);
- }
+// private void loadPluginCheckers(Props props) throws TriggerException {
+//
+// String checkerDir = props.getString("azkaban.condition.checker.plugin.dir", DEFAULT_CONDITION_CHECKER_PLUGIN_DIR);
+// File pluginDir = new File(checkerDir);
+// if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
+// logger.info("No conditon checker plugins to load.");
+// return;
+// }
+//
+// logger.info("Loading plugin condition checkers from " + pluginDir);
+// ClassLoader parentCl = this.getClass().getClassLoader();
+//
+// Props globalCheckerConf = null;
+// File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
+// try {
+// if(confFile != null) {
+// globalCheckerConf = new Props(null, confFile);
+// } else {
+// globalCheckerConf = new Props();
+// }
+// } catch (IOException e) {
+// throw new TriggerException("Failed to get global properties." + e);
+// }
+//
+// for(File dir : pluginDir.listFiles()) {
+// if(dir.isDirectory() && dir.canRead()) {
+// try {
+// loadPluginTypes(globalCheckerConf, pluginDir, parentCl);
+// } catch (Exception e) {
+// logger.info("Plugin checkers failed to load. " + e.getCause());
+// throw new TriggerException("Failed to load all condition checkers!", e);
+// }
+// }
+// }
+// }
+//
+// @SuppressWarnings("unchecked")
+// private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
+// Props checkerConf = null;
+// File confFile = Utils.findFilefromDir(dir, CHECKERTYPECONFFILE);
+// if(confFile == null) {
+// logger.info("No checker type found in " + dir.getAbsolutePath());
+// return;
+// }
+// try {
+// checkerConf = new Props(globalConf, confFile);
+// } catch (IOException e) {
+// throw new TriggerException("Failed to load config for the checker type", e);
+// }
+//
+// String checkerName = dir.getName();
+// String checkerClass = checkerConf.getString("checker.class");
+//
+// List<URL> resources = new ArrayList<URL>();
+// for(File f : dir.listFiles()) {
+// try {
+// if(f.getName().endsWith(".jar")) {
+// resources.add(f.toURI().toURL());
+// logger.info("adding to classpath " + f.toURI().toURL());
+// }
+// } catch (MalformedURLException e) {
+// // TODO Auto-generated catch block
+// throw new TriggerException(e);
+// }
+// }
+//
+// // each job type can have a different class loader
+// ClassLoader checkerCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
+//
+// Class<? extends ConditionChecker> clazz = null;
+// try {
+// clazz = (Class<? extends ConditionChecker>)checkerCl.loadClass(checkerClass);
+// checkerToClass.put(checkerName, clazz);
+// }
+// catch (ClassNotFoundException e) {
+// throw new TriggerException(e);
+// }
+//
+// if(checkerConf.getBoolean("need.init")) {
+// try {
+// Utils.invokeStaticMethod(checkerCl, checkerClass, "init", checkerConf);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Failed to init the checker type " + checkerName);
+// throw new TriggerException(e);
+// }
+// }
+//
+// logger.info("Loaded checker type " + checkerName + " " + checkerClass);
+// }
public static void registerBuiltinCheckers(Map<String, Class<? extends ConditionChecker>> builtinCheckers) {
checkerToClass.putAll(checkerToClass);
@@ -143,10 +143,10 @@ public class CheckerTypeLoader {
}
}
- private void loadBuiltinCheckers() {
- checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
- logger.info("Loaded BasicTimeChecker type.");
- }
+// private void loadBuiltinCheckers() {
+// checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
+// logger.info("Loaded BasicTimeChecker type.");
+// }
public ConditionChecker createCheckerFromJson(String type, Object obj) throws Exception {
ConditionChecker checker = null;
src/java/azkaban/trigger/Condition.java 22(+16 -6)
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index d032527..2f355d9 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -21,7 +21,7 @@ public class Condition {
private Expression expression;
private Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
private MapContext context = new MapContext();
- private long nextCheckTime = -1;
+ private Long nextCheckTime = -1L;
public Condition(Map<String, ConditionChecker> checkers, String expr) {
setCheckers(checkers);
@@ -32,6 +32,9 @@ public class Condition {
public Condition(Map<String, ConditionChecker> checkers, String expr, long nextCheckTime) {
this.nextCheckTime = nextCheckTime;
setCheckers(checkers);
+// for(ConditionChecker ck : checkers.values()) {
+// ck.setCondition(this);
+// }
this.expression = jexl.createExpression(expr);
}
@@ -43,18 +46,17 @@ public class Condition {
Condition.checkerLoader = loader;
}
- public static CheckerTypeLoader getCheckerLoader() {
+ protected static CheckerTypeLoader getCheckerLoader() {
return checkerLoader;
}
- public void registerChecker(ConditionChecker checker) {
+ protected void registerChecker(ConditionChecker checker) {
checkers.put(checker.getId(), checker);
context.set(checker.getId(), checker);
updateNextCheckTime();
}
public long getNextCheckTime() {
- updateNextCheckTime();
return nextCheckTime;
}
@@ -66,10 +68,17 @@ public class Condition {
this.checkers = checkers;
for(ConditionChecker checker : checkers.values()) {
this.context.set(checker.getId(), checker);
+// checker.setCondition(this);
}
updateNextCheckTime();
}
+ public void updateCheckTime(Long ct) {
+ if(nextCheckTime < ct) {
+ nextCheckTime = ct;
+ }
+ }
+
private void updateNextCheckTime() {
long time = Long.MAX_VALUE;
for(ConditionChecker checker : checkers.values()) {
@@ -83,7 +92,7 @@ public class Condition {
checker.reset();
}
updateNextCheckTime();
- logger.error("Done resetting checkers. The next check time will be " + new DateTime(nextCheckTime));
+ logger.info("Done resetting checkers. The next check time will be " + new DateTime(nextCheckTime));
}
public String getExpression() {
@@ -95,6 +104,7 @@ public class Condition {
}
public boolean isMet() {
+ logger.info("Testing ondition " + expression);
return expression.evaluate(context).equals(Boolean.TRUE);
}
@@ -134,7 +144,7 @@ public class Condition {
checkers.put(ck.getId(), ck);
}
String expr = (String) jsonObj.get("expression");
- long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
+ Long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
cond = new Condition(checkers, expr, nextCheckTime);
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 342312b..c78267e 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -24,4 +24,8 @@ public interface ConditionChecker {
void setContext(Map<String, Object> context);
long getNextCheckTime();
+
+// void setCondition(Condition c);
+//
+// String getDescription();
}
src/java/azkaban/trigger/TriggerManager.java 71(+34 -37)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index e964ecf..d4f1698 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -8,9 +8,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
import azkaban.utils.Props;
@@ -18,7 +18,7 @@ public class TriggerManager implements TriggerManagerAdapter{
private static Logger logger = Logger.getLogger(TriggerManager.class);
public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
- private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+ private static Map<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
private CheckerTypeLoader checkerTypeLoader;
private ActionTypeLoader actionTypeLoader;
@@ -47,8 +47,11 @@ public class TriggerManager implements TriggerManagerAdapter{
} catch (Exception e) {
throw new TriggerManagerException(e);
}
+
Condition.setCheckerLoader(checkerTypeLoader);
Trigger.setActionTypeLoader(actionTypeLoader);
+
+ logger.info("TriggerManager loaded.");
}
@Override
@@ -192,7 +195,6 @@ public class TriggerManager implements TriggerManagerAdapter{
} catch(InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
}
-
}
}
}
@@ -247,7 +249,6 @@ public class TriggerManager implements TriggerManagerAdapter{
catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
-// updateAgent(t);
}
private void onTriggerExpire(Trigger t) throws TriggerManagerException {
@@ -304,6 +305,7 @@ public class TriggerManager implements TriggerManagerAdapter{
// updateAgent(t);
}
+ @Override
public List<Trigger> getTriggers(String triggerSource) {
List<Trigger> triggers = new ArrayList<Trigger>();
for(Trigger t : triggerIdMap.values()) {
@@ -326,29 +328,29 @@ public class TriggerManager implements TriggerManagerAdapter{
}
@Override
- public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException {
- List<Integer> triggers = new ArrayList<Integer>();
+ public List<Trigger> getAllTriggerUpdates(long lastUpdateTime) throws TriggerManagerException {
+ List<Trigger> triggers = new ArrayList<Trigger>();
for(Trigger t : triggerIdMap.values()) {
if(t.getLastModifyTime() > lastUpdateTime) {
- triggers.add(t.getTriggerId());
+ triggers.add(t);
}
}
return triggers;
}
- public void loadTrigger(int triggerId) throws TriggerManagerException {
- Trigger t;
- try {
- t = triggerLoader.loadTrigger(triggerId);
- } catch (TriggerLoaderException e) {
- throw new TriggerManagerException(e);
- }
- if(t.getStatus().equals(TriggerStatus.PREPARING)) {
- triggerIdMap.put(t.getTriggerId(), t);
- runnerThread.addTrigger(t);
- t.setStatus(TriggerStatus.READY);
- }
- }
+// public void loadTrigger(int triggerId) throws TriggerManagerException {
+// Trigger t;
+// try {
+// t = triggerLoader.loadTrigger(triggerId);
+// } catch (TriggerLoaderException e) {
+// throw new TriggerManagerException(e);
+// }
+// if(t.getStatus().equals(TriggerStatus.PREPARING)) {
+// triggerIdMap.put(t.getTriggerId(), t);
+// runnerThread.addTrigger(t);
+// t.setStatus(TriggerStatus.READY);
+// }
+// }
@Override
public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
@@ -361,27 +363,22 @@ public class TriggerManager implements TriggerManagerAdapter{
}
@Override
- public void updateTrigger(int triggerId, String user) throws TriggerManagerException {
- updateTrigger(triggerId);
- }
-
- @Override
public void updateTrigger(Trigger t, String user) throws TriggerManagerException {
updateTrigger(t);
}
- @Override
- public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
- Trigger t;
- try {
- t = triggerLoader.loadTrigger(triggerId);
- } catch (TriggerLoaderException e) {
- throw new TriggerManagerException(e);
- }
- if(t != null) {
- insertTrigger(t);
- }
- }
+// @Override
+// public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
+// Trigger t;
+// try {
+// t = triggerLoader.loadTrigger(triggerId);
+// } catch (TriggerLoaderException e) {
+// throw new TriggerManagerException(e);
+// }
+// if(t != null) {
+// insertTrigger(t);
+// }
+// }
@Override
public void shutdown() {
diff --git a/src/java/azkaban/trigger/TriggerManagerAdapter.java b/src/java/azkaban/trigger/TriggerManagerAdapter.java
index 0934985..cdf2921 100644
--- a/src/java/azkaban/trigger/TriggerManagerAdapter.java
+++ b/src/java/azkaban/trigger/TriggerManagerAdapter.java
@@ -1,27 +1,22 @@
package azkaban.trigger;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.joda.time.DateTimeZone;
-
-import azkaban.triggerapp.TriggerRunnerManagerException;
public interface TriggerManagerAdapter {
+
public void insertTrigger(Trigger t, String user) throws TriggerManagerException;
public void removeTrigger(int id, String user) throws TriggerManagerException;
- public void updateTrigger(int triggerId, String user) throws TriggerManagerException;
-
- void updateTrigger(Trigger t, String user) throws TriggerManagerException;
+ public void updateTrigger(Trigger t, String user) throws TriggerManagerException;
- public void insertTrigger(int triggerId, String user) throws TriggerManagerException;
-
- public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException;
+ public List<Trigger> getAllTriggerUpdates(long lastUpdateTime) throws TriggerManagerException;
public List<Trigger> getTriggerUpdates(String triggerSource, long lastUpdateTime) throws TriggerManagerException;
+
+ public List<Trigger> getTriggers(String trigegerSource);
public void start() throws TriggerManagerException;
diff --git a/src/java/azkaban/trigger/TriggerStatus.java b/src/java/azkaban/trigger/TriggerStatus.java
index 8d397bc..3fcadf7 100644
--- a/src/java/azkaban/trigger/TriggerStatus.java
+++ b/src/java/azkaban/trigger/TriggerStatus.java
@@ -1,7 +1,7 @@
package azkaban.trigger;
public enum TriggerStatus {
- READY(10), PAUSED(20), EXPIRED(30), PREPARING(40);
+ READY(10), PAUSED(20), EXPIRED(30);
private int numVal;
@@ -21,8 +21,6 @@ public enum TriggerStatus {
return PAUSED;
case 30:
return EXPIRED;
- case 40:
- return PREPARING;
default:
return READY;
}
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 16afb52..507d433 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -24,7 +24,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.StringTokenizer;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
src/java/azkaban/utils/StringUtils.java 11(+11 -0)
diff --git a/src/java/azkaban/utils/StringUtils.java b/src/java/azkaban/utils/StringUtils.java
index 6684b00..2693365 100644
--- a/src/java/azkaban/utils/StringUtils.java
+++ b/src/java/azkaban/utils/StringUtils.java
@@ -38,6 +38,17 @@ public class StringUtils {
return buf.toString();
}
+ @Deprecated
+ public static String join(List<String> list, String delimiter) {
+ StringBuffer buffer = new StringBuffer();
+ for (String str: list) {
+ buffer.append(str);
+ buffer.append(delimiter);
+ }
+
+ return buffer.toString();
+ }
+
/**
* Use this when you don't want to include Apache Common's string for
* plugins.
src/java/azkaban/webapp/AzkabanWebServer.java 151(+57 -94)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 2af906b..83ffba4 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -51,13 +51,11 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+import azkaban.alert.Alerter;
import azkaban.database.AzkabanDatabaseSetup;
-import azkaban.executor.ExecutorMailer;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManagerRemoteAdapter;
import azkaban.executor.JdbcExecutorLoader;
-import azkaban.executor.ExecutorManager.Alerter;
import azkaban.jmx.JmxExecutorManagerAdapter;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerManager;
@@ -74,11 +72,13 @@ import azkaban.trigger.TriggerManagerException;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.CreateTriggerAction;
import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.ExecutionChecker;
import azkaban.trigger.builtin.KillExecutionAction;
import azkaban.trigger.builtin.SlaAlertAction;
import azkaban.trigger.builtin.SlaChecker;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
+import azkaban.utils.Emailer;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
@@ -144,8 +144,8 @@ public class AzkabanWebServer extends AzkabanServer {
private ProjectManager projectManager;
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
-// private TriggerBasedScheduler scheduler;
private TriggerManager triggerManager;
+ private Map<String, Alerter> alerters;
private final ClassLoader baseClassLoader;
@@ -176,18 +176,23 @@ public class AzkabanWebServer extends AzkabanServer {
sessionCache = new SessionCache(props);
userManager = loadUserManager(props);
- triggerManager = loadTriggerManager(props);
+ alerters = loadAlerters(props);
+
executorManager = loadExecutorManager(props);
- projectManager = loadProjectManager(props, triggerManager);
+ projectManager = loadProjectManager(props);
+
+ triggerManager = loadTriggerManager(props);
+ loadBuiltinCheckersAndActions();
// load all triggger agents here
- scheduleManager = loadScheduleManager(executorManager, triggerManager, props);
+ scheduleManager = loadScheduleManager(triggerManager, props);
- loadBuiltinCheckersAndActions();
String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
+
loadPluginCheckersAndActions(triggerPluginDir);
- baseClassLoader = getBaseClassloader();
+ //baseClassLoader = getBaseClassloader();
+ baseClassLoader = this.getClassLoader();
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -196,7 +201,6 @@ public class AzkabanWebServer extends AzkabanServer {
String timezone = props.getString(DEFAULT_TIMEZONE_ID);
TimeZone.setDefault(TimeZone.getTimeZone(timezone));
DateTimeZone.setDefault(DateTimeZone.forID(timezone));
-
logger.info("Setting timezone to " + timezone);
}
@@ -215,9 +219,7 @@ public class AzkabanWebServer extends AzkabanServer {
Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
logger.info("Loading user manager class " + userManagerClass.getName());
UserManager manager = null;
-
if (userManagerClass != null && userManagerClass.getConstructors().length > 0) {
-
try {
Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
manager = (UserManager) userManagerConstructor.newInstance(props);
@@ -230,13 +232,11 @@ public class AzkabanWebServer extends AzkabanServer {
else {
manager = new XmlUserManager(props);
}
-
return manager;
}
- private ProjectManager loadProjectManager(Props props, TriggerManager tm) {
+ private ProjectManager loadProjectManager(Props props) {
logger.info("Loading JDBC for project management");
-
JdbcProjectLoader loader = new JdbcProjectLoader(props);
ProjectManager manager = new ProjectManager(loader, props);
return manager;
@@ -244,53 +244,26 @@ public class AzkabanWebServer extends AzkabanServer {
private ExecutorManager loadExecutorManager(Props props) throws Exception {
JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
- ExecutorManager execManager = new ExecutorManager(props, loader);
+ ExecutorManager execManager = new ExecutorManager(props, loader, alerters);
return execManager;
}
- private ExecutorManagerAdapter loadExecutorManagerAdapter(Props props) throws Exception {
-// JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
-// ExecutorManager execManager = new ExecutorManager(props, loader, true);
-// return execManager;
- String executorMode = props.getString("executor.manager.mode", "local");
- ExecutorManagerAdapter adapter;
- if(executorMode.equals("local")) {
- adapter = loadExecutorManager(props);
- } else if(executorMode.equals("remote")) {
- JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
- adapter = new ExecutorManagerRemoteAdapter(props, loader);
- } else {
- throw new Exception("Unknown ExecutorManager mode " + executorMode);
- }
- return adapter;
- }
-
-// private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
-// ScheduleManager schedManager = null;
-// String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
-// if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
-// ScheduleLoader loader = new JdbcScheduleLoader(props);
-// schedManager = new ScheduleManager(executorManager, loader, false);
-// schedManager.setProjectManager(projectManager);
-// schedManager.start();
-// } else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
-// logger.info("Loading trigger based scheduler");
-// ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
-// schedManager = new ScheduleManager(executorManager, loader, true);
+// private ExecutorManagerAdapter loadExecutorManagerAdapter(Props props) throws Exception {
+// String executorMode = props.getString("executor.manager.mode", "local");
+// ExecutorManagerAdapter adapter;
+// if(executorMode.equals("local")) {
+// adapter = loadExecutorManager(props);
+// } else {
+// throw new Exception("Unknown ExecutorManager mode " + executorMode);
// }
-//
-// return schedManager;
+// return adapter;
// }
-
- private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
+
+ private ScheduleManager loadScheduleManager(TriggerManager tm, Props props ) throws Exception {
logger.info("Loading trigger based scheduler");
- ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
- return new ScheduleManager(executorManager, loader);
+ ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, ScheduleManager.triggerSource);
+ return new ScheduleManager(loader);
}
-// private TriggerBasedScheduler loadScheduler(ExecutorManager executorManager, ProjectManager projectManager, TriggerManager triggerManager) {
-// TriggerBasedScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
-// return new TriggerBasedScheduler(executorManager, projectManager, triggerManager, loader);
-// }
private TriggerManager loadTriggerManager(Props props) throws TriggerManagerException {
TriggerLoader loader = new JdbcTriggerLoader(props);
@@ -299,7 +272,6 @@ public class AzkabanWebServer extends AzkabanServer {
private void loadBuiltinCheckersAndActions() {
logger.info("Loading built-in checker and action types");
-
if(triggerManager instanceof TriggerManager) {
SlaChecker.setExecutorManager(executorManager);
ExecuteFlowAction.setExecutorManager(executorManager);
@@ -307,14 +279,15 @@ public class AzkabanWebServer extends AzkabanServer {
ExecuteFlowAction.setTriggerManager(triggerManager);
KillExecutionAction.setExecutorManager(executorManager);
SlaAlertAction.setExecutorManager(executorManager);
- Map<String, Alerter> alerters = loadAlerters(props);
+ //Map<String, azkaban.alert.Alerter> alerters = loadAlerters(props);
SlaAlertAction.setAlerters(alerters);
SlaAlertAction.setExecutorManager(executorManager);
CreateTriggerAction.setTriggerManager(triggerManager);
+ ExecutionChecker.setExecutorManager(executorManager);
}
-
triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
+ triggerManager.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
triggerManager.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
triggerManager.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
triggerManager.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
@@ -324,7 +297,7 @@ public class AzkabanWebServer extends AzkabanServer {
private Map<String, Alerter> loadAlerters(Props props) {
Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
// load built-in alerters
- ExecutorMailer mailAlerter = new ExecutorMailer(props);
+ Emailer mailAlerter = new Emailer(props);
allAlerters.put("email", mailAlerter);
// load all plugin alerters
String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
@@ -662,28 +635,28 @@ public class AzkabanWebServer extends AzkabanServer {
return engine;
}
- private ClassLoader getBaseClassloader() throws MalformedURLException {
- final ClassLoader retVal;
-
- String hadoopHome = System.getenv("HADOOP_HOME");
- String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
-
- if (hadoopConfDir != null) {
- logger.info("Using hadoop config found in " + hadoopConfDir);
- retVal = new URLClassLoader(new URL[] { new File(hadoopConfDir)
- .toURI().toURL() }, getClass().getClassLoader());
- } else if (hadoopHome != null) {
- logger.info("Using hadoop config found in " + hadoopHome);
- retVal = new URLClassLoader(
- new URL[] { new File(hadoopHome, "conf").toURI().toURL() },
- getClass().getClassLoader());
- } else {
- logger.info("HADOOP_HOME not set, using default hadoop config.");
- retVal = getClass().getClassLoader();
- }
-
- return retVal;
- }
+// private ClassLoader getBaseClassloader() throws MalformedURLException {
+// final ClassLoader retVal;
+//
+// String hadoopHome = System.getenv("HADOOP_HOME");
+// String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+//
+// if (hadoopConfDir != null) {
+// logger.info("Using hadoop config found in " + hadoopConfDir);
+// retVal = new URLClassLoader(new URL[] { new File(hadoopConfDir)
+// .toURI().toURL() }, getClass().getClassLoader());
+// } else if (hadoopHome != null) {
+// logger.info("Using hadoop config found in " + hadoopHome);
+// retVal = new URLClassLoader(
+// new URL[] { new File(hadoopHome, "conf").toURI().toURL() },
+// getClass().getClassLoader());
+// } else {
+// logger.info("HADOOP_HOME not set, using default hadoop config.");
+// retVal = getClass().getClassLoader();
+// }
+//
+// return retVal;
+// }
public ClassLoader getClassLoader() {
return baseClassLoader;
@@ -883,9 +856,9 @@ public class AzkabanWebServer extends AzkabanServer {
}
String pluginName = pluginProps.getString("trigger.name");
- String pluginWebPath = pluginProps.getString("trigger.web.path");
- int pluginOrder = pluginProps.getInt("trigger.order", 0);
- boolean pluginHidden = pluginProps.getBoolean("trigger.hidden", false);
+// String pluginWebPath = pluginProps.getString("trigger.web.path");
+// int pluginOrder = pluginProps.getInt("trigger.order", 0);
+// boolean pluginHidden = pluginProps.getBoolean("trigger.hidden", false);
List<String> extLibClasspath = pluginProps.getStringList("trigger.external.classpaths", (List<String>)null);
String pluginClass = pluginProps.getString("trigger.class");
@@ -963,8 +936,6 @@ public class AzkabanWebServer extends AzkabanServer {
}
TriggerPlugin plugin = (TriggerPlugin) obj;
-// AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet) plugin.getServlet();
-// root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
installedTriggerPlugins.put(pluginName, plugin);
}
@@ -974,14 +945,6 @@ public class AzkabanWebServer extends AzkabanServer {
VelocityEngine ve = azkabanWebApp.getVelocityEngine();
ve.addProperty("jar.resource.loader.path", jarResourcePath);
-// // Sort plugins based on order
-// Collections.sort(installedTriggerPlugins, new Comparator<TriggerPlugin>() {
-// @Override
-// public int compare(TriggerPlugin o1, TriggerPlugin o2) {
-// return o1.getOrder() - o2.getOrder();
-// }
-// });
-
return installedTriggerPlugins;
}
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 601abda..c9e2603 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -18,7 +18,6 @@ import org.apache.log4j.Logger;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.triggerapp.TriggerConnectorParams;
import azkaban.trigger.TriggerManager;
import azkaban.user.Permission;
import azkaban.user.Role;
@@ -77,16 +76,17 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
Map<String, Object> result = executorManager.callExecutorJMX(hostPort, JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
ret = result;
}
- else if (TriggerConnectorParams.JMX_GET_ALL_TRIGGER_SERVER_ATTRIBUTES.equals(ajax)) {
- if(!hasParam(req, JMX_MBEAN) || !hasParam(req, JMX_HOSTPORT)) {
- ret.put("error", "Parameters '" + JMX_MBEAN + "' and '"+ JMX_HOSTPORT +"' must be set");
- this.writeJSON(resp, ret, true);
- return;
- }
-// String hostPort = getParam(req, JMX_HOSTPORT);
-// String mbean = getParam(req, JMX_MBEAN);
- ret = triggerManager.getJMX().getAllJMXMbeans();
- }
+// else
+// if (TriggerConnectorParams.JMX_GET_ALL_TRIGGER_SERVER_ATTRIBUTES.equals(ajax)) {
+// if(!hasParam(req, JMX_MBEAN) || !hasParam(req, JMX_HOSTPORT)) {
+// ret.put("error", "Parameters '" + JMX_MBEAN + "' and '"+ JMX_HOSTPORT +"' must be set");
+// this.writeJSON(resp, ret, true);
+// return;
+// }
+//// String hostPort = getParam(req, JMX_HOSTPORT);
+//// String mbean = getParam(req, JMX_MBEAN);
+// ret = triggerManager.getJMX().getAllJMXMbeans();
+// }
else if (JMX_GET_MBEANS.equals(ajax)) {
ret.put("mbeans", server.getMbeanNames());
}
diff --git a/src/java/azkaban/webapp/session/SessionCache.java b/src/java/azkaban/webapp/session/SessionCache.java
index 514606a..3628be1 100644
--- a/src/java/azkaban/webapp/session/SessionCache.java
+++ b/src/java/azkaban/webapp/session/SessionCache.java
@@ -20,7 +20,6 @@ import azkaban.utils.Props;
import azkaban.utils.cache.Cache;
import azkaban.utils.cache.CacheManager;
import azkaban.utils.cache.Cache.EjectionPolicy;
-import azkaban.utils.cache.Element;
/**
diff --git a/src/package/execserver/bin/azkaban-executor-start.sh b/src/package/execserver/bin/azkaban-executor-start.sh
index 912eaa0..80261d7 100755
--- a/src/package/execserver/bin/azkaban-executor-start.sh
+++ b/src/package/execserver/bin/azkaban-executor-start.sh
@@ -19,6 +19,19 @@ do
CLASSPATH=$CLASSPATH:$file
done
+if [ "HADOOP_HOME" != "" ]; then
+ for file in $HADOOP_HOME/hadoop-core*.jar do
+ CLASSPATH=$CLASSPATH:$file
+ done
+ CLASSPATH=$CLASSPATH:$HADOOP_HOME/conf
+else
+ echo "Error: HADOOP_HOME is not set. Hadoop job types will not run properly."
+fi
+
+if [ "HIVE_HOME" != "" ]; then
+ CLASSPATH=$CLASSPATH:$HIVE_HOME/conf
+fi
+
echo $azkaban_dir;
echo $CLASSPATH;