# aws.py
import re
import os
import time
import datetime
import subprocess
import select
from threading import Thread
import shutil

import boto.ec2
import boto.ec2.cloudwatch
import boto.ec2.autoscale
from boto.exception import EC2ResponseError
import paramiko


from cloudscale.distributed_jmeter.scripts.meet_sla_req import check
from cloudscale.distributed_jmeter.scripts.visualization.visualize import Visualize


class AWS:
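    """Drives a distributed JMeter load test on AWS: provisions EC2 instances
    for the JMeter masters, runs the scenario on each of them, collects the
    results and CloudWatch metrics, and renders the visualizations."""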
    def __init__(self, cfg, scenario_path, r_path, output_path, logger, test=False):
        self.cfg = cfg
        self.r_path = r_path
        self.logger = logger
        self.scenario_path = scenario_path
        self.output_directory = output_path
        if not test:
            self.init()
            self.start()

    def init(self):
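        # Read the test configuration, connect to EC2, and prepare the key
        # pair and security groups used by the JMeter instances.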
        self.key_name = self.cfg.get('EC2', 'key_name')
        self.startup_threads = self.cfg.get('TEST', 'startup_threads')
        self.rest_threads = self.cfg.get('TEST', 'rest_threads')
        self.host = self.cfg.get('SHOWCASE', 'host')
        self.user = self.cfg.get('EC2', 'remote_user')
        self.jmeter_url = self.cfg.get('SCENARIO', 'jmeter_url')
        self.region = self.cfg.get('AWS', 'region')
        self.access_key = self.cfg.get('AWS', 'aws_access_key_id')
        self.secret_key = self.cfg.get('AWS', 'aws_secret_access_key')
        self.num_jmeter_slaves = int(self.cfg.get('TEST', 'num_jmeter_slaves'))
        self.frontend_instances_identifier = self.cfg.get('SHOWCASE', 'frontend_instances_id')
        self.rds_identifiers = self.cfg.get('RDS', 'identifiers').split(',')
        self.is_autoscalable = self.cfg.get('SHOWCASE', 'autoscalable') == 'yes'
        self.num_threads = int(self.cfg.get('SCENARIO', 'num_threads'))
        self.instance_type = self.cfg.get('EC2', 'instance_type')
        self.ami_id = self.cfg.get('EC2', 'ami_id')
        self.scenario_duration = self.cfg.get('SCENARIO', 'duration_in_minutes')

        self.conn = boto.ec2.connect_to_region(
            self.region,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key
        )

        self.key_pair = self.create_keypair()

        self.create_security_groups()

    def start(self):
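        # Launch one EC2 instance per JMeter master, install JMeter on each,
        # then start the scenario on all of them.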
        ips = []
        for i in xrange(self.num_jmeter_slaves):
            instance = self.create_instance("Creating master instance {0} ...".format(i + 1))
            time.sleep(15)
            self.logger.log(instance.ip_address)
            self.setup_master(instance.ip_address)
            ips.append(instance.ip_address)

        self.run_masters(ips)

    def setup_master(self, ip_addr):
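        # Clean the remote home directory, download and unpack JMeter, and
        # upload the scenario file.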
        ssh = self.ssh_to_instance(ip_addr)

        scp = paramiko.SFTPClient.from_transport(ssh.get_transport())
        dirname = os.path.abspath(os.path.dirname(__file__))
        _, stdout, _ = ssh.exec_command('rm -rf /home/' + self.user + '/*')
        stdout.readlines()

        self.logger.log("Transfering jmeter_master.tar.gz ")
        _, stdout, _ = ssh.exec_command("wget -q -T90 %s -O jmeter.tar.gz" % self.jmeter_url)
        self.wait_for_command(stdout)

        self.logger.log("Transfering JMeter scenario ...")
        scp.put(self.scenario_path, 'scenario.jmx')

        self.logger.log("Unpacking JMeter ...")
        _, stdout, _ = ssh.exec_command("tar xvf jmeter.tar.gz")
        self.wait_for_command(stdout)

        _, stdout, _ = ssh.exec_command("find . -iname '._*' -exec rm -rf {} \;")
        self.wait_for_command(stdout)

    def wait_for_command(self, stdout, verbose=False):
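        # Drain the channel while polling so a full output buffer cannot block
        # the remote command.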
        # Wait for the command to terminate
        while not stdout.channel.exit_status_ready():
            # Only print data if there is data to read in the channel
            if stdout.channel.recv_ready():
                rl, _, _ = select.select([stdout.channel], [], [], 0.0)
                if len(rl) > 0:
                    response = stdout.channel.recv(1024)
                    if verbose:
                        print(response)


    def ssh_to_instance(self, ip_addr, i=0):
        # Retry up to three times. The limit is checked before the try block;
        # otherwise the final exception would be caught by the retry handler
        # below and the method would retry forever.
        if i >= 3:
            raise Exception('Failed 3 times to SSH to %s' % ip_addr)
        try:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            if self.key_pair:
                ssh.connect(ip_addr, username=self.user, key_filename=os.path.abspath(self.key_pair))
            else:
                ssh.connect(ip_addr, username=self.user, password="")
            return ssh
        except Exception as e:
            self.logger.log('%s\nTrying to reconnect ...' % e)
            time.sleep(30)
            return self.ssh_to_instance(ip_addr, i=i + 1)


    def run_masters(self, ips):
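        # Start the scenario on every master, wait for all of them to finish,
        # then merge the per-instance results and fetch CloudWatch metrics.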
        start_time = datetime.datetime.utcnow()

        tmp_userpath = "/tmp/{0}".format(os.path.basename(self.scenario_path)[:-4])

        if not os.path.exists(tmp_userpath):
            os.makedirs(tmp_userpath, 0777)

        self.logger.log(self.output_directory)

        for ip in ips:
            self.logger.log("Running JMeter on instance %s" % ip)
            ssh = self.ssh_to_instance(ip)
            cmd = "(~/jmeter/bin/jmeter -n -t ~/scenario.jmx -l scenario.jtl -j scenario.log -Jall_threads=%s -Jstartup_threads=%s -Jrest_threads=%s -Jhost=%s;touch finish)" % (
                int(self.startup_threads)+int(self.rest_threads),
                self.startup_threads,
                self.rest_threads,
                self.host
            )
            self.logger.log(cmd)
            self.logger.log("Executing your JMeter scenario. This can take a while. Please wait ...")
            ssh.exec_command(cmd)
            ssh.close()

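        # Poll each master from its own thread and download its results as
        # soon as the scenario finishes there.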
        i = 1
        threads = []
        for ip in ips:
            self.logger.log("Starting thread for %s" % ip)
            t = Thread(target=self.check_instance, args=(i, tmp_userpath, self.output_directory, ip))
            t.start()
            threads.append(t)
            i += 1

        for t in threads:
            t.join()

        self.terminate_instances(ips)
        end_time = datetime.datetime.utcnow()

        instances = self.get_instances_by_tag('Name', self.frontend_instances_identifier)
        instance_ids = [instance.id for instance in instances]
        rds_instance_ids = self.rds_identifiers
        ec2_data = self.get_cloudwatch_ec2_data(start_time, end_time, instance_ids)
        rds_data = self.get_cloudwatch_rds_data(start_time, end_time, rds_instance_ids)

        resultspath = self.output_directory

        cmd = "cp -r {0}/./ {1}/".format(tmp_userpath, resultspath)
        self.logger.log(cmd)
        p = subprocess.check_output(cmd.split())

        shutil.rmtree(tmp_userpath, True)

        filenames = ["{0}/scenario{1}.log".format(resultspath, j) for j in xrange(1, i)]
        self.logger.log(filenames)
        with open("{0}/scenario.log".format(resultspath), 'w') as outfile:
            for fname in filenames:
                with open(fname) as infile:
                    for line in infile:
                        outfile.write(line)
        cmd = "rm -rf %s" % " ".join(filenames)
        subprocess.call(cmd.split())
        cmd = "rm -rf %s/*.jtl" % resultspath
        subprocess.call(cmd.split())

        filenames = ["{0}/response-times-over-time{1}.csv".format(resultspath, j) for j in xrange(1, i)]
        self.logger.log(filenames)
        with open("{0}/response-times-over-time.csv".format(resultspath), 'w') as outfile:
            for fname in filenames:
                with open(fname) as infile:
                    for line in infile:
                        outfile.write(line)

        cmd = "rm -rf %s" % " ".join(filenames)
        subprocess.call(cmd.split())

        filename = "{0}/ec2-cpu.csv".format(resultspath)
        with open(filename, 'w') as fp:
            fp.write("instance_id,timestamp,average\n")
            for row in ec2_data:
                for data in row.get('data'):
                    fp.write("%s,%s,%s\n" % (row.get('instance_id'), self.unix_time(data['Timestamp']), data['Average']))

        filename = "{0}/rds-cpu.csv".format(resultspath)
        with open(filename, 'w') as fp:
            fp.write("instance_id,timestamp,average\n")
            for row in rds_data:
                for data in row.get('data'):
                    fp.write("%s,%s,%s\n" % (row.get('instance_id'), self.unix_time(data['Timestamp']), data['Average']))

        if self.is_autoscalable:
            activities = self.get_autoscalability_data(start_time, end_time)
            self.write_autoscalability_data(resultspath, activities)
        else:
            self.write_autoscalability_data(resultspath, [])

        slo_output = check("{0}/response-times-over-time.csv".format(resultspath))
        self.logger.log("<br>".join(slo_output).split('\n'))
        self.logger.log("Visualizing....")
        v = Visualize(self.num_threads, self.scenario_duration, self.r_path,
                      "{0}/response-times-over-time.csv".format(resultspath),
                      "{0}/autoscalability.log".format(resultspath))
        v.save()

        self.logger.log("finished!", fin=True)
        with open("{0}/finish".format(resultspath), "w") as fp:
            fp.write("finish")

    def unix_time(self, dt):
        # CloudWatch returns UTC timestamps, so measure from the UTC epoch
        # (fromtimestamp(0) would use the local timezone and skew the values).
        epoch = datetime.datetime.utcfromtimestamp(0)
        delta = dt - epoch
        return delta.total_seconds()

    def unix_time_millis(self, dt):
        return self.unix_time(dt) * 1000.0

    def get_autoscalability_data(self, start_time, end_time):
        def get_action(activity):
            if activity.description.lower().find('terminating') > -1:
                return 'terminate'
            return 'launch'

        def filter_activities(activities):
            filtered_activities = []
            for activity in activities:
                instance_id = re.search('(i-.*)', activity.description.lower()).group(1)

                a = {
                    'instance_id': instance_id,
                    # Shift the autoscaling timestamps by one hour, presumably
                    # to line them up with the timezone of the JMeter logs.
                    'start_time': activity.start_time + datetime.timedelta(hours=1),
                    'end_time': activity.end_time + datetime.timedelta(hours=1),
                    'action': get_action(activity)
                }
                filtered_activities.append(a)
            return filtered_activities

        def closest_activity(closest, activities):
            for i in xrange(1, len(activities)):
                activity = activities[i]
                if activity['end_time'] > closest['end_time'] and activity['end_time'] < start_time and \
                        activity['action'] == 'launch' and activity['instance_id'] not in terminating_ids:
                    closest = activity
            return closest

        autoscale = boto.ec2.autoscale.connect_to_region(
            self.region,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key
        )
        activities = autoscale.get_all_activities('distributed_jmeter-as')
        launching_ids = []
        terminating_ids = []
        new_activities = filter_activities(activities)
        filtered_activities = []
        for activity in new_activities:
            if activity['end_time'] > start_time:
                filtered_activities.append(activity)

            if activity['action'] == 'terminate':
                terminating_ids.append(activity['instance_id'])

        # closest_activity_id = set(launching_ids) - set(terminating_ids)  # get activities that were not terminated

        # Keep every launch activity, and every terminate activity whose
        # instance also has a matching launch in the window (duplicates are
        # possible when several activities share an instance id).
        f_a = []
        for a in filtered_activities:
            for a2 in filtered_activities:
                if (a['action'] == 'terminate' and a2['instance_id'] == a['instance_id'] and a2['action'] == 'launch') \
                        or (a['action'] == 'launch' and a2['instance_id'] == a['instance_id']):
                    f_a.append(a)

        return f_a
        # return [closest_activity] + f_a

    def terminate_instances(self, ips):
        reservations = self.conn.get_all_instances()
        for res in reservations:
            for instance in res.instances:
                if instance.ip_address in ips:
                    self.conn.terminate_instances(instance_ids=[instance.id])

    def check_instance(self, i, tmp_userpath, resultspath, ip):
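        # Poll the instance until the remote 'finish' marker file exists, then
        # download the scenario log and the response-times CSV.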
        cmd = "cat finish"

        ssh = self.ssh_to_instance(ip)
        _, _, stderr = ssh.exec_command(cmd)

        while len(stderr.readlines()) > 0:
            time.sleep(10)
            ssh.close()
            ssh = self.ssh_to_instance(ip)
            _, _, stderr = ssh.exec_command(cmd)

        self.logger.log("finishing thread " + str(i))
        ssh.close()
        ssh = self.ssh_to_instance(ip)
        scp = paramiko.SFTPClient.from_transport(ssh.get_transport())
        self.logger.log("jmeter scenario finished. collecting results")
        scp.get("/home/{0}/scenario.log".format(self.user),
                "{0}/{1}".format(tmp_userpath, "scenario" + str(i) + ".log"))
        # scp.get("/home/{0}/scenario.jtl".format(self.user),
        #         "{0}/{1}".format(tmp_userpath, "scenario" + str(i) + ".jtl"))
        scp.get("/home/{0}/response-times-over-time.csv".format(self.user),
                "{0}/{1}".format(tmp_userpath, "response-times-over-time" + str(i) + ".csv", self.user))
        # scp.get("/home/{0}/results-tree.xml".format(self.user), "{0}/{1}".format(tmp_userpath, "results-tree" + str(i) + ".xml", self.user))
        scp.close()
        ssh.close()

    def create_security_groups(self):
        self.logger.log("creating security groups ...")
        self.create_security_group('cs-jmeter', 'security group for jmeter', '8557', '0.0.0.0/0')
        self.add_security_group_rule('cs-jmeter', 'tcp', '1099', '0.0.0.0/0')
        # self.create_security_group('http', 'security group for http protocol', '80', '0.0.0.0/0')
        self.create_security_group('ssh', 'security group for ssh', '22', '0.0.0.0/0')

    def create_security_group(self, name, description, port, cidr):
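        # Idempotent: if the group already exists, the duplicate error is
        # swallowed and the rule is assumed to be in place from a prior run.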
        try:
            self.conn.create_security_group(name, description)
            self.conn.authorize_security_group(group_name=name, ip_protocol='tcp', from_port=port, to_port=port,
                                               cidr_ip=cidr)
        except EC2ResponseError as e:
            if str(e.error_code) != 'InvalidGroup.Duplicate':
                raise

    def add_security_group_rule(self, group_name, protocol, port, cidr):
        try:
            group = self.conn.get_all_security_groups(groupnames=[group_name])[0]
            group.authorize(protocol, port, port, cidr)
        except EC2ResponseError as e:
            if str(e.error_code) != 'InvalidPermission.Duplicate':
                raise


    def create_instance(self, msg="creating ec2 instance"):
        self.logger.log(msg)
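        # Note: only 'cs-jmeter' and 'ssh' are created by this class; the
        # 'flask' security group is assumed to already exist in the account.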
        res = self.conn.run_instances(self.ami_id, key_name=self.key_name,
                                      instance_type=self.instance_type,
                                      security_groups=['cs-jmeter', 'ssh', 'flask'])
        time.sleep(30)
        self.wait_available(res.instances[0])
        instance = self.conn.get_all_instances([res.instances[0].id])[0].instances[0]
        return instance

    def wait_available(self, instance):
        self.logger.log("waiting for instance to become available")
        self.logger.log("please wait ...")
        status = self.conn.get_all_instances(instance_ids=[instance.id])[0].instances[0].state
        i = 1
        while status != 'running':
            if i % 10 == 0:
                self.logger.log("please wait ...")
            status = self.conn.get_all_instances(instance_ids=[instance.id])[0].instances[0].state
            time.sleep(10)
            i = i + 1
        self.logger.log("instance is up and running")


    def write_config(self, config_path, instance):
        self.cfg.save_option(config_path, 'infrastructure', 'remote_user', 'ubuntu')
        self.cfg.save_option(config_path, 'infrastructure', 'ip_address', instance.ip_address)

    def write_autoscalability_data(self, resultspath, activites):
        with open("{0}/autoscalability.log".format(resultspath), "w") as fp:
            fp.write('"instance_id","start_time","end_time","action"\n')
            for activity in activites:
                fp.write('%s,%s,%s,%s\n' % (
                activity['instance_id'], str(activity['start_time']).split(".")[0], activity['end_time'],
                activity['action']))

    def create_keypair(self):
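        # Recreate the key pair if it already exists, so the private key can
        # always be saved locally for SSH.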
        try:
            keypair = self.conn.create_key_pair(self.key_name)
        except EC2ResponseError as e:
            if e.error_code == 'InvalidKeyPair.Duplicate':
                self.conn.delete_key_pair(key_name=self.key_name)
                keypair = self.conn.create_key_pair(self.key_name)
            else:
                raise

        keypair.save(self.output_directory)
        return "%s/%s.pem" % (self.output_directory, self.key_name)

    def get_cloudwatch_ec2_data(self, start_time, end_time, instance_ids):
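        # Average CPUUtilization per frontend EC2 instance, sampled at a
        # 60-second period over the test window.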
        conn = boto.ec2.cloudwatch.connect_to_region(
            self.region,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key
        )
        data = []
        for instance_id in instance_ids:
            data.append({
                'instance_id': instance_id,
                'data': conn.get_metric_statistics(
                    60,
                    start_time,
                    end_time,
                    'CPUUtilization',
                    'AWS/EC2',
                    'Average',
                    dimensions={'InstanceId': [instance_id]}
                )
            })

        return data

    def get_cloudwatch_rds_data(self, start_time, end_time, instance_ids):
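        # Average CPUUtilization per RDS instance, sampled at a 60-second
        # period over the test window.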
        conn = boto.ec2.cloudwatch.connect_to_region(
            self.region,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key
        )
        data = []
        for instance_id in instance_ids:
            data.append({
                'instance_id': instance_id,
                'data': conn.get_metric_statistics(
                    60,
                    start_time,
                    end_time,
                    'CPUUtilization',
                    'AWS/RDS',
                    'Average',
                    dimensions={'DBInstanceIdentifier': [instance_id]}
                )
            })

        return data

    def get_instance_ids_from_ip(self, ips):
        instance_ids = []
        # One API call is enough; reuse the instance list for every IP.
        instances = self.conn.get_only_instances()
        for ip in ips:
            for instance in instances:
                if instance.ip_address == ip:
                    instance_ids.append(instance.id)
                    break
        return instance_ids


    def get_instances_by_tag(self, tag, value):
        reservations = self.conn.get_all_instances()
        my_instances = []
        for res in reservations:
            for instance in res.instances:
                if tag in instance.tags and instance.tags[tag] == value:
                    my_instances.append(instance)
        return my_instances
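

# Minimal usage sketch (an assumption, not part of the original module): the
# cfg object only needs a ConfigParser-style get(section, option), and the
# logger only needs a log(msg, fin=False) method; all paths are placeholders.
if __name__ == '__main__':
    import ConfigParser

    class PrintLogger(object):
        def log(self, msg, fin=False):
            print(msg)

    cfg = ConfigParser.ConfigParser()
    cfg.read('config.ini')  # placeholder config path
    AWS(cfg, 'scenario.jmx', '/usr/bin/Rscript', '/tmp/results', PrintLogger())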