azkaban-uncached
Changes
src/java/azkaban/jmx/DisplayName.java 21(+21 -0)
src/java/azkaban/jmx/JmxExecutorManager.java 31(+31 -0)
src/java/azkaban/jmx/JmxJettyServer.java 129(+129 -0)
src/java/azkaban/jmx/JmxScheduler.java 31(+31 -0)
src/java/azkaban/jmx/JmxSchedulerMBean.java 15(+15 -0)
src/java/azkaban/jmx/JmxSLAManager.java 45(+45 -0)
src/java/azkaban/jmx/JmxSLAManagerMBean.java 20(+20 -0)
src/java/azkaban/jmx/ParameterName.java 36(+36 -0)
src/java/azkaban/sla/SLA.java 1(+0 -1)
src/java/azkaban/sla/SLAManager.java 23(+23 -0)
src/sql/create_schedule_table.sql 4(+1 -3)
src/web/js/azkaban.exflow.view.js 19(+15 -4)
Details
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 63925de..5246f9b 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -19,9 +19,14 @@ package azkaban.execapp;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.TimeZone;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
import org.mortbay.jetty.Server;
@@ -31,6 +36,10 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.JdbcExecutorLoader;
+import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxJettyServer;
+import azkaban.jmx.JmxSLAManager;
+import azkaban.jmx.JmxScheduler;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
import azkaban.utils.Props;
@@ -64,6 +73,9 @@ public class AzkabanExecutorServer {
private Props executorGlobalProps;
private Server server;
+ private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+ private MBeanServer mbeanServer;
+
/**
* Constructor
*
@@ -75,7 +87,7 @@ public class AzkabanExecutorServer {
int portNumber = props.getInt("executor.port", DEFAULT_PORT_NUMBER);
int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);
- Server server = new Server(portNumber);
+ server = new Server(portNumber);
QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
server.setThreadPool(httpThreadPool);
@@ -100,6 +112,8 @@ public class AzkabanExecutorServer {
}
runnerManager.setGlobalProps(executorGlobalProps);
+ configureMBeanServer();
+
try {
server.start();
}
@@ -131,7 +145,7 @@ public class AzkabanExecutorServer {
public ExecutorLoader getExecutorLoader() {
return executionLoader;
}
-
+
/**
* Returns the global azkaban properties
*
@@ -277,4 +291,36 @@ public class AzkabanExecutorServer {
return props;
}
+
+ private void configureMBeanServer() {
+ logger.info("Registering MBeans...");
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ registerMbean("jetty", new JmxJettyServer(server));
+ }
+
+ public void close() {
+ try {
+ for (ObjectName name : registeredMBeans) {
+ mbeanServer.unregisterMBean(name);
+ logger.info("Jmx MBean " + name.getCanonicalName() + " unregistered.");
+ }
+ } catch (Exception e) {
+ logger.error("Failed to cleanup MBeanServer", e);
+ }
+ }
+
+ private void registerMbean(String name, Object mbean) {
+ Class<?> mbeanClass = mbean.getClass();
+ ObjectName mbeanName;
+ try {
+ mbeanName = new ObjectName(mbeanClass.getName() + ":name=" + name);
+ mbeanServer.registerMBean(mbean, mbeanName);
+ logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
+ registeredMBeans.add(mbeanName);
+ } catch (Exception e) {
+ logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
+ }
+
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 5054c48..7374b60 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
import java.io.File;
import java.io.IOException;
+import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -75,6 +76,9 @@ public class FlowRunnerManager implements EventListener {
private Props globalProps;
+ private long lastSubmitterThreadCheckTime = -1;
+ private long lastCleanerThreadCheckTime = -1;
+
public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
@@ -131,6 +135,7 @@ public class FlowRunnerManager implements EventListener {
public void run() {
while (!shutdown) {
try {
+ lastSubmitterThreadCheckTime = System.currentTimeMillis();
FlowRunner flowRunner = queue.take();
executorService.submit(flowRunner);
} catch (InterruptedException e) {
@@ -157,6 +162,7 @@ public class FlowRunnerManager implements EventListener {
while (!shutdown) {
synchronized (this) {
try {
+ lastCleanerThreadCheckTime = System.currentTimeMillis();
wait(RECENTLY_FINISHED_TIME_TO_LIVE);
// Cleanup old stuff.
cleanRecentlyFinished();
@@ -472,4 +478,43 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
+
+ /**
+ * private ExecutorService executorService;
+ private SubmitterThread submitterThread;
+ private CleanerThread cleanerThread;
+ * @return
+ */
+ public long getLastCleanerThreadCheckTime() {
+ return lastCleanerThreadCheckTime;
+ }
+
+ public long getLastSubmitterThreadCheckTime() {
+ return lastSubmitterThreadCheckTime;
+ }
+
+ public boolean isSubmitterThreadActive() {
+ return this.submitterThread.isAlive();
+ }
+
+ public boolean isCleanerThreadActive() {
+ return this.cleanerThread.isAlive();
+ }
+
+ public State getSubmitterThreadState() {
+ return this.submitterThread.getState();
+ }
+
+ public State getCleanerThreadState() {
+ return this.cleanerThread.getState();
+ }
+
+ public boolean isExecutorThreadPoolShutdown() {
+ return executorService.isShutdown();
+ }
+
+ public int getNumExecutingFlows() {
+ return runningFlows.size();
+ }
+
}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 35208cb..72c8e3d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -17,6 +17,7 @@
package azkaban.executor;
import java.io.IOException;
+import java.lang.Thread.State;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -57,6 +58,8 @@ public class ExecutorManager {
private ExecutorMailer mailer;
private ExecutingManagerUpdaterThread executingManager;
+ private long lastThreadCheckTime = -1;
+
public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
this.executorLoader = loader;
this.loadRunningFlows();
@@ -68,6 +71,26 @@ public class ExecutorManager {
executingManager.start();
}
+ public String getExecutorHost() {
+ return executorHost;
+ }
+
+ public int getExecutorPort() {
+ return executorPort;
+ }
+
+ public State getExecutorThreadState() {
+ return executingManager.getState();
+ }
+
+ public boolean isThreadActive() {
+ return executingManager.isAlive();
+ }
+
+ public long getLastThreadCheckTime() {
+ return lastThreadCheckTime;
+ }
+
private void loadRunningFlows() throws ExecutorManagerException {
runningFlows.putAll(executorLoader.fetchActiveFlows());
}
@@ -394,6 +417,8 @@ public class ExecutorManager {
public void run() {
while(!shutdown) {
try {
+ lastThreadCheckTime = System.currentTimeMillis();
+
Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
src/java/azkaban/jmx/DisplayName.java 21(+21 -0)
diff --git a/src/java/azkaban/jmx/DisplayName.java b/src/java/azkaban/jmx/DisplayName.java
new file mode 100644
index 0000000..7a55f94
--- /dev/null
+++ b/src/java/azkaban/jmx/DisplayName.java
@@ -0,0 +1,21 @@
+package azkaban.jmx;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.management.DescriptorKey;
+
+/**
+ * DisplayName - This annotation allows to supply a display name for a method in
+ * the MBean interface.
+ */
+@Documented
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DisplayName {
+ @DescriptorKey("displayName")
+ String value();
+}
src/java/azkaban/jmx/JmxExecutorManager.java 31(+31 -0)
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
new file mode 100644
index 0000000..35b18a1
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -0,0 +1,31 @@
+package azkaban.jmx;
+
+import azkaban.executor.ExecutorManager;
+
+public class JmxExecutorManager implements JmxExecutorManagerMBean {
+ private ExecutorManager manager;
+
+ public JmxExecutorManager(ExecutorManager manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public int getNumRunningFlows() {
+ return this.manager.getRunningFlows().size();
+ }
+
+ @Override
+ public String getExecutorThreadState() {
+ return manager.getExecutorThreadState().toString();
+ }
+
+ @Override
+ public boolean isThreadActive() {
+ return manager.isThreadActive();
+ }
+
+ @Override
+ public Long getLastThreadCheckTime() {
+ return manager.getLastThreadCheckTime();
+ }
+}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
new file mode 100644
index 0000000..d5649a4
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -0,0 +1,15 @@
+package azkaban.jmx;
+
+public interface JmxExecutorManagerMBean {
+ @DisplayName("OPERATION: getNumRunningFlows")
+ public int getNumRunningFlows();
+
+ @DisplayName("OPERATION: getExecutorThreadState")
+ public String getExecutorThreadState();
+
+ @DisplayName("OPERATION: isThreadActive")
+ public boolean isThreadActive();
+
+ @DisplayName("OPERATION: getLastThreadCheckTime")
+ public Long getLastThreadCheckTime();
+}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
new file mode 100644
index 0000000..9bc3e2c
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -0,0 +1,11 @@
+package azkaban.jmx;
+
+import azkaban.execapp.FlowRunnerManager;
+
+public class JmxFlowRunnerManager {
+ private FlowRunnerManager manager;
+
+ public JmxFlowRunnerManager(FlowRunnerManager manager) {
+ this.manager = manager;
+ }
+}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
new file mode 100644
index 0000000..5c865c7
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -0,0 +1,5 @@
+package azkaban.jmx;
+
+public interface JmxFlowRunnerManagerMBean {
+
+}
src/java/azkaban/jmx/JmxJettyServer.java 129(+129 -0)
diff --git a/src/java/azkaban/jmx/JmxJettyServer.java b/src/java/azkaban/jmx/JmxJettyServer.java
new file mode 100644
index 0000000..6a74376
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxJettyServer.java
@@ -0,0 +1,129 @@
+package azkaban.jmx;
+
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Server;
+
+public class JmxJettyServer implements JmxJettyServerMBean {
+ private Server server;
+ private Connector connector;
+
+ public JmxJettyServer(Server server) {
+ this.server = server;
+ this.connector = server.getConnectors()[0];
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.server.isRunning();
+ }
+
+ @Override
+ public boolean isFailed() {
+ return this.server.isFailed();
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.server.isStopped();
+ }
+
+ @Override
+ public int getNumThreads() {
+ return this.server.getThreadPool().getThreads();
+ }
+
+ @Override
+ public int getNumIdleThreads() {
+ return this.server.getThreadPool().getIdleThreads();
+ }
+
+ @Override
+ public String getHost() {
+ return connector.getHost();
+ }
+
+ @Override
+ public int getPort() {
+ return connector.getPort();
+ }
+
+ @Override
+ public int getConfidentialPort() {
+ return connector.getConfidentialPort();
+ }
+
+ @Override
+ public int getConnections() {
+ return connector.getConnections();
+ }
+
+ @Override
+ public int getConnectionsOpen() {
+ return connector.getConnectionsOpen();
+ }
+
+ @Override
+ public int getConnectionsOpenMax() {
+ return connector.getConnectionsOpenMax();
+ }
+
+ @Override
+ public int getConnectionsOpenMin() {
+ return connector.getConnectionsOpenMin();
+ }
+
+ @Override
+ public long getConnectionsDurationAve() {
+ return connector.getConnectionsDurationAve();
+ }
+
+ @Override
+ public long getConnectionsDurationMax() {
+ return connector.getConnectionsDurationMax();
+ }
+
+ @Override
+ public long getConnectionsDurationMin() {
+ return connector.getConnectionsDurationMin();
+ }
+
+ @Override
+ public long getConnectionsDurationTotal() {
+ return connector.getConnectionsDurationTotal();
+ }
+
+ @Override
+ public long getConnectionsRequestAve() {
+ return connector.getConnectionsRequestsAve();
+ }
+
+ @Override
+ public long getConnectionsRequestMax() {
+ return connector.getConnectionsRequestsMax();
+ }
+
+ @Override
+ public long getConnectionsRequestMin() {
+ return connector.getConnectionsRequestsMin();
+ }
+
+ @Override
+ public void turnStatsOn() {
+ connector.setStatsOn(true);
+ }
+
+ @Override
+ public void turnStatsOff() {
+ connector.setStatsOn(false);
+ }
+
+ @Override
+ public void resetStats() {
+ connector.statsReset();
+ }
+
+ @Override
+ public boolean isStatsOn() {
+ return connector.getStatsOn();
+ }
+}
diff --git a/src/java/azkaban/jmx/JmxJettyServerMBean.java b/src/java/azkaban/jmx/JmxJettyServerMBean.java
new file mode 100644
index 0000000..7b1f046
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxJettyServerMBean.java
@@ -0,0 +1,72 @@
+package azkaban.jmx;
+
+public interface JmxJettyServerMBean {
+ @DisplayName("OPERATION: isRunning")
+ public boolean isRunning();
+
+ @DisplayName("OPERATION: isFailed")
+ public boolean isFailed();
+
+ @DisplayName("OPERATION: isStopped")
+ public boolean isStopped();
+
+ @DisplayName("OPERATION: getNumThreads")
+ public int getNumThreads();
+
+ @DisplayName("OPERATION: getNumIdleThreads")
+ public int getNumIdleThreads();
+
+ @DisplayName("OPERATION: getHost")
+ public String getHost();
+
+ @DisplayName("OPERATION: getPort")
+ public int getPort();
+
+ @DisplayName("OPERATION: getConfidentialPort")
+ public int getConfidentialPort();
+
+ @DisplayName("OPERATION: getConnections")
+ public int getConnections();
+
+ @DisplayName("OPERATION: getConnectionsOpen")
+ public int getConnectionsOpen();
+
+ @DisplayName("OPERATION: getConnectionsOpenMax")
+ public int getConnectionsOpenMax();
+
+ @DisplayName("OPERATION: getConnectionsOpenMin")
+ public int getConnectionsOpenMin();
+
+ @DisplayName("OPERATION: getConnectionsDurationAve")
+ public long getConnectionsDurationAve();
+
+ @DisplayName("OPERATION: getConnectionsDurationMax")
+ public long getConnectionsDurationMax();
+
+ @DisplayName("OPERATION: getConnectionsDurationMin")
+ public long getConnectionsDurationMin();
+
+ @DisplayName("OPERATION: getConnectionsDurationTotal")
+ public long getConnectionsDurationTotal();
+
+ @DisplayName("OPERATION: getConnectionsRequestAve")
+ public long getConnectionsRequestAve();
+
+ @DisplayName("OPERATION: getConnectionsRequestMax")
+ public long getConnectionsRequestMax();
+
+ @DisplayName("OPERATION: getConnectionsRequestMin")
+ public long getConnectionsRequestMin();
+
+ @DisplayName("OPERATION: turnStatsOn")
+ public void turnStatsOn();
+
+ @DisplayName("OPERATION: turnStatsOff")
+ public void turnStatsOff();
+
+ @DisplayName("OPERATION: resetStats")
+ public void resetStats();
+
+ @DisplayName("OPERATION: isStatsOn")
+ public boolean isStatsOn();
+}
src/java/azkaban/jmx/JmxScheduler.java 31(+31 -0)
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
new file mode 100644
index 0000000..73bcf98
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -0,0 +1,31 @@
+package azkaban.jmx;
+
+import azkaban.scheduler.ScheduleManager;
+
+public class JmxScheduler implements JmxSchedulerMBean {
+ private final ScheduleManager manager;
+
+ public JmxScheduler(ScheduleManager manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public String getScheduleThreadState() {
+ return manager.getThreadState().toString();
+ }
+
+ @Override
+ public Long getNextScheduleTime() {
+ return manager.getNextUpdateTime();
+ }
+
+ @Override
+ public Long getLastThreadCheckTime() {
+ return manager.getLastCheckTime();
+ }
+
+ @Override
+ public Boolean isThreadActive() {
+ return manager.isThreadActive();
+ }
+}
\ No newline at end of file
src/java/azkaban/jmx/JmxSchedulerMBean.java 15(+15 -0)
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
new file mode 100644
index 0000000..19a8b70
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -0,0 +1,15 @@
+package azkaban.jmx;
+
+public interface JmxSchedulerMBean {
+ @DisplayName("OPERATION: getScheduleThreadState")
+ String getScheduleThreadState();
+
+ @DisplayName("OPERATION: getNextScheduleTime")
+ Long getNextScheduleTime();
+
+ @DisplayName("OPERATION: getLastCheckTime")
+ Long getLastThreadCheckTime();
+
+ @DisplayName("OPERATION: isThreadActive")
+ Boolean isThreadActive();
+}
\ No newline at end of file
src/java/azkaban/jmx/JmxSLAManager.java 45(+45 -0)
diff --git a/src/java/azkaban/jmx/JmxSLAManager.java b/src/java/azkaban/jmx/JmxSLAManager.java
new file mode 100644
index 0000000..b8e89a4
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxSLAManager.java
@@ -0,0 +1,45 @@
+package azkaban.jmx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import azkaban.sla.SLA;
+import azkaban.sla.SLAManager;
+
+public class JmxSLAManager implements JmxSLAManagerMBean {
+ private final SLAManager manager;
+
+ public JmxSLAManager(SLAManager manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public String getSLAThreadState() {
+ return manager.getSLAThreadState().toString();
+ }
+
+ @Override
+ public boolean isThreadActive() {
+ return manager.isThreadActive();
+ }
+
+ @Override
+ public Long getLastThreadCheckTime() {
+ return manager.getLastCheckTime();
+ }
+
+ @Override
+ public int getNumActiveSLA() {
+ return manager.getNumActiveSLA();
+ }
+
+ @Override
+ public List<String> getSLASummary() {
+ ArrayList<String> summary = new ArrayList<String>();
+ for(SLA sla: manager.getSLAList()) {
+ summary.add(sla.toString());
+ }
+ return summary;
+ }
+
+}
src/java/azkaban/jmx/JmxSLAManagerMBean.java 20(+20 -0)
diff --git a/src/java/azkaban/jmx/JmxSLAManagerMBean.java b/src/java/azkaban/jmx/JmxSLAManagerMBean.java
new file mode 100644
index 0000000..1378b57
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxSLAManagerMBean.java
@@ -0,0 +1,20 @@
+package azkaban.jmx;
+
+import java.util.List;
+
+public interface JmxSLAManagerMBean {
+ @DisplayName("OPERATION: getSLAThreadState")
+ String getSLAThreadState();
+
+ @DisplayName("OPERATION: isThreadActive")
+ boolean isThreadActive();
+
+ @DisplayName("OPERATION: getLastThreadCheckTime")
+ Long getLastThreadCheckTime();
+
+ @DisplayName("OPERATION: getNumActiveSLA")
+ int getNumActiveSLA();
+
+ @DisplayName("OPERATION: getSLASummary")
+ List<String> getSLASummary();
+}
src/java/azkaban/jmx/ParameterName.java 36(+36 -0)
diff --git a/src/java/azkaban/jmx/ParameterName.java b/src/java/azkaban/jmx/ParameterName.java
new file mode 100644
index 0000000..3ee0b97
--- /dev/null
+++ b/src/java/azkaban/jmx/ParameterName.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2011 Adconion, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jmx;
+
+/**
+ * ParameterName - This annotation allows to supply
+ * a parameter name for a method in the MBean interface.
+ */
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.management.DescriptorKey;
+
+@Documented
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ParameterName {
+ @DescriptorKey("parameterName")
+ String value();
+}
diff --git a/src/java/azkaban/scheduler/ScheduleLoader.java b/src/java/azkaban/scheduler/ScheduleLoader.java
index 9caed47..276b10e 100644
--- a/src/java/azkaban/scheduler/ScheduleLoader.java
+++ b/src/java/azkaban/scheduler/ScheduleLoader.java
@@ -16,7 +16,6 @@
package azkaban.scheduler;
-import java.sql.Connection;
import java.util.List;
public interface ScheduleLoader {
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ad67def..a15907b 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -16,8 +16,8 @@
package azkaban.scheduler;
+import java.lang.Thread.State;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -35,15 +35,12 @@ import org.joda.time.format.DateTimeFormatter;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.ExecutableFlow.FailureAction;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.flow.Flow;
-import azkaban.jobExecutor.utils.JobExecutionException;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
-
import azkaban.scheduler.Schedule.FlowOptions;
import azkaban.scheduler.Schedule.SlaOptions;
import azkaban.sla.SLA.SlaAction;
@@ -70,6 +67,10 @@ public class ScheduleManager {
private final ExecutorManager executorManager;
private final ProjectManager projectManager;
private final SLAManager slaManager;
+
+ // Used for mbeans to query Scheduler status
+ private long lastCheckTime = -1;
+ private long nextWakupTime = -1;
/**
* Give the schedule manager a loader class that will properly load the
@@ -336,6 +337,7 @@ public class ScheduleManager {
while (stillAlive.get()) {
synchronized (this) {
try {
+ lastCheckTime = System.currentTimeMillis();
// TODO clear up the exception handling
Schedule s = schedules.peek();
@@ -343,6 +345,7 @@ public class ScheduleManager {
// If null, wake up every minute or so to see if
// there's something to do. Most likely there will not be.
try {
+ nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
// interruption should occur when items are added or removed from the queue.
@@ -454,6 +457,7 @@ public class ScheduleManager {
// wait until flow run
long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
try {
+ nextWakupTime = System.currentTimeMillis() + millisWait;
this.wait(Math.min(millisWait, TIMEOUT_MS));
} catch (InterruptedException e) {
// interruption should occur when items are
@@ -491,4 +495,20 @@ public class ScheduleManager {
}
}
}
+
+ public long getLastCheckTime() {
+ return lastCheckTime;
+ }
+
+ public long getNextUpdateTime() {
+ return nextWakupTime;
+ }
+
+ public State getThreadState() {
+ return runner.getState();
+ }
+
+ public boolean isThreadActive() {
+ return runner.isAlive();
+ }
}
src/java/azkaban/sla/SLA.java 1(+0 -1)
diff --git a/src/java/azkaban/sla/SLA.java b/src/java/azkaban/sla/SLA.java
index eb08229..8611b95 100644
--- a/src/java/azkaban/sla/SLA.java
+++ b/src/java/azkaban/sla/SLA.java
@@ -216,7 +216,6 @@ public class SLA {
}
slaObj.put("jobSettings", settingsObj);
}
-
return slaObj;
}
src/java/azkaban/sla/SLAManager.java 23(+23 -0)
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index ae17595..967228c 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -1,5 +1,6 @@
package azkaban.sla;
+import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
@@ -64,6 +65,8 @@ public class SLAManager {
private final ExecutorManager executorManager;
private SlaMailer mailer;
+ private long lastCheckTime = -1;
+
/**
* Give the sla manager a loader class that will properly load the
* sla.
@@ -195,6 +198,8 @@ public class SLAManager {
while (stillAlive.get()) {
synchronized (this) {
try {
+ lastCheckTime = System.currentTimeMillis();
+
// TODO clear up the exception handling
SLA s = SLAs.peek();
@@ -463,5 +468,23 @@ public class SLAManager {
}
}
+ public int getNumActiveSLA() {
+ return runner.getRunnerSLAs().size();
+ }
+ public State getSLAThreadState() {
+ return runner.getState();
+ }
+
+ public boolean isThreadActive() {
+ return runner.isAlive();
+ }
+
+ public List<SLA> getSLAList() {
+ return runner.getRunnerSLAs();
+ }
+
+ public long getLastCheckTime() {
+ return lastCheckTime;
+ }
}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index e3cceae..bbca3cf 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -19,6 +19,7 @@ package azkaban.webapp;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
import java.net.URL;
@@ -30,6 +31,9 @@ import java.util.Comparator;
import java.util.List;
import java.util.TimeZone;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
@@ -46,6 +50,10 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
+import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxJettyServer;
+import azkaban.jmx.JmxSLAManager;
+import azkaban.jmx.JmxScheduler;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
@@ -97,7 +105,7 @@ import joptsimple.OptionSpec;
*/
public class AzkabanWebServer implements AzkabanServer {
private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
-
+
public static final String AZKABAN_HOME = "AZKABAN_HOME";
public static final String DEFAULT_CONF_PATH = "conf";
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
@@ -117,6 +125,7 @@ public class AzkabanWebServer implements AzkabanServer {
private final VelocityEngine velocityEngine;
+ private final Server server;
private UserManager userManager;
private ProjectManager projectManager;
private ExecutorManager executorManager;
@@ -129,20 +138,24 @@ public class AzkabanWebServer implements AzkabanServer {
private SessionCache sessionCache;
private File tempDir;
private List<ViewerPlugin> viewerPlugins;
+
+ private MBeanServer mbeanServer;
+ private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
/**
* Constructor usually called by tomcat AzkabanServletContext to create the
* initial server
*/
public AzkabanWebServer() throws Exception {
- this(loadConfigurationFromAzkabanHome());
+ this(null, loadConfigurationFromAzkabanHome());
}
/**
* Constructor
*/
- public AzkabanWebServer(Props props) throws Exception {
+ public AzkabanWebServer(Server server, Props props) throws Exception {
this.props = props;
+ this.server = server;
velocityEngine = configureVelocityEngine(props.getBoolean(VELOCITY_DEV_MODE_PARAM, false));
sessionCache = new SessionCache(props);
userManager = loadUserManager(props);
@@ -162,6 +175,8 @@ public class AzkabanWebServer implements AzkabanServer {
logger.info("Setting timezone to " + timezone);
}
+
+ configureMBeanServer();
}
@@ -378,7 +393,6 @@ public class AzkabanWebServer implements AzkabanServer {
logger.error("Exiting Azkaban...");
return;
}
- app = new AzkabanWebServer(azkabanSettings);
// int portNumber =
// azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
@@ -388,7 +402,6 @@ public class AzkabanWebServer implements AzkabanServer {
DEFAULT_THREAD_NUMBER);
logger.info("Setting up Jetty Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
-
final Server server = new Server();
SslSocketConnector secureConnector = new SslSocketConnector();
secureConnector.setPort(sslPortNumber);
@@ -399,7 +412,8 @@ public class AzkabanWebServer implements AzkabanServer {
secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
server.addConnector(secureConnector);
-
+ app = new AzkabanWebServer(server, azkabanSettings);
+
QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
server.setThreadPool(httpThreadPool);
@@ -445,6 +459,7 @@ public class AzkabanWebServer implements AzkabanServer {
logger.info("Shutting down http server...");
try {
app.getScheduleManager().shutdown();
+ app.close();
server.stop();
server.destroy();
}
@@ -671,5 +686,38 @@ public class AzkabanWebServer implements AzkabanServer {
return props;
}
+ private void configureMBeanServer() {
+ logger.info("Registering MBeans...");
+ mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ registerMbean("jetty", new JmxJettyServer(server));
+ registerMbean("scheduler", new JmxScheduler(scheduleManager));
+ registerMbean("slaManager", new JmxSLAManager(slaManager));
+ registerMbean("executorManager", new JmxExecutorManager(executorManager));
+ }
+ public void close() {
+ try {
+ for (ObjectName name : registeredMBeans) {
+ mbeanServer.unregisterMBean(name);
+ logger.info("Jmx MBean " + name.getCanonicalName() + " unregistered.");
+ }
+ } catch (Exception e) {
+ logger.error("Failed to cleanup MBeanServer", e);
+ }
+ }
+
+ private void registerMbean(String name, Object mbean) {
+ Class<?> mbeanClass = mbean.getClass();
+ ObjectName mbeanName;
+ try {
+ mbeanName = new ObjectName(mbeanClass.getName() + ":name=" + name);
+ mbeanServer.registerMBean(mbean, mbeanName);
+ logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
+ registeredMBeans.add(mbeanName);
+ } catch (Exception e) {
+ logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
+ }
+
+ }
}
src/sql/create_schedule_table.sql 4(+1 -3)
diff --git a/src/sql/create_schedule_table.sql b/src/sql/create_schedule_table.sql
index 462e244..a2ca6d2 100644
--- a/src/sql/create_schedule_table.sql
+++ b/src/sql/create_schedule_table.sql
@@ -1,5 +1,3 @@
-DROP TABLE if exists schedules;
-
CREATE TABLE schedules (
project_id INT NOT NULL,
project_name VARCHAR(128) NOT NULL,
@@ -13,7 +11,7 @@ CREATE TABLE schedules (
submit_time BIGINT,
submit_user VARCHAR(128),
enc_type TINYINT,
- schedule_options LONGBLOB,
+ schedule_options LONGBLOB,
primary key(project_id, flow_name)
) ENGINE=InnoDB;
src/web/js/azkaban.exflow.view.js 19(+15 -4)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index cd6ff66..4fb7326 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -193,13 +193,24 @@ azkaban.FlowTabView= Backbone.View.extend({
// Nodes can be in a killed state, even if the parents have succeeded due to failure option Finish running
// We want to re-enable those.
var shouldAdd = true;
- for(var key in node.in) {
- var dependency = node.in[key];
- if (dependency.status != 'SUCCEEDED' && dependency.status!='SKIPPED') {
+ if (node.in) {
+ var size = 0;
+ for(var key in node.in) {
+ size++;
+ var dependency = node.in[key];
+ if (dependency.status != 'SUCCEEDED' && dependency.status!='SKIPPED') {
+ shouldAdd = false;
+ break;
+ }
+ }
+
+ if (size == 0) {
shouldAdd = false;
- break;
}
}
+ else {
+ shouldAdd = false;
+ }
if (shouldAdd) {
failedJobs.push(node.id);