"""Build a JSON report of Galaxy jobs per user, optionally joined with Gold accounting data.

The script queries the Galaxy database for users and jobs in a date range,
optionally merges per-job charge information read from the Gold command line
tools, and writes the result as JSON to a file or to standard output.
"""
import os
import sys
import pprint
import subprocess
import argparse
import json
from datetime import date, timedelta, datetime

from library import get_time, add_time_options
import sqlalchemy as sa

# Make $LAPGALAXY/lib importable before the galaxy imports below.
sys.path.insert(1, os.environ['LAPGALAXY'] + '/lib')
from galaxy.util.properties import load_app_properties
import galaxy.config
from galaxy.objectstore import build_object_store_from_config
from galaxy.model import mapping


def _run_gold_command(argv):
    """Run a Gold command line tool and return its standard output as a list of lines.

    The process is waited for and its pipe is closed (via ``communicate``).
    The exit status is not checked, so a failing command yields whatever
    output it produced (possibly none).
    """
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, universal_newlines=True)
    output, _ = proc.communicate()
    return output.splitlines()


class Report:
    """Collects users and jobs from the Galaxy database and Gold accounting output."""

    def __init__(self):
        """Load the Galaxy configuration from $LAPGALAXY/config/galaxy.ini and open the model."""
        config = os.environ['LAPGALAXY'] + "/config/galaxy.ini"
        self.model, self.object_store, self.engine = self._init(config)
        self.sa_session = self.model.context.current

    def _init(self, config):
        """Initialise the Galaxy model mapping from an ini file.

        :param config: path to the ini file; a relative path is resolved
            against the parent directory of this script.
        :returns: tuple ``(model, object_store, engine_name)`` where
            ``engine_name`` is the part of the database connection string
            before the first ``:``.
        """
        if config.startswith('/'):
            config = os.path.abspath(config)
        else:
            config = os.path.abspath(
                os.path.join(os.path.dirname(__file__), os.pardir, config))

        properties = load_app_properties(ini_file=config)
        config = galaxy.config.Configuration(**properties)
        object_store = build_object_store_from_config(config)

        return (
            mapping.init(
                config.file_path,
                config.database_connection,
                create_tables=False,
                object_store=object_store
            ),
            object_store,
            config.database_connection.split(':')[0]
        )

    def retrieve_users(self, gold_data):
        """Read all Galaxy users and attach their Gold record when Gold data is given.

        :param gold_data: mapping of user name to Gold data as built by
            ``retrieve_account_gold``; when empty, no Gold lookup is done.
        :returns: tuple ``(users, reverse_user)``; ``users`` maps e-mail to a
            per-user dict, ``reverse_user`` maps user id to e-mail.

        Exits the process with status 1 if Gold data is given and a Galaxy
        user is missing from it.
        """
        result = self.sa_session.query(self.model.User).all()
        users = {}
        reverse_user = {}
        for user in result:
            nuser = vars(user)
            email = nuser['email']
            reverse_user[nuser['id']] = email
            user_data = {
                'create_time': nuser['create_time'],
                'update_time': nuser['update_time'],
                'id': nuser['id'],
                'email': email,
            }
            users[email] = user_data
            if gold_data:
                if email not in gold_data:
                    print("The user '"+email+"' is not in gold! This should not be the case and should be corrected!")
                    sys.exit(1)
                user_data["gold"] = gold_data[email]
        return users, reverse_user

    def retrieve_jobs(self, users, reverse_user, date_start, date_end, fail=False):
        """Attach the jobs created in ``[date_start, date_end]`` to their users.

        Each job is appended to ``users[email]['jobs']['slurm']`` when the
        user has a Gold record for it and the job runner is ``drmaa``,
        otherwise to ``users[email]['jobs']['local']``.

        :param users: per-user dicts as returned by ``retrieve_users``;
            modified in place.
        :param reverse_user: mapping of user id to e-mail.
        :param date_start: first day (inclusive) of the job creation range.
        :param date_end: last day (inclusive) of the job creation range.
        :param fail: when true, only jobs that are neither ``ok`` nor
            ``deleted`` with exit code 0 are kept, and a dump of the job
            attributes is stored with each of them.

        Jobs whose ``user_id`` has no entry in ``reverse_user`` are skipped,
        since there is no user to attach them to.
        """
        result = self.sa_session.query(self.model.Job)\
            .filter(sa.and_(
                self.model.Job.table.c.create_time >= date_start,
                # date_end is inclusive, so compare against the following day.
                self.model.Job.table.c.create_time < date_end + timedelta(days=1)
            ))\
            .all()

        for job in result:
            job = vars(job)
            job_data = {
                'job_runner_name': job['job_runner_name'],
                'state': job['state'],
                'tool_id': job['tool_id'],
                'update_time': job['update_time'],
                'create_time': job['create_time'],
            }
            td = job['update_time'] - job['create_time']
            # Whole seconds between creation and last update; floor division
            # keeps the result an integer on every Python version.
            job_data['duration'] = (
                td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6
            ) // 10**6

            if fail:
                if job['state'] == 'ok' or (job['state'] == 'deleted' and job['exit_code'] == 0):
                    continue
                job_data['dump'] = pprint.pformat(job)

            user = reverse_user.get(job['user_id'])
            if user is None:
                # No known user for this job; nothing to attach it to.
                continue
            user_data = users[user]
            table = user_data.setdefault('jobs', {})

            # Same key format as the one built in retrieve_jobs_gold.
            key = str(job['id']) + "." + user
            if 'gold' in user_data and key in user_data['gold']['jobs'] and job['job_runner_name'] == "drmaa":
                job_data['slurm'] = user_data['gold']['jobs'][key]
                table.setdefault('slurm', []).append(job_data)
            else:
                table.setdefault('local', []).append(job_data)

    def retrieve_jobs_gold(self, gold_data):
        """Read charged jobs from ``glsjob`` and record them in ``gold_data``.

        For every output row whose stage column is ``Charge``, the job is
        stored under ``gold_data[user]['jobs']['<job id>.<user>']`` and the
        user's per-project ``count`` and ``used`` totals are updated.

        :param gold_data: mapping built by ``retrieve_account_gold``;
            modified in place.

        Exits the process with status 1 if a row names a user that is not in
        ``gold_data``. Rows with fewer than six fields are ignored.
        """
        argv = ['sudo', '/opt/gold/bin/glsjob', '--raw', '-quiet', '--show',
                'JobId,User,Project,Charge,Processors,Stage']
        for line in _run_gold_command(argv):
            row = [field.strip() for field in line.split('|')]
            if len(row) < 6 or row[5] != "Charge":
                continue
            job_id, user, project, charge = row[0], row[1], row[2], row[3]
            cpus = int(row[4])
            amount = int(charge)
            if user not in gold_data:
                print("User '"+ user + "' is not in accounting system but have jobs associated to a project '" + project + "'. This should never happen!")
                sys.exit(1)
            data = gold_data[user]
            totals = data['total'][project]
            totals['count'] = totals['count'] + 1
            totals['used'] = totals['used'] + amount
            key = job_id + "." + user
            data['jobs'][key] = {
                'amount': amount,
                'project': project,
                'cpus': cpus,
            }

    def retrieve_account_gold(self):
        """Read accounts from ``glsaccount`` and build the per-user Gold mapping.

        :returns: dict mapping each user name to
            ``{'total': {project: {'remaining', 'count', 'used'}}, 'jobs': {}}``;
            ``remaining`` holds the amount column as read (a string),
            ``count`` and ``used`` start at 0.

        Rows with fewer than three fields are ignored.
        """
        argv = ['sudo', '/opt/gold/bin/glsaccount', '--raw', '-quiet', '--show',
                'Amount,Project,Users']
        out = {}
        for line in _run_gold_command(argv):
            row = [field.strip() for field in line.split('|')]
            if len(row) < 3:
                continue
            amount, project, users = row[0], row[1], row[2]
            ulist = users.split(',')
            # NOTE(review): 'memebers' looks like a typo for 'members', but the
            # actual Gold output is not visible here - confirm before changing.
            if ulist[0] == 'memebers':
                ulist = ulist[1:]
            for user in ulist:
                data = out.setdefault(user, {'total': {}, 'jobs': {}})
                data['total'][project] = {
                    'remaining': amount,
                    'count': 0,
                    'used': 0,
                }
        return out

    def generate_report(self, start_date, end_date):
        """Return the per-user data with all jobs of the range, joined with Gold data."""
        gold_data = self.retrieve_account_gold()
        self.retrieve_jobs_gold(gold_data)
        users, reverse_user = self.retrieve_users(gold_data)
        self.retrieve_jobs(users, reverse_user, start_date, end_date)
        return users

    def generate_fail(self, start_date, end_date):
        """Return the per-user data with only the failed jobs of the range (no Gold data)."""
        gold_data = {}
        users, reverse_user = self.retrieve_users(gold_data)
        self.retrieve_jobs(users, reverse_user, start_date, end_date, fail=True)
        return users


def default(o):
    """JSON fallback serializer: render dates and datetimes in ISO format.

    Returns ``None`` for any other type, which ``json.dumps`` writes as
    ``null``.
    """
    # datetime is a subclass of date, so one isinstance check covers both.
    if isinstance(o, date):
        return o.isoformat()


def serialize_json(object, start, end):
    """Wrap the report data with generation metadata and return it as a JSON string.

    :param object: the report data to place under the ``data`` key.
    :param start: start of the reported range; must provide ``isoformat()``.
    :param end: end of the reported range; must provide ``isoformat()``.
    """
    outx = {
        'data': object,
        'generated_on': date.today().isoformat(),
        "start": start.isoformat(),
        "end": end.isoformat(),
    }
    return json.dumps(outx, indent=4, sort_keys=True, default=default)


def write_output(args, out):
    """Write ``out`` to the file given by ``args.output``, or print it when none is set."""
    if args.output is not None:
        # The context manager closes the file even if the write fails.
        with open(args.output[0], 'w') as handle:
            handle.write(out)
    else:
        print(out)


def main():
    """Parse the command line, build the requested report and write it out."""
    parser = argparse.ArgumentParser(prog='reporting script')
    add_time_options(parser)
    parser.add_argument('--output', nargs=1, help="Output file")
    parser.add_argument('--fail', action='store_true', help='Report failed jobs')
    args = parser.parse_args()

    if 'help' in args and args.help is not None:
        parser.print_help()
        sys.exit(0)

    start, end = get_time(args)
    if args.fail is True:
        out = Report().generate_fail(start, end)
    else:
        out = Report().generate_report(start, end)

    out = serialize_json(out, start, end)
    write_output(args, out)


if __name__ == "__main__":
    main()