#!/usr/bin/env sampy # inherited from Iain Bertram 2005 # modified 2011/05/14 joel snow # 2006/01/19 add FarmMinEvents jms # 2006/04/06 modified for SAM v7 joel snow # 2007/05/23 patch from Petr Vokac to reduce race condition jms # 2007/10/06 address oracle directly to speed getting request info, thanks # to R. Herber jms # 2010/09/12 Change module name from Queue to RQueue to avoid conflict with # Python Queue module jms # 2010/11/15 patch from Petr Vokac to sanitize input from http accessed # settings file. jms # $Id: RQueue.py,v 1.2 2011-05-15 19:58:52 joel Exp $ import sys,os,getopt,urllib2 import traceback import cx_Oracle import re import datetime from traceback import * from Sam import sam from SamException import SamExceptions VERSION = "$Revision: 1.2 $" DATE = "$Date: 2011-05-15 19:58:52 $" DEBUG = 0 TEST = 0 date_str1 = "^(\\d{4})/(\\d{1,2})/(\\d{1,2})$" date_str2 = "^(\\d{4})/(\\d{1,2})/(\\d{1,2})[ -]" + \ "(\\d{1,2}):(\\d{1,2}):(\\d{1,2})$" date_re1 = re.compile(date_str1) date_re2 = re.compile(date_str2) months = ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"] def usage(): print "Usage: " + os.path.basename(sys.argv[0]) + " [options] [args]" print "Gets next available MC request by default." return def help(): usage() print """options: -s | --status [args] Report request system status only -t | --test Do not reserve request -h | --help -v | --version -d | --debug""" print "example usage: $ " + os.path.basename(sys.argv[0]) print VERSION + " " + DATE processing_time_SQL = \ "((select max(effective_date) "+ \ "from work_request_status_histories wrshi "+ \ "where wrshi.request_id = wr.request_id "+ \ "and wrshi.request_status_id in ("+ \ "select request_status_id "+ \ "from work_request_statuses "+ \ "where request_status in "+ \ "('partial','complete','finished','terminated'))) - "+ \ "(select max(effective_date) "+ \ "from work_request_status_histories wrsh2 "+ \ "where wrsh2.request_id = wr.request_id "+ \ "and wrsh2.request_status_id = ("+ \ "select request_status_id "+ \ "from work_request_statuses "+ \ "where request_status = 'approved')))" def valid_date(input): try: m = re.match(date_re1,input.strip()) if m: year = int(m.group(1)) if year < 1999 or year > 2020: return None month = int(m.group(2)) if month < 1 or month > 12: return None day = int(m.group(3)) if day < 1 or day > 31: return None datetime.date(year,month,day) return "%04d/%02d/%02d 00:00:00" % (year,month,day) else: m = re.match(date_re2,input.strip()) if m: year = int(m.group(1)) if year < 1999 or year > 2020: return None month = int(m.group(2)) if month < 1 or month > 12: return None day = int(m.group(3)) if day < 1 or day > 31: return None datetime.date(year,month,day) hour = int(m.group(4)) if hour < 0 or hour > 23: return None minute = int(m.group(5)) if minute < 0 or minute > 59: return None second = int(m.group(6)) if second < 0 or second > 59: return None return "%04d/%02d/%02d %02d:%02d:%02d" % \ (year,month,day,hour,minute,second) except Exception, ex: print ex for i in traceback.extract_tb(sys.exc_info()[2]): print i pass return None def connectOracle(): try: conn = cx_Oracle.connect("samread/reader@d0ofprd1") except cx_Oracle.DatabaseError, ex: print str(ex) for i in traceback.extract_tb(sys.exc_info()[2]): print i raise SystemExit return conn def getRequestInfo(conn,afterDate='2000/01/01',requestStatus=''): is_archived = 0 narg = 0 id_only = 0 event_counts_only = 0 joins = [ "work_request wr", "work_request_types wrt using(request_type_id)", "work_request_statuses wrs using(request_status_id)", "working_groups wg using(work_grp_id)", "persons p using(person_id)", ] wheres = [ ] showQuery = 0 wheres += ["wrt.request_type = 'simulation'"] test_value = valid_date(afterDate) if test_value: wheres += [ "wr.create_date >= to_date('%s','yyyy/mm/dd hh24:mi:ss')" % afterDate ] else: print "created_on_or_after value, %r, is invalid" % afterDate print "Unable to do query because of above errors." raise SystemExit if not is_archived: wheres += ["wr.is_archived = 0"] if requestStatus != "": wheres += ["wrs.request_status = '%s'" % requestStatus] query = \ "select wr.request_id, request_type, request_status, "+ \ "(select max(effective_date) "+ \ "from work_request_status_histories wrshi "+ \ "where wrshi.request_id = wr.request_id "+ \ "and wrshi.request_status_id = request_status_id) "+ \ "status_date, "+ \ processing_time_SQL+" processing_time, "+ \ "work_grp_name, username, is_archived, number_of_events, "+ \ "request_snap_id, comments, priority, wr.create_date, "+ \ "wr.update_date from "+" join ".join(joins)+" " if len(wheres): query += "where "+" and ".join(wheres) if id_only: query += " order by wr.request_id" else: query += " order by priority desc, status_date, request_id" if showQuery: print query try: cursor = conn.cursor() cursor.execute(query) rows = cursor.fetchall() cursor.close() except cx_Oracle.DatabaseError, ex: print str(ex) for i in traceback.extract_tb(sys.exc_info()[2]): print i raise SystemExit info = [] ## info is list of tuples with ## (reqid,type,status,group,username,numberOfEvents,priority,comments) if not len(rows): print "No work requests found." else: for row in rows: info.append((row[0],str(row[1]),str(row[2]),str(row[5]), str(row[6]),str(row[8]),str(row[11]),str(row[10]))) return info def getParams(conn,reqid): try: cursor = conn.cursor() cursor.execute( """select param_category||'.'||param_type||'="'|| param_value||'"' param from param_values_work_requests pvwr join param_values using(param_value_id) join param_types using(param_type_id) join param_categories using(param_category_id) where request_id = %d order by param""" % reqid) params = cursor.fetchall() cursor.close() except cx_Oracle.DatabaseError, ex: print str(ex) for i in traceback.extract_tb(sys.exc_info()[2]): print i raise SystemExit return params def getHandlers(conn,reqid): try: cursor = conn.cursor() cursor.execute( """select request_handler_id, request_handler_status, number_of_events, number_of_files, facility_name, project_id, project_name, project_status, grid_job_identifier from work_request_handlers join work_request_handler_statuses using(request_handler_status_id) left join mc_production_centers using(facility_id) left join analysis_projects using(project_id) where request_id = %d order by request_handler_id""" % reqid) handlers = cursor.fetchall() cursor.close() except cx_Oracle.DatabaseError, ex: print str(ex) for i in traceback.extract_tb(sys.exc_info()[2]): print i raise SystemExit return handlers def queue_prompt(): print "Determining the next Request to be Run" print "======================================" print " " FarmMaxEvents=1e12 FarmMinEvents=0 MaxEventsFlag=raw_input("Does your farm have a maximum number of events per request (if so answer 'yes'):") if MaxEventsFlag == 'yes': FarmMaxEvents=int(raw_input("What is the maximum number of events for a request:")) MinEventsFlag=raw_input("Does your farm have a minimum number of events per request (if so answer 'yes'):") if MinEventsFlag == 'yes': FarmMinEvents=int(raw_input("What is the minimum number of events for a request:")) return FarmMaxEvents,FarmMinEvents class Queue: def __init__(self,FarmMaxEvents=1e12,FarmMinEvents=0): global TEST, DEBUG self.FarmMaxEvents = FarmMaxEvents self.FarmMinEvents = FarmMinEvents self.facility = "" self.DEBUG = DEBUG self.TEST = TEST self.attempts = 0 self.napproved = 0 self.npending = 0 self.nhold = 0 self.npartial = 0 self.nterminated = 0 self.nfinished = 0 self.ncomplete = 0 self.nnew = 0 self.nselected = 0 self.approved = [] self.pending = [] self.hold = [] self.partial = [] self.terminated = [] self.finished = [] self.complete = [] self.new = [] self.selected = [] self.Next_Request = -1 self.reqdategt = 0 url="http://www-d0.fnal.gov/computing/mcprod/queue/settings.py" f = urllib2.urlopen(url) list=f.readlines() f.close() Group_List = sam.getRegisteredWorkGroups() Group_Events={} for item in Group_List : Group_Events[item]= [0, 1, 0, 0., 0., 0.] ## sanitize input from http - P.Vokac 11/2010 reAllowExec = re.compile("^\\s*(Group_Events\\s*\\[\\s*(['\"]).*?\\2\\s*\\]\\s*=\\s*\\[[0-9,\. ]+?\\]|reqdategt\\s*=\\s*(['\"]).*?\\3|requestsgt\\s*=\\s*\\d+)\\s*$") for item in list: if reAllowExec.match(item) == None: continue exec item self.reqdategt = reqdategt self.Group_List = Group_List self.Group_Events = Group_Events def nextRequest(self): self.Next_Request = 0 self.Number_Events = 0 self.requestList = [] try: if self.DEBUG: print "trying to get request list" print self.reqdategt fields = self.reqdategt.split("-") if len(fields) != 3: print "Bad after date format!" raise SystemExit ad = fields[2] +"/%2.2d/%2.2d" % (months.index(fields[1])+1, int(fields[0])) ## requestList = sam.getRequestInformation(afterDate=self.reqdategt) conn = connectOracle() self.requestList = getRequestInfo(conn,afterDate=ad) if self.DEBUG: print len(self.requestList) print requestsgt except: print 'Job failed as could not get requests.' print_exc() self.Next_Request = -1 else: for item in self.requestList: requestType = item[1] if requestType != 'simulation': continue requestId = item[0] requestStatus = item[2] workGroupName = item[3] numberOfEvents =long(item[5]) priority=item[6] comments=item[7] if DEBUG: print requestId,requestStatus,workGroupName,numberOfEvents,priority if self.Group_Events.has_key(workGroupName): self.Group_Events[workGroupName][5] = self.Group_Events[workGroupName][5] + numberOfEvents if requestStatus == 'complete' : self.Group_Events[workGroupName][0]= self.Group_Events[workGroupName][0] + numberOfEvents elif requestStatus == 'finished' : self.Group_Events[workGroupName][0]= self.Group_Events[workGroupName][0] + numberOfEvents elif requestStatus == 'partial' : self.Group_Events[workGroupName][0]= self.Group_Events[workGroupName][0] + numberOfEvents elif requestStatus == 'approved': self.napproved += 1 if priority == 'None' : priority = 0. if ((self.Group_Events[workGroupName][2] == 0) or (self.Group_Events[workGroupName][3] < long(priority))) : if numberOfEvents <= self.FarmMaxEvents and numberOfEvents >= self.FarmMinEvents: self.Group_Events[workGroupName][2] = long(requestId) self.Group_Events[workGroupName][3] = long(priority) self.Group_Events[workGroupName][4] = numberOfEvents self.Group_Events[workGroupName][0]= self.Group_Events[workGroupName][0] + numberOfEvents Min_Events=1e12 Number_Events=0 for group in self.Group_List: if self.Group_Events[group][1]>0 : self.Group_Events[group][3] = float(self.Group_Events[group][0])/float(self.Group_Events[group][1]) if self.Group_Events[group][2] > 0: if self.Group_Events[group][3] < Min_Events: if DEBUG: print '---> New Request ' self.Next_Request = self.Group_Events[group][2] Min_Events = self.Group_Events[group][3] Number_Events = self.Group_Events[group][4] if (self.napproved == 0): print "No approved requests available." elif (self.Next_Request == 0): print "No requests fit given criteria." else: print_line = 'The Next Request to be processed is Request ID = %s'%self.Next_Request print_line += ' with %s events.'%Number_Events print print_line self.Number_Events = Number_Events def reserve(self,answer=""): if self.Next_Request <= 0: return if self.attempts > 9: print "Giving up after 10 attempts. " return if (answer == ""): answer=raw_input("Do you want to run this request (answer yes):") print answer myRequestId=self.Next_Request requestList = sam.getRequestInformation(requestId=myRequestId) request = requestList[0].getRequest() params = request.getParams() handlers = requestList[0].getRequestHandlers() if (answer == 'yes'): self.attempts += 1 if (self.facility == ""): self.facility=raw_input("What is the facility (this must be a name entered in the MC Production Center table): ") #print self.facility try: result = sam.setRequestStatus(requestId=myRequestId, requestStatus='selected') except SamExceptions.StatusInvalid,e: ## if can't select request try to get another if self.DEBUG: print str(e) print "Can not select request " + str(myRequestId) + "! Will try to get another request..." self.nextRequest() self.reserve(answer) return ## Creating a request handler when request status is selected ## changes request status to partial. if len(handlers) == 0: handlerId = sam.createRequestHandler(requestId=myRequestId) print 'Created requestHandler with id',handlerId else: handlerId = handlers[-1].getRequestHandlerId() ## use last one? print 'Warning: Previous requestHandler with id',handlerId,'found for request',myRequestId if self.facility != 'SAMGrid': sam.modifyRequestHandler(requestHandlerId=handlerId, facilityName=self.facility) def getStatus(self): try: if self.DEBUG: print "trying to get request list" print self.reqdategt fields = self.reqdategt.split("-") if len(fields) != 3: print "Bad after date format!" raise SystemExit ad = fields[2] +"/%2.2d/%2.2d" % (months.index(fields[1])+1, int(fields[0])) conn = connectOracle() self.requestList = getRequestInfo(conn,afterDate=ad) if self.DEBUG: print len(self.requestList) print requestsgt except: print 'Job failed as could not get requests.' print_exc() else: for item in self.requestList: requestType = item[1] if requestType != 'simulation': continue requestId = item[0] requestStatus = item[2] if DEBUG: print requestId,requestStatus if requestStatus == 'complete' : self.ncomplete += 1 self.complete.append(requestId) elif requestStatus == 'finished' : self.nfinished += 1 self.finished.append(requestId) elif requestStatus == 'partial' : self.npartial += 1 self.partial.append(requestId) elif requestStatus == 'approved': self.napproved += 1 self.approved.append(requestId) elif requestStatus == 'pending': self.npending += 1 self.pending.append(requestId) elif requestStatus == 'new': self.nnew += 1 self.new.append(requestId) elif requestStatus == 'selected': self.nselected += 1 self.selected.append(requestId) elif requestStatus == 'terminated': self.nterminated += 1 self.terminated.append(requestId) elif requestStatus == 'hold' or requestStatus == 'hold-pending' or requestStatus == 'hold-partial' : self.nhold += 1 self.hold.append(requestId) def getNextRequest(self): return self.Next_Request def getNumberEvents(self): return self.Number_Events def getGroupList(self): return self.Group_List def getGroupEvents(self): return self.Group_Events def getRequestList(self): return self.requestList def reset(self): self.attemps = 0 return def getNapproved(self): return self.napproved def getNpending(self): return self.npending def getNhold(self): return self.nhold def getNnew(self): return self.nnew def getNpartial(self): return self.npartial def getNterminated(self): return self.nterminated def getNfinished(self): return self.nfinished def getNcomplete(self): return self.ncomplete def statusReport(self,args=[]): print "MC request system status:" self.getStatus() print "For groups:" print self.Group_Events.keys() print "Request status counts after", self.reqdategt print "new".ljust(12)+":",self.nnew print "pending".ljust(12)+":",self.npending print "approved".ljust(12)+":",self.napproved print "selected".ljust(12)+":",self.nselected print "partial".ljust(12)+":",self.npartial print "hold".ljust(12)+":",self.nhold print "complete".ljust(12)+":",self.ncomplete print "finished".ljust(12)+":",self.nfinished print "terminated".ljust(12)+":",self.nterminated if len(args) != 0: print for arg in args: if arg == "new": print "status new:" print self.new elif arg == "pending": print "status pending:" print self.pending elif arg == "approved": print "status approved:" print self.approved elif arg == "selected": print "status selected:" print self.selected elif arg == "partial": print "status partial:" print self.partial elif arg == "hold": print "status hold:" print self.hold elif arg == "complete": print "status complete:" print self.complete elif arg == "finished": print "status finished:" print self.finished elif arg == "terminated": print "status terminated:" print self.terminated def main(argv): global TEST, DEBUG try: opts, args = getopt.getopt(argv[1:],"hdvts", ["help","debug","version","test","status"]) except getopt.GetoptError: # print help information and exit: usage() print "Use --help option for more info." return 1 dostatus = 0 for o, a in opts: if o in ("-d", "--debug"): DEBUG = 1 print "debug enabled" if o in ("-t", "--test"): TEST = 1 print "test mode enabled" if o in ("-h", "--help"): help() sys.exit() if o in ("-v", "--version"): print VERSION + " " + DATE sys.exit() if o in ("-s", "--status"): dostatus = 1 if dostatus: queue=Queue() queue.statusReport(args) else: FarmMaxEvents,FarmMinEvents = queue_prompt() queue=Queue(FarmMaxEvents,FarmMinEvents) queue.nextRequest() if not TEST: queue.reserve() if __name__ == "__main__": sys.exit(main(sys.argv))