flow-stats.js

360 lines | 9.507 kB Blame History Raw Download
/*
 * Copyright 2012 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

$.namespace('azkaban');

azkaban.FlowStatsModel = Backbone.Model.extend({});
azkaban.FlowStatsView = Backbone.View.extend({
  events: {
  },

  histogram: true,

  initialize: function(settings) {
    this.model.bind('change:view', this.handleChangeView, this);
    this.model.bind('render', this.render, this);
    if (settings.histogram != null) {
      this.histogram = settings.histogram;
    }
  },

  render: function(evt) {
  },

  show: function(execId) {
    this.analyzeExecution(execId);
  },

  fetchJobs: function(execId) {
    var requestURL = contextURL + "/executor";
    var requestData = {"execid": execId, "ajax":"fetchexecflow"};
    var jobs = [];
    var successHandler = function(data) {
      data.nodes.sort(function(a, b) {
        return a.startTime - b.startTime;
      });
      jobs = data.nodes;
    };
    $.ajax({
      url: requestURL,
      data: requestData,
      success: successHandler,
      dataType: "json",
      async: false
    });
    return jobs;
  },

  fetchJobStats: function(jobId, execId) {
    var requestURL = contextURL + "/executor";
    var requestData = {
      "execid": execId,
      "flowid": flowId,
      "jobid": jobId,
      "ajax": "fetchExecJobStats"
    };
    var stats = null;
    var successHandler = function(data) {
      stats = data;
    };
    $.ajax({
      url: requestURL,
      data: requestData,
      success: successHandler,
      dataType: "json",
      async: false
    });
    return stats;
  },

  updateStatsMapred: function(state, data, job) {
    var stats = data.stats;
    var mappers = parseInt(state.totalMappers);
    var reducers = parseInt(state.totalReducers);
    if (mappers >= stats.mapSlots.max) {
      stats.mapSlots.max = mappers;
      stats.mapSlots.job = job;
    }
    if (reducers >= stats.reduceSlots.max) {
      stats.reduceSlots.max = reducers;
      stats.reduceSlots.job = job;
    }
    stats.totalMapSlots += mappers;
    stats.totalReduceSlots += reducers;

  },

  updateStatsConf: function(conf, data, job) {
    var stats = data.stats;
    if (conf == null) {
      data.warnings.push("No job conf available for job " + job);
      return;
    }

    var jobMapMemoryMb = parseInt(conf['mapred.job.map.memory.mb']);
    if (jobMapMemoryMb >= stats.jobMapMemoryMb.max) {
      stats.jobMapMemoryMb.max = jobMapMemoryMb;
      stats.jobMapMemoryMb.job = job;
    }
    var jobReduceMemoryMb = parseInt(conf['mapred.job.reduce.memory.mb']);
    if (jobReduceMemoryMb >= stats.jobReduceMemoryMb.max) {
      stats.jobReduceMemoryMb.max = jobReduceMemoryMb;
      stats.jobReduceMemoryMb.job = job;
    }

    var childJavaOpts = conf['mapred.child.java.opts'];
    var parts = childJavaOpts.split(" ");
    for (var i = 0; i < parts.length; ++i) {
      var str = parts[i];
      if (str.indexOf('Xmx') > -1) {
        if (str.length <= 4) {
          continue;
        }
        var size = str.substring(4, str.length);
        var val = sizeStrToBytes(size);
        if (val >= stats.xmx.max) {
          stats.xmx.max = val;
          stats.xmx.str = size;
          stats.xmx.job = job;
        }
      }
      if (str.indexOf('Xms') > -1) {
        if (str.length <= 4) {
          continue;
        }
        var size = str.substring(4, str.length);
        var val = sizeStrToBytes(size);
        stats.xms.set = true;
        if (val >= stats.xms.max) {
          stats.xms.max = val;
          stats.xms.str = size;
          stats.xms.job = job;
        }
      }
    }

    var cacheFiles = conf['mapred.cache.files'];
    var cacheFilesFilesizes = conf['mapred.cache.files.filesizes'];
    if (cacheFiles != null && cacheFilesFilesizes != null) {
      stats.distributedCache.using = true;
      var parts = cacheFilesFilesizes.split(',');
      var size = 0;
      for (var i = 0; i < parts.length; ++i) {
        size += parseInt(parts[i]);
      }
      if (size >= stats.distributedCache.max) {
        stats.distributedCache.max = size;
        stats.distributedCache.job = job;
      }
    }
  },

  updateStatsCounters: function(state, data, job) {
    var stats = data.stats;
    if (state.counters == null) {
      data.warnings.push("No job counters available for job " + job);
      return;
    }
    var fileSystemCounters = state.counters['FileSystemCounters'];
    if (fileSystemCounters == null) {
      data.warnings.push("No FileSystemCounters available for job " + job);
      return;
    }
    var fileBytesRead = parseInt(fileSystemCounters['FILE_BYTES_READ']);
    if (fileBytesRead >= stats.fileBytesRead.max) {
      stats.fileBytesRead.max = fileBytesRead;
      stats.fileBytesRead.job = job;
    }

    var fileBytesWritten = parseInt(fileSystemCounters['FILE_BYTES_WRITTEN']);
    if (fileBytesWritten >= stats.fileBytesWritten.max) {
      stats.fileBytesWritten.max = fileBytesWritten;
      stats.fileBytesWritten.job = job;
    }

    var hdfsBytesRead = parseInt(fileSystemCounters['HDFS_BYTES_READ']);
    if (hdfsBytesRead >= stats.hdfsBytesRead.max) {
      stats.hdfsBytesRead.max = hdfsBytesRead;
      stats.hdfsBytesRead.job = job;
    }

    var hdfsBytesWritten = parseInt(fileSystemCounters['HDFS_BYTES_WRITTEN']);
    if (hdfsBytesWritten >= stats.hdfsBytesWritten.max) {
      stats.hdfsBytesWritten.max = hdfsBytesWritten;
      stats.hdfsBytesWritten.job = job;
    }
  },

  updateStats: function(jobStats, data, job) {
    var stats = data.stats;
    var state = jobStats.state;
    var conf = jobStats.conf;

    this.updateStatsMapred(state, data, job);
    this.updateStatsConf(conf, data, job);
    this.updateStatsCounters(state, data, job);
  },

  finalizeStats: function(data) {
    data.success = true;
  },

  analyzeExecution: function(execId) {
    var jobs = this.fetchJobs(execId);
    if (jobs == null) {
      this.model.set({'data': null});
      this.model.trigger('render');
      return;
    }

    var data = {
      success: false,
      message: null,
      warnings: [],
      durations: [],
      histogram: this.histogram,
      stats: {
        mapSlots: {
          max: 0,
          job: null
        },
        reduceSlots: {
          max: 0,
          job: null
        },
        totalMapSlots: 0,
        totalReduceSlots: 0,
        numJobs: jobs.length,
        longestTaskTime: 0,
        jobMapMemoryMb: {
          max: 0,
          job: null
        },
        jobReduceMemoryMb: {
          max: 0,
          job: null
        },
        xmx: {
          max: 0,
          str: null,
          job: null
        },
        xms: {
          set: false,
          max: 0,
          str: null,
          job: null
        },
        fileBytesRead: {
          max: 0,
          job: null
        },
        hdfsBytesRead: {
          max: 0,
          job: null
        },
        fileBytesWritten: {
          max: 0,
          job: null
        },
        hdfsBytesWritten: {
          max: 0,
          job: null
        },
        distributedCache: {
          using: false,
          max: 0,
          job: null
        },
      }
    };

    var jobsAnalyzed = 0;
    for (var i = 0; i < jobs.length; ++i) {
      var job = jobs[i];
      var duration = job.endTime - job.startTime;
      data.durations.push({
        job: job.id,
        duration: duration
      });

      var jobStats = this.fetchJobStats(job.id, execId);
      if (jobStats.jobStats == null) {
        data.warnings.push("No job stats available for job " + job.id);
        continue;
      }
      for (var j = 0; j < jobStats.jobStats.length; ++j) {
        this.updateStats(jobStats.jobStats[j], data, job.id);
      }
      ++jobsAnalyzed;
    }

    // If no jobs were analyzed, then no jobs had any job stats available. In
    // this case, display a No Flow Stats Available message.
    if (jobsAnalyzed == 0) {
      data.success = false;
      data.message = "There were no job stats provided by any job.";
    }
    else {
      this.finalizeStats(data);
    }

    this.model.set({'data': data});
    this.model.trigger('render');
  },

  render: function(evt) {
    var view = this;
    var data = this.model.get('data');
    if (data == null) {
      var msg = { message: "Error retrieving flow stats."};
      dust.render("flowstats-no-data", msg, function(err, out) {
        view.display(out);
      });
    }
    else if (data.success == false) {
      dust.render("flowstats-no-data", data, function(err, out) {
        view.display(out);
      });
    }
    else {
      var histogram = this.histogram;
      dust.render("flowstats", data, function(err, out) {
        view.display(out);
        if (histogram == true) {
          var yLabelFormatCallback = function(y) {
            var seconds = y / 1000.0;
            return seconds.toString() + " s";
          };

          Morris.Bar({
            element: "job-histogram",
            data: data.durations,
            xkey: "job",
            ykeys: ["duration"],
            labels: ["Duration"],
            yLabelFormat: yLabelFormatCallback
          });
        }
      });
    }
  },

  display: function(out) {
    $('#flow-stats-container').html(out);
  },
});