#!/usr/bin/env python
"""Search the work request tables of an Oracle database and print a report.

Selection tests are given on the command line as ``name=value`` (table
column tests) or ``category.type=value`` (request parameter tests).  The
complete user documentation is the ``help`` list below, printed when the
program is run without arguments or with an argument containing ``help``
or ``?``.

All print calls pass a single argument so that the file parses under both
Python 2.6+ and Python 3.
"""

import sys
import traceback
import re
import datetime

try:
    import cx_Oracle
except ImportError:
    # Deferred failure: the help text (which lists the required setups) and
    # the pure helpers stay usable; main() reports the problem before it
    # tries to connect.
    cx_Oracle = None

help = [
    "1) The required setups are:",
    " a) setup sam_python",
    " b) setup oracle_instant_client",
    " c) setup cx_Oracle",
    "2) The special purpose arguments are any argument without an equal sign",
    " with either a question mark or in any case the word 'help' and the",
    " argument showquery with a equal sign followed by a boolean value.",
    "3) The all tests are of the general form of name=value for table column",
    " tests and category.type=value for request parameter test. The name",
    " field are case insensitive.",
    "4) The table column tests are request_id, request_type, request_status,",
    " work_grp_name, username, number_of_events, request_snap_id, comments,",
    # The comma after the next entry was missing, which silently merged it
    # with the following entry into one over-long help line.
    ' is_archived, data_tier with a default of "thumbnail",',
    " created_on_or_after, created_before, updated_on_or_after",
    " updated_before, history and facility. The names of the tests are",
    " case insensitive. Except for history and facility, all the tests are",
    " on values in the work request table. History is on the associated work",
    " request status histories. The format of the history test is discussed",
    " below. Facility is on the facility name associated with the asociated",
    " work request handler.",
    "5) the processing time test, time, takes a colon (:) separated pair of",
    " floating point numbers, either of which may be omitted. The test is",
    " time >= left and time < right with the respective test omitted if the",
    " number is omitted.",
    "6) The only date formats supported are yyyy/mm/dd hh24:mi:ss,",
    " yyyy/mm/dd-hh24:mi:ss (so that it does not have to be quoted) and",
    " yyyy/mm/dd with an implication of yyyy/mm/dd-00:00:00.",
    "7) All defined category.type pairs are supported; only those in use",
    " with work requests will return useful results.",
    "8) Except for the issue of possible performance issues due to a large",
    " number of tests, the program should work with any number of request",
    " parameter, history and facility tests.",
    "9) The comments test supports both equality tests and wildcard tests.",
    " The type of test is determined by the presence of a % in the string.",
    "10) The is_archived test accepts arguments of 0, 1 or X (do not care).",
    " If no is_archived test is given, then an implicit is_archived=0",
    " test is automatically supplied.",
    "11) showquery=true requests printing of a 'chunked' (into reasonable",
    " sized lines version of the query for work requests.",
    "12) history=status,on_or_after,before",
    " search for work requests with a work request status history entry with",
    " that status and effective dates with those limits. The two commas",
    " are necessary. The status is required. The dates are optional and",
    " absence implies no limit of that type.",
    "13) status and facility names are checked for validity.",
    "14) report=selection provides a restriction the output from the program",
    " The selection values supported are ids (request ids only), events",
    " (event counts only, at the request level and over-all) or summary",
    " (request ids and event counts only).",
]

# SQL expression: latest 'partial'/'complete'/'terminated' history date minus
# the latest 'approved' history date of the work request aliased as "wr".
processing_time_SQL = (
    "((select max(effective_date) "
    "from work_request_status_histories wrshi "
    "where wrshi.request_id = wr.request_id "
    "and wrshi.request_status_id in ("
    "select request_status_id "
    "from work_request_statuses "
    "where request_status in "
    "('partial','complete','terminated'))) - "
    "(select max(effective_date) "
    "from work_request_status_histories wrsh2 "
    "where wrsh2.request_id = wr.request_id "
    "and wrsh2.request_status_id = ("
    "select request_status_id "
    "from work_request_statuses "
    "where request_status = 'approved')))"
)


def trinary(test, yes, no):
    """Return ``yes`` when ``test`` is true, otherwise ``no``."""
    if test:
        return yes
    return no


def plural(text, count, plural=None):
    """Return ``" <count> <noun> "`` with the noun matched to ``count``.

    ``plural`` is the explicit plural form; without it an "s" is appended
    to ``text`` whenever ``count`` is not 1.
    """
    if plural:
        return " %d %s " % (count, trinary(count == 1, text, plural))
    return " %d %s%s " % (count, text, trinary(count == 1, "", "s"))


date_str1 = r"^(\d{4})/(\d{1,2})/(\d{1,2})$"
date_str2 = (r"^(\d{4})/(\d{1,2})/(\d{1,2})[ -]"
             r"(\d{1,2}):(\d{1,2}):(\d{1,2})$")
date_re1 = re.compile(date_str1)
date_re2 = re.compile(date_str2)


def valid_date(input, min_year=1999, max_year=2020):
    """Validate a date string and return it in normalized form.

    Accepts ``yyyy/mm/dd``, ``yyyy/mm/dd hh:mi:ss`` and
    ``yyyy/mm/dd-hh:mi:ss`` (surrounding blanks are ignored).

    Returns ``"yyyy/mm/dd hh:mi:ss"`` (zero padded, midnight when no time
    was given), or None when the text does not match, the year is outside
    ``min_year``..``max_year`` or the fields do not form a real date/time.
    """
    try:
        text = input.strip()
    except AttributeError:
        # Not a string at all (e.g. None): simply "not a valid date".
        return None
    m = date_re1.match(text)
    if m:
        fields = [int(g) for g in m.groups()] + [0, 0, 0]
    else:
        m = date_re2.match(text)
        if not m:
            return None
        fields = [int(g) for g in m.groups()]
    year, month, day, hour, minute, second = fields
    if year < min_year or year > max_year:
        return None
    try:
        # Range-checks every field, including the day against the month.
        datetime.datetime(year, month, day, hour, minute, second)
    except ValueError:
        return None
    return "%04d/%02d/%02d %02d:%02d:%02d" % \
        (year, month, day, hour, minute, second)


linesize = 78


def chunk(text):
    """Split ``text`` at whitespace into lines of at most ``linesize`` chars.

    Always returns a list of lines (empty for empty/None text) so callers
    can iterate over the result; a single word longer than ``linesize`` is
    kept whole on its own line.
    """
    if not text:
        return []
    if len(text) <= linesize:
        return [text]
    lines = []
    line = ""
    for w in text.split():
        # The strict "<" leaves room for the joining blank.
        if line and len(line) + len(w) < linesize:
            line = line + " " + w
        else:
            if line:
                lines.append(line)
            line = w
    if line:
        lines.append(line)
    return lines


def sql_literal(value):
    """Return ``value`` as a quoted SQL string literal.

    Embedded single quotes are doubled so that user-supplied text can
    neither break nor extend the statement it is inserted into.
    """
    return "'%s'" % str(value).replace("'", "''")


def _report_db_error(ex):
    """Print ``ex`` and the active traceback, then exit.

    Must be called from inside an ``except`` block.
    """
    print(str(ex))
    for i in traceback.extract_tb(sys.exc_info()[2]):
        print(i)
    raise SystemExit


def _fetch_all(conn, sql):
    """Run ``sql`` on ``conn`` and return all rows; exit on a database error."""
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    except cx_Oracle.DatabaseError as ex:
        _report_db_error(ex)


def checkStatus(conn, value):
    """Return the request_status_id of status ``value``, or None if unknown."""
    rows = _fetch_all(
        conn,
        "select request_status_id from work_request_statuses "
        "where request_status = %s" % sql_literal(value))
    if rows:
        return rows[0][0]
    return None


def checkFacility(conn, value):
    """Return the facility_id of facility ``value``, or None if unknown."""
    rows = _fetch_all(
        conn,
        "select facility_id from mc_production_centers "
        "where facility_name = %s" % sql_literal(value))
    if rows:
        return rows[0][0]
    return None


# name=value tests that map directly onto one column of the main query.
_NUMERIC_COLUMNS = {
    "request_id": "wr.request_id",
    "number_of_events": "wr.number_of_events",
    "request_snap_id": "wr.request_snap_id",
}
_STRING_COLUMNS = {
    "request_type": "wrt.request_type",
    "work_grp_name": "wg.work_grp_name",
    "username": "p.username",
}
_DATE_TESTS = {
    "created_on_or_after": ("wr.create_date", ">="),
    "created_before": ("wr.create_date", "<"),
    "updated_on_or_after": ("wr.update_date", ">="),
    "updated_before": ("wr.update_date", "<"),
}
_TRUE_WORDS = ("1", "true", "t", "yes", "y")
_FALSE_WORDS = ("0", "false", "f", "no", "n")

_REQUEST_LABELS_BEFORE_TIME = (
    " request type:", " request status:", " request status date:",
)
_REQUEST_LABELS_AFTER_TIME = (
    " work group name:", " user name:", " is archived:",
    " number of events:", " request snapshot id:", " comments:",
    " priority:", " created:", " updated:",
)
_HANDLER_LABELS = (
    " request handler id:", " request handler status:",
    " number of events:", " number of files:", " facility name:",
    " project id:", " project name:", " project status:",
    " grid job identifier:",
)
_DETAIL_LABELS = (
    " family:", " application name:", " version:", " processing order:",
    " dataset snap id:", " dataset snap version:", " data definition id:",
    " data definition name:", " installation name:", " batch system name:",
    " station name:", " queue name:",
)

_PARAMETERS_SQL = """select param_category||'.'||param_type||'="'||
        param_value||'"' param
    from param_values_work_requests pvwr
    join param_values using(param_value_id)
    join param_types using(param_type_id)
    join param_categories using(param_category_id)
    where request_id = %d
    order by param"""

_STATUSES_SQL = """select request_status, effective_date
    from work_request_status_histories
    join work_request_statuses using(request_status_id)
    where request_id = %d
    order by effective_date, wrsh_id"""

_HANDLERS_SQL = """select request_handler_id, request_handler_status,
        number_of_events, number_of_files, facility_name, project_id,
        project_name, project_status, grid_job_identifier
    from work_request_handlers
    join work_request_handler_statuses using(request_handler_status_id)
    left join mc_production_centers using(facility_id)
    left join analysis_projects using(project_id)
    where request_id = %d
    order by request_handler_id"""

_DETAILS_SQL = """select family, appl_name, version, processing_order,
        proj_snap_id, proj_snap_version, proj_def_id, proj_def_name,
        installation_name, batch_system_name, station_name, queue_name
    from request_details rd
    left join application_families using(appl_family_id)
    left join project_snapshots using(proj_snap_id)
    left join project_definitions pd on (rd.proj_def_id = pd.proj_def_id)
    left join installations using(installation_id)
    left join stations using(station_id)
    left join queues using(queue_id)
    join batch_system_types using(batch_system_type_id)
    where request_id = %d
    order by req_detail_id"""

# Format arguments: request id, request id, quoted data tier literal.
_FILE_COUNT_SQL = """select %d r, count(*) n, sum(event_count) e
    from data_files smdf
    join data_files_param_values smdfpv on(smdf.file_id = smdfpv.file_id)
    join param_values using(param_value_id)
    join param_types using(param_type_id)
    join param_categories using(param_category_id)
    join data_tiers using(data_tier_id)
    where param_value = to_char(%d)
    and param_type='requestid'
    and param_category='global'
    and file_content_status_id != (
        select file_content_status_id from file_content_statuses
        where file_content_status = 'bad')
    and data_tier = %s
    and exists (
        select 1
        from data_file_locations smdfl, data_storage_locations smdsl
        where smdfl.file_id = smdf.file_id
        and smdfl.location_id = smdsl.location_id
        and ( smdsl.location_type != 'tape' or exists (
            select 1 from volumes smv
            where smv.volume_id = smdfl.volume_id
            and smv.volume_status = 'online'))) """


class _Selection(object):
    """Everything the command line contributes to the query and the report."""

    def __init__(self):
        self.error = False
        self.archive_test_given = False
        self.ids_only = False
        self.event_counts_only = False
        self.details = True
        self.show_query = False
        self.data_tier = None
        self.joins = [
            "work_request wr",
            "work_request_types wrt using(request_type_id)",
            "work_request_statuses wrs using(request_status_id)",
            "working_groups wg using(work_grp_id)",
            "persons p using(person_id)",
        ]
        self.wheres = []

    def fail(self, message):
        """Print ``message`` and remember that the query must not be run."""
        print(message)
        self.error = True


def _is_help_request(arg):
    """True for an argument without "=" that contains "help" or "?"."""
    if "=" in arg:
        return False
    lowered = arg.lower()
    return "help" in lowered or "?" in lowered


def _print_help_and_exit():
    """Print the help text followed by a blank line, then exit."""
    for line in help:
        print(line)
    print("")
    raise SystemExit


def _print_fields(labels, values):
    """Print one dot-padded "label....value" line per label/value pair."""
    for label, value in zip(labels, values):
        print(label.ljust(32, ".") + str(value))


def _add_count(counts, key, amount):
    """Add ``amount`` to ``counts[key]``; report (not raise) unaddable values."""
    try:
        counts[key] = counts.get(key, 0) + amount
    except TypeError as ex:
        print(ex)


def _apply_history_test(conn, sel, narg, value):
    """Handle history=status,on_or_after,before."""
    parts = [part.strip() for part in value.split(",")]
    if len(parts) != 3:
        sel.fail("history value, %r, is not a list of 3 "
                 "comma-separated values" % value)
        return
    status, on_or_after, before = parts
    if not status:
        sel.fail("history status value, %r, not specified" % (status,))
        return
    if checkStatus(conn, status) is None:
        sel.fail("history status value, %r, is not known" % (status,))
    # narg makes the table aliases unique when several history tests occur.
    text = ("wr.request_id in ("
            "select request_id "
            "from work_request_status_histories wrsh%d "
            "join work_request_statuses wrs%d "
            "using(request_status_id) "
            "where wrs%d.request_status = %s "
            % (narg, narg, narg, sql_literal(status)))
    limits = ((on_or_after, ">=", "on or after"), (before, "<", "before"))
    for raw, operator, label in limits:
        if not raw:
            continue
        test_date = valid_date(raw)
        if test_date:
            text += ("and wrsh%d.effective_date %s "
                     "to_date('%s','yyyy/mm/dd hh24:mi:ss') "
                     % (narg, operator, test_date))
        else:
            sel.fail("history %s date, %r, is invalid" % (label, raw))
    if not sel.error:
        sel.wheres.append(text + ") ")


def _apply_time_test(sel, value):
    """Handle time=left:right (processing time >= left and < right)."""
    pos = value.find(":")
    if pos < 0:
        sel.fail("time value, %r, does not have a colon separator" % value)
        return
    limits = ((value[:pos], ">=", "left"), (value[pos + 1:], "<", "right"))
    for raw, operator, side in limits:
        if not raw:
            continue
        try:
            limit = float(raw)
        except ValueError as ex:
            sel.fail("time %s value, %r, is invalid" % (side, raw))
            print(ex)
            continue
        sel.wheres.append(
            processing_time_SQL + " %s %f" % (operator, limit))


def _apply_parameter_test(conn, sel, narg, name, value):
    """Handle category.type=value by joining the matching parameter value.

    Each lookup depends on the id found by the previous one, so the chain
    stops at the first name that is not recognized.
    """
    pos = name.find(".")
    category = name[:pos].strip()
    param_type = name[pos + 1:].strip()

    rows = _fetch_all(
        conn,
        "select param_category_id from param_categories "
        "where param_category = %s" % sql_literal(category))
    if not rows:
        sel.fail("parameter category %r is not recognized." % category)
        return
    category_id = rows[0][0]

    rows = _fetch_all(
        conn,
        "select param_type_id from param_types "
        "where param_type = %s and param_category_id = %d "
        % (sql_literal(param_type), category_id))
    if not rows:
        sel.fail("parameter type %r.%r is not recognized."
                 % (category, param_type))
        return
    type_id = rows[0][0]

    rows = _fetch_all(
        conn,
        "select param_value_id from param_values "
        "where param_value = %s and param_type_id = %d "
        % (sql_literal(value), type_id))
    if not rows:
        sel.fail("parameter type %r.%r=%r is not recognized."
                 % (category, param_type, value))
        return
    # narg makes the alias unique when several parameter tests occur.
    sel.joins.append(
        "param_values_work_requests pvwr%d "
        "on(wr.request_id = pvwr%d.request_id)" % (narg, narg))
    sel.wheres.append("pvwr%d.param_value_id = %d" % (narg, rows[0][0]))


def _apply_argument(conn, sel, narg, arg):
    """Translate command line argument number ``narg`` into ``sel``."""
    pos = arg.find("=")
    if pos < 0:
        sel.fail("arg %d has no equal sign" % narg)
        return
    name = arg[:pos].strip().lower()
    value = arg[pos + 1:].strip()

    if "." in name:
        _apply_parameter_test(conn, sel, narg, name, value)
    elif name in _NUMERIC_COLUMNS:
        if value.isdigit():
            sel.wheres.append(
                "%s = %d" % (_NUMERIC_COLUMNS[name], int(value)))
        else:
            sel.fail("%s value, %r, is not numeric" % (name, value))
    elif name in _STRING_COLUMNS:
        sel.wheres.append(
            "%s = %s" % (_STRING_COLUMNS[name], sql_literal(value)))
    elif name == "request_status":
        if checkStatus(conn, value) is not None:
            sel.wheres.append(
                "wrs.request_status = %s" % sql_literal(value))
        else:
            sel.fail("request_status value, %r, is not known" % (value,))
    elif name == "comments":
        # A % in the value selects a wildcard (like) comparison.
        operator = trinary("%" in value, "like", "=")
        sel.wheres.append(
            "wr.comments %s %s" % (operator, sql_literal(value)))
    elif name == "is_archived":
        # Any explicit test, even "X", suppresses the implicit default.
        sel.archive_test_given = True
        if value in ("0", "1"):
            sel.wheres.append("wr.is_archived = " + value)
        elif value not in ("x", "X"):
            sel.fail("is_archived value, %r, is invalid" % value)
    elif name == "data_tier":
        sel.data_tier = value
    elif name in _DATE_TESTS:
        column, operator = _DATE_TESTS[name]
        test_value = valid_date(value)
        if test_value:
            # Use the normalized text: it is the form the mask describes.
            sel.wheres.append(
                "%s %s to_date('%s','yyyy/mm/dd hh24:mi:ss')"
                % (column, operator, test_value))
        else:
            sel.fail("%s value, %r, is invalid" % (name, value))
    elif name == "history":
        _apply_history_test(conn, sel, narg, value)
    elif name == "time":
        _apply_time_test(sel, value)
    elif name == "facility":
        facility_id = checkFacility(conn, value)
        if facility_id is not None:
            sel.wheres.append(
                "wr.request_id in ("
                "select request_id from work_request_handlers "
                "where facility_id = %d)" % facility_id)
        else:
            sel.fail("facility name value, %r, is not known" % (value,))
    elif name == "showquery":
        test_value = value.lower()
        if test_value in _TRUE_WORDS:
            sel.show_query = True
        elif test_value not in _FALSE_WORDS:
            sel.fail("showQuery value, %r, is invalid" % (value,))
    elif name == "report":
        test_value = value.lower()
        if test_value == "ids":
            sel.ids_only = True
        elif test_value == "events":
            sel.event_counts_only = True
        elif test_value == "summary":
            sel.event_counts_only = True
            sel.details = False
        else:
            sel.fail("report value, %r, is invalid" % (value,))
    else:
        sel.fail("argument name, %r, is not recognized" % name)


def _build_query(sel):
    """Assemble the work request query from the collected joins and tests."""
    query = (
        "select wr.request_id, request_type, request_status, "
        "(select max(effective_date) "
        "from work_request_status_histories wrshi "
        "where wrshi.request_id = wr.request_id "
        "and wrshi.request_status_id = request_status_id) "
        "status_date, "
        + processing_time_SQL + " processing_time, "
        "work_grp_name, username, is_archived, number_of_events, "
        "request_snap_id, comments, priority, wr.create_date, "
        "wr.update_date from " + " join ".join(sel.joins) + " ")
    if sel.wheres:
        query += "where " + " and ".join(sel.wheres)
    if sel.ids_only or not sel.details:
        query += " order by wr.request_id"
    else:
        query += " order by priority desc, status_date, request_id"
    return query


def _print_request_fields(row):
    """Print the column values of one row of the work request query."""
    _print_fields(_REQUEST_LABELS_BEFORE_TIME, row[1:4])
    if row[4]:
        print(" processing time:".ljust(32, ".") + ("%.5g" % row[4]))
    else:
        print(" processing time:".ljust(32, ".") + "None")
    _print_fields(_REQUEST_LABELS_AFTER_TIME, row[5:14])


def _print_parameters(conn, request_id):
    """Print the category.type="value" parameters of one request."""
    params = _fetch_all(conn, _PARAMETERS_SQL % request_id)
    if not params:
        print(" No Parameters ".center(60, "-"))
        return
    print(plural("Parameter", len(params)).center(60, "-"))
    for p in params:
        print(" %s" % p[0])


def _print_statuses(conn, request_id):
    """Print the status history of one request."""
    statuses = _fetch_all(conn, _STATUSES_SQL % request_id)
    if not statuses:
        print(" No Statuses ".center(60, "-"))
        return
    print(plural("Status", len(statuses), "Statuses").center(60, "-"))
    for s in statuses:
        print(" %-20.20s %s" % tuple(s))


def _print_handlers(conn, request_id, sel, events_requested,
                    total_events_generated):
    """Print the handlers of one request and accumulate event counts.

    Handler event counts are summed per handler status, both for this
    request and into ``total_events_generated``; the query therefore runs
    for every report type that reaches this function, even when nothing
    is printed.
    """
    handlers = _fetch_all(conn, _HANDLERS_SQL % request_id)
    if not handlers:
        if sel.details:
            print(" No Handlers ".center(60, "-"))
        return
    if sel.details:
        print(plural("Handler", len(handlers)).center(60, "-"))
    show_fields = sel.details and not sel.event_counts_only
    events_generated = {}
    first = True
    for h in handlers:
        if show_fields:
            if first:
                first = False
            else:
                print("")
            _print_fields(_HANDLER_LABELS, h)
        if h[2]:
            _add_count(events_generated, h[1], h[2])
            _add_count(total_events_generated, h[1], h[2])
    if sel.details and (events_requested or events_generated):
        if not first:
            print("")
        print(" events requested:".ljust(32, ".") + str(events_requested))
        for status in sorted(events_generated):
            print((" events %r:" % status).ljust(32, ".") +
                  str(events_generated[status]))


def _print_details(conn, request_id):
    """Print the request_details rows of one request."""
    request_details = _fetch_all(conn, _DETAILS_SQL % request_id)
    if not request_details:
        print(" No Details ".center(60, "-"))
        return
    print(plural("Detail", len(request_details)).center(60, "-"))
    first = True
    for d in request_details:
        if first:
            first = False
        else:
            print("")
        _print_fields(_DETAIL_LABELS, d)


def _report_requests(conn, rows, sel):
    """Print the per-request part of the report.

    Returns (total processing time, total events requested,
    {handler status: total events generated}).
    """
    id_width = max([len(str(row[0])) for row in rows])
    status_width = max([len(str(row[2])) for row in rows])
    events_width = max([len(str(row[8])) for row in rows])
    show_fields = sel.details and not sel.event_counts_only

    total_processing_time = 0.0
    total_events_requested = 0
    total_events_generated = {}
    first_request = True
    for row in rows:
        if sel.details:
            if first_request:
                first_request = False
            else:
                print("")
            print((" Work Request %r " % (row[0],)).center(60, "="))
            if show_fields:
                _print_request_fields(row)
        else:
            print("%s %s %s" % (str(row[0]).rjust(id_width),
                                str(row[2]).ljust(status_width),
                                str(row[8] or 0).rjust(events_width)))
        if row[4]:
            total_processing_time += row[4]
        # Reset for every request so that a request without a usable count
        # does not report the count of the previous one.
        events_requested = 0
        if row[8]:
            try:
                events_requested = int(str(row[8]))
            except ValueError:
                pass
            total_events_requested += events_requested
        if show_fields:
            _print_parameters(conn, row[0])
            _print_statuses(conn, row[0])
        _print_handlers(conn, row[0], sel, events_requested,
                        total_events_generated)
        if show_fields:
            _print_details(conn, row[0])
    return (total_processing_time, total_events_requested,
            total_events_generated)


def _report_file_counts(conn, rows, data_tier):
    """Print file and event counts of ``data_tier`` files for each request."""
    print(" File and Event Counts by Request ".center(60, "#"))
    n_files = 0
    n_events = 0
    tier = sql_literal(data_tier)
    for row in rows:
        counts = _fetch_all(conn, _FILE_COUNT_SQL % (row[0], row[0], tier))
        for p in counts:
            if p[1]:
                n_files += p[1]
            if p[2]:
                n_events += p[2]
            print(" request: %s number of files: %s, number of events: %s"
                  % tuple(p))
    print(" Total number of files: %d, total number of events: %d"
          % (n_files, n_events))


def _report_totals(total_processing_time, total_events_requested,
                   total_events_generated):
    """Print the totals over all reported requests, omitting empty ones."""
    have_events = total_events_requested or total_events_generated
    if have_events:
        print(" Total Events ".center(60, "#"))
        print(" total events requested:".ljust(32, ".") +
              str(total_events_requested).rjust(12))
        for status in sorted(total_events_generated):
            print((" total events %r:" % status).ljust(32, ".") +
                  str(total_events_generated[status]).rjust(12))
    if total_processing_time:
        if have_events:
            print((" total processing time:".ljust(32, ".") + "%12.5g")
                  % total_processing_time)
        else:
            print((" Total Processing Time is %.5g "
                   % total_processing_time).center(60, "#"))


def main(args):
    """Run the program; ``args`` is the full argv including the program name.

    Prints the help text and exits when no arguments are given or help is
    requested; this happens before the database is contacted, so help is
    available even when the setups it describes are missing.  Otherwise
    connects as "samread/reader@d0ofprd1", translates the arguments into
    one query and prints the report selected by ``report=``.

    Raises SystemExit after help, on invalid arguments and on any
    database error.
    """
    args = args[1:]
    if not args:
        print("The use of arguments is required")
        _print_help_and_exit()
    for arg in args:
        if _is_help_request(arg):
            _print_help_and_exit()

    if cx_Oracle is None:
        print("The cx_Oracle module could not be imported; "
              "see item 1 of the help text.")
        raise SystemExit
    try:
        conn = cx_Oracle.connect("samread/reader@d0ofprd1")
    except cx_Oracle.DatabaseError as ex:
        _report_db_error(ex)

    sel = _Selection()
    narg = 0
    for arg in args:
        narg += 1
        _apply_argument(conn, sel, narg, arg)
    if sel.error:
        print("Unable to do query because of above errors.")
        raise SystemExit
    if not sel.archive_test_given:
        sel.wheres.append("wr.is_archived = 0")
    if not sel.data_tier:
        sel.data_tier = "thumbnail"

    query = _build_query(sel)
    if sel.show_query:
        for line in chunk(query):
            print(line)
    rows = _fetch_all(conn, query)

    totals = (0.0, 0, {})
    if not rows:
        print("No work requests found.")
    elif sel.ids_only:
        for row in rows:
            print(str(row[0]))
    else:
        totals = _report_requests(conn, rows, sel)
        _report_file_counts(conn, rows, sel.data_tier)
    _report_totals(*totals)
    conn.close()


if __name__ == '__main__':
    sys.exit(main(sys.argv))