azkaban-memoizeit

Metric stats for Azkaban #364

12/4/2014 10:51:00 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
new file mode 100644
index 0000000..6c2524d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/AbstractMetric.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import org.apache.log4j.Logger;
+
+
+public abstract class AbstractMetric<T> implements IMetric<T> {
+  protected static final Logger logger = Logger.getLogger(MetricReportManager.class);
+  protected String name;
+  protected T value;
+  protected String type;
+  protected MetricReportManager metricManager;
+
+  public AbstractMetric(String metricName, String metricType, T initialValue) {
+    name = metricName;
+    type = metricType;
+    value = initialValue;
+    metricManager = null;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getValueType() {
+    return type;
+  }
+
+  public void setMetricManager(final MetricReportManager manager) {
+    metricManager = manager;
+  }
+
+  public T getValue() {
+    return value;
+  }
+
+  protected void notifyManager() {
+    logger.debug(String.format("Notifying Manager for %s", this.getClass().getName()));
+    try {
+      metricManager.reportMetric(this);
+    } catch (NullPointerException ex) {
+      logger.error(String.format("Metric Manager is not set for %s metric %s", this.getClass().getName(), ex.toString()));
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
new file mode 100644
index 0000000..7226612
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/GangliaMetricEmitter.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import azkaban.utils.Props;
+
+
+public class GangliaMetricEmitter implements IMetricEmitter {
+  private static final String GANGLIA_METRIC_REPORTER_PATH = "azkaban.metric.ganglia.path";
+  private static final String GANGLIA_METRIC_REPORTER_GROUP = "azkaban.metric.ganglia.group";
+
+  private String gmetricPath;
+  private String gangliaGroup;
+
+  public GangliaMetricEmitter(Props azkProps) {
+    gmetricPath = azkProps.get(GANGLIA_METRIC_REPORTER_PATH);
+    gangliaGroup = azkProps.get(GANGLIA_METRIC_REPORTER_GROUP);
+  }
+
+  private String buildCommand(IMetric<?> metric) {
+    return String.format("%s -t %s -g \"%s\" -n %s -v %s", gmetricPath, metric.getValueType(), gangliaGroup,
+        metric.getName(), metric.getValue().toString());
+  }
+
+  @Override
+  public void reportMetric(final IMetric<?> metric) throws Exception {
+    String gangliaCommand = buildCommand(metric);
+    synchronized (metric) {
+      // executes shell command to report metric to ganglia dashboard
+      Process emission = Runtime.getRuntime().exec(gangliaCommand);
+      int exitCode = emission.waitFor();
+      if (exitCode != 0) {
+        throw new RuntimeException("Failed to report metric using gmetric");
+      }
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetric.java b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
new file mode 100644
index 0000000..7422ba4
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetric.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+public interface IMetric<T> {
+  String getName();
+  String getValueType();
+  void setMetricManager(final MetricReportManager manager);
+  void UpdateValueAndNotifyManager();
+  T getValue();
+}
diff --git a/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
new file mode 100644
index 0000000..963aa7c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/IMetricEmitter.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+public interface IMetricEmitter {
+  void reportMetric(final IMetric<?> metric) throws Exception;
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
new file mode 100644
index 0000000..fe68a32
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metric/MetricReportManager.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metric;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+
+
+public class MetricReportManager {
+  private static final int MAX_EMITTER_THREADS = 2;
+  private static final Logger logger = Logger.getLogger(MetricReportManager.class);
+
+  private List<IMetric<?>> metrics;
+  private IMetricEmitter metricEmitter;
+  private ExecutorService executorService;
+  private static volatile MetricReportManager instance = null;
+
+  // For singleton
+  private MetricReportManager() {
+    executorService = Executors.newFixedThreadPool(MAX_EMITTER_THREADS);
+    metrics = new ArrayList<IMetric<?>>();
+  }
+
+  public static boolean isInstantiated() {
+    return instance != null;
+  }
+
+  public static MetricReportManager getInstance() {
+    if (instance == null) {
+      synchronized (MetricReportManager.class) {
+        if (instance == null) {
+          logger.info("Instantiating MetricReportManager");
+          instance = new MetricReportManager();
+        }
+      }
+    }
+    return instance;
+  }
+
+  // each element of metrics List is responsible to call this method and report metrics
+  public void reportMetric(final IMetric<?> metric) {
+    if (metric != null) {
+      logger.debug(String.format("Submitting %s metric for metric emission pool", metric.getName()));
+      executorService.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            metricEmitter.reportMetric(metric);
+          } catch (Exception ex) {
+            logger.error(String.format("Failed to report %s metric due to %s", metric.getName(), ex.toString()));
+          }
+        }
+      });
+    }
+  }
+
+  public void setMetricEmitter(final IMetricEmitter emitter) {
+    metricEmitter = emitter;
+  }
+
+  public IMetricEmitter getMetricEmitter() {
+    return metricEmitter;
+  }
+
+  public void AddMetric(final IMetric<?> metric) {
+    // metric null or already present
+    if (metric != null && getMetricFromName(metric.getName()) == null) {
+      logger.debug(String.format("Adding %s metric in Metric Manager", metric.getName()));
+      metrics.add(metric);
+      metric.setMetricManager(this);
+    }
+  }
+
+  public IMetric<?> getMetricFromName(final String name) {
+    IMetric<?> metric = null;
+    if (name != null) {
+      for (IMetric<?> currentMetric : metrics) {
+        if (currentMetric.getName().equals(name)) {
+          metric = currentMetric;
+          break;
+        }
+      }
+    }
+    return metric;
+  }
+
+  protected void finalize() {
+    executorService.shutdown();
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 89351ee..7d5a064 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -29,9 +29,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTimeZone;
-
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
@@ -41,7 +39,12 @@ import org.mortbay.thread.QueuedThreadPool;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.execapp.jmx.JmxFlowRunnerManager;
+import azkaban.execapp.metric.NumRunningFlowMetric;
+import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.jmx.JmxJettyServer;
+import azkaban.metric.GangliaMetricEmitter;
+import azkaban.metric.IMetricEmitter;
+import azkaban.metric.MetricReportManager;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.server.AzkabanServer;
@@ -114,6 +117,7 @@ public class AzkabanExecutorServer {
             .getClass().getClassLoader());
 
     configureMBeanServer();
+    configureMetricReports(runnerManager, props);
 
     try {
       server.start();
@@ -125,6 +129,23 @@ public class AzkabanExecutorServer {
     logger.info("Azkaban Executor Server started on port " + portNumber);
   }
 
+  /**
+   * Configure Metric Reporting as per azkaban.properties settings
+   *
+   */
+  private void configureMetricReports(FlowRunnerManager runnerManager, Props props) {
+    if (props.getBoolean("executor.metric.reports", false)) {
+      logger.info("Starting to configure Metric Reports");
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      IMetricEmitter metricEmitter = new GangliaMetricEmitter(props);
+      metricManager.setMetricEmitter(metricEmitter);
+
+      metricManager.AddMetric(new NumRunningJobMetric());
+      metricManager.AddMetric(new NumRunningFlowMetric(runnerManager));
+      logger.info("Copleted configuring Metric Reports");
+    }
+  }
+
   private ExecutorLoader createExecLoader(Props props) {
     return new JdbcExecutorLoader(props);
   }
@@ -161,7 +182,7 @@ public class AzkabanExecutorServer {
 
   /**
    * Returns the currently executing executor server, if one exists.
-   * 
+   *
    * @return
    */
   public static AzkabanExecutorServer getApp() {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index f73a30a..c051cfd 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -42,6 +42,7 @@ import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.metric.NumRunningJobMetric;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
@@ -52,6 +53,8 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.metric.IMetric;
+import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.utils.Props;
@@ -60,7 +63,7 @@ import azkaban.utils.SwapQueue;
 
 /**
  * Class that handles the running of a ExecutableFlow DAG
- * 
+ *
  */
 public class FlowRunner extends EventHandler implements Runnable {
   private static final Layout DEFAULT_LAYOUT = new PatternLayout(
@@ -122,7 +125,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Constructor. This will create its own ExecutorService for thread pools
-   * 
+   *
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -138,7 +141,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Constructor. If executorService is null, then it will create it's own for
    * thread pools.
-   * 
+   *
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -356,7 +359,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Main method that executes the jobs.
-   * 
+   *
    * @throws Exception
    */
   private void runFlow() throws Exception {
@@ -725,7 +728,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Determines what the state of the next node should be. Returns null if the
    * node should not be run.
-   * 
+   *
    * @param node
    * @return
    */
@@ -810,9 +813,19 @@ public class FlowRunner extends EventHandler implements Runnable {
     jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
     jobRunner.addListener(listener);
 
+    configureJobLevelMetrics(jobRunner);
+
     return jobRunner;
   }
 
+  private void configureJobLevelMetrics(JobRunner jobRunner) {
+    if(MetricReportManager.isInstantiated()) {
+      MetricReportManager metricManager = MetricReportManager.getInstance();
+      NumRunningJobMetric metric = (NumRunningJobMetric) metricManager.getMetricFromName(NumRunningJobMetric.NUM_RUNNING_JOB_METRIC_NAME);
+      jobRunner.addListener(metric);
+    }
+  }
+
   public void pause(String user) {
     synchronized (mainSyncObj) {
       if (!flowFinished) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
new file mode 100644
index 0000000..6f47f76
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningFlowMetric.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.metric.AbstractMetric;
+
+
+public class NumRunningFlowMetric extends AbstractMetric<Integer> {
+  public static final String NUM_RUNNING_FLOW_METRIC_NAME = "NumRunningFlowMetric";
+  public static final String NUM_RUNNING_FLOW_METRIC_TYPE = "uint16";
+  private static final int NUM_RUNNING_FLOW_INTERVAL = 60 * 1000; //milliseconds
+
+  private FlowRunnerManager flowManager;
+  private Timer timer = new Timer();
+
+  public NumRunningFlowMetric(FlowRunnerManager flowRunnerManager) {
+    super(NUM_RUNNING_FLOW_METRIC_NAME, NUM_RUNNING_FLOW_METRIC_TYPE, 0);
+    logger.debug("Instantiated NumRunningFlowMetric");
+    flowManager = flowRunnerManager;
+
+    // schedule timer to trigger UpdateValueAndNotifyManager
+    timer.schedule(new TimerTask() {
+
+      @Override
+      public void run() {
+        UpdateValueAndNotifyManager();
+      }
+    }, NUM_RUNNING_FLOW_INTERVAL, NUM_RUNNING_FLOW_INTERVAL);
+
+  }
+
+  public void UpdateValueAndNotifyManager() {
+    value = flowManager.getNumRunningFlows();
+    notifyManager();
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
new file mode 100644
index 0000000..957359b
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/metric/NumRunningJobMetric.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp.metric;
+
+import azkaban.event.Event;
+import azkaban.event.Event.Type;
+import azkaban.event.EventListener;
+import azkaban.metric.AbstractMetric;
+
+public class NumRunningJobMetric extends AbstractMetric<Integer> implements EventListener {
+  public static final String NUM_RUNNING_JOB_METRIC_NAME = "NumRunningJobMetric";
+  public static final String NUM_RUNNING_JOB_METRIC_TYPE = "uint16";
+
+  public NumRunningJobMetric() {
+    super(NUM_RUNNING_JOB_METRIC_NAME, NUM_RUNNING_JOB_METRIC_TYPE , 0);
+    logger.debug("Instantiated NumRunningJobMetric");
+  }
+
+  @Override
+  public void UpdateValueAndNotifyManager() {
+    metricManager.reportMetric(this);
+  }
+
+  @Override
+  public void handleEvent(Event event) {
+    if (event.getType() == Type.JOB_STARTED) {
+      value = value + 1;
+    } else if (event.getType() == Type.JOB_FINISHED) {
+      value = value - 1;
+    }
+    notifyManager();
+  }
+}