import os, stat
import string
import math
import re
import pwd
import subprocess
import time
import datetime
from os.path import exists


def getUsername():
    return pwd.getpwuid(os.getuid())[0]


__property_separator_regex = """:|=|(?<=[^\\\\]) """
__property_regex = re.compile(__property_separator_regex)


def getProperties(filename):
    # propFile = file(filename, "rU")
    # propFile = open(filename, "rU")
    propFile = open(filename, 'r', newline=None)
    propDict = dict()
    for propLine in propFile:
        propDef = propLine.strip()
        if len(propDef) == 0:
            continue
        if propDef[0] in ('!', '#'):
            continue
        separator_location = __property_regex.search(propDef)
        if separator_location is not None:
            found = separator_location.start()
        else:
            found = len(propDef)
        name = propDef[:found].rstrip().replace('\\ ', ' ')
        value = propDef[found:].lstrip(":= ").rstrip()
        propDict[name] = value
    propFile.close()
    # print propDict
    return propDict


def getToolType(commandlineString):
    if re.search(r'garli', "".join(commandlineString).lower()):
        return "garli"
    elif re.search(r'raxml', "".join(commandlineString).lower()):
        return "raxml"
    elif re.search(r'mbhwrapper', "".join(commandlineString).lower()):
        return "mrbayes"
    elif re.search(r'beast', "".join(commandlineString).lower()):
        return "beast"
    return None


shared_queue = "shared"
shared_queue_limit = 20160.0
short_queue = "compute"
short_queue_limit = 20160.0
gpu_queue = "gpu"
gpu_shared_queue = "gpu-shared"
aws = "aws"
large_shared_queue = "large-shared"
cores_per_node = 24
# Effectively get rid of max_nodes by setting it to 5000
max_nodes = 5000
max_cores = max_nodes * cores_per_node
default_cores = cores_per_node
account = "sds121"
# account = "ddp194"
# account = "sds121"
scheduler_file = "scheduler.conf"
max_file_size = 15625000.0
email = "mmiller@sdsc.edu"
jobname = ""
runfile = "./_batch_command.run"
statusfile = "./_batch_command.status"
cmdfile = "./_batch_command.cmdline"
nodeinfofile = "./_NODEINFO.TXT"
jobdir = os.getcwd()
epilogue = "%s/_epilogue.sh" % (jobdir)
jobname = os.environ.get("WB_JOBID", "cipres")
# is_cloudeligible = os.environ.get("IS_CLOUDELIGIBLE", "false")
remotejobid = None
# specified_runtime = 0.0


def getRuntime(properties):
    try:
        rt = properties.get("runhours", 0.0)
        rt = math.ceil(float(rt) * 60)
    except:
        rt = 0.0
    return rt


def passthroughInfo(properties):
    retval = {}
    for key in properties:
        if key.startswith("--"):
            retval[key] = properties.get(key)
    return retval

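# A minimal sketch (not part of the original module) showing the properties-file
# format getProperties() above understands: a key is separated from its value by
# ':', '=', or the first unescaped space, and lines starting with '#' or '!' are
# treated as comments.  The file name and contents below are invented for the
# example.
def _exampleGetProperties():
    import tempfile
    sample = ("# sample properties\n"
              "runhours=1.5\n"
              "threads_per_process: 8\n"
              "jobtype direct\n")
    with tempfile.NamedTemporaryFile("w", suffix=".conf", delete=False) as f:
        f.write(sample)
        name = f.name
    props = getProperties(name)
    os.remove(name)
    # props == {'runhours': '1.5', 'threads_per_process': '8', 'jobtype': 'direct'}
    return props
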
def schedulerInfo(properties, tooltype, cmdline, sentTC):
    """properties is a dictionary containing the keys: jobtype, mpi_processes,
    threads_per_process, nodes, runhours, node_exclusive, gpu.

    gpu=n, where n is greater than 0, means use n gpus.  The job will be run in
    the gpu or gpu-shared partition, depending on the value of node_exclusive.

    Based on properties and hardcoded info about the resource, this returns a
    dictionary containing: is_direct, is_mpi, queue, runtime, mpi_processes,
    nodes, cores_used, cores_used_per_node, gpu."""
    # Get runhours from properties and convert it to minutes; default to 0.
    try:
        runtime = properties.get("runhours", 0.0)
        runtime = math.ceil(float(runtime) * 60)
    except:
        runtime = 0.0

    tpp = int(properties.get("threads_per_process", 1))
    if tpp == 1:
        tpp = int(properties.get("threads-per-process", 1))
    gpuNum = int(properties.get("gpu", 0))
    largeData = int(properties.get("large_data", 0))
    requestMem = properties.get("request_mem", "none")
    if requestMem == "none":
        requestMem = properties.get("mem", "none")
    cpusPerTask = int(properties.get("cpus-per-task", 1))

    if sentTC and gpuNum <= 0:
        if tpp <= 4:
            tpp = 4
        if tpp > 4 and tpp <= 8:
            tpp = 8
        if tpp > 8 and tpp < 24:
            tpp = 24

    retval = {
        "runtime": runtime,
        "threads_per_process": tpp,
        # "threads_per_process": int(properties.get("threads_per_process", 1)),
        "nodes": int(properties.get("nodes", 1)),
        "mpi_processes": int(properties.get("mpi_processes", 1)),
        "node_exclusive": int(properties.get("node_exclusive", 1)),
        "is_direct": [False, True][properties.get("jobtype", "serial") == "direct"],
        "is_mpi": [False, True][properties.get("jobtype", "serial") == "mpi"],
        "gpu": int(properties.get("gpu", 0)),
        "remove_mv2_param": int(properties.get("remove_mv2_param", 0)),
        "no_stdout_saved": int(properties.get("no_stdout_saved", 0)),
        "no_stderr_saved": int(properties.get("no_stderr_saved", 0)),
        "workflow_type": properties.get("workflow_type", "none")
    }
    retval["cores_used"] = retval["mpi_processes"] * retval["threads_per_process"]
    retval["cores_used_per_node"] = math.ceil((float(retval["cores_used"]) / retval["nodes"]) / cpusPerTask)

    if retval["node_exclusive"]:
        # if largeData:
        #     if sentToCloud(cmdline, runtime):
        #         retval["queue"] = aws
        #     else:
        #         retval["queue"] = large_shared_queue
        # else:
        if retval["gpu"]:
            if sentToCloud(cmdline, runtime):
                retval["queue"] = aws
            else:
                retval["queue"] = gpu_queue
        else:
            if sentToCloud(cmdline, runtime):
                retval["queue"] = aws
            else:
                retval["queue"] = short_queue
    else:
        if largeData:
            if sentToCloud(cmdline, runtime):
                retval["queue"] = aws
            else:
                retval["queue"] = large_shared_queue
        else:
            if retval["gpu"]:
                if sentToCloud(cmdline, runtime):
                    retval["queue"] = aws
                else:
                    retval["queue"] = gpu_shared_queue
            else:
                if sentToCloud(cmdline, runtime):
                    retval["queue"] = aws
                else:
                    retval["queue"] = shared_queue

    retval["mem"] = requestMem
    retval["cpus-per-task"] = cpusPerTask
    return retval


def log(filename, message):
    f = open(filename, "a")
    f.write(message)
    f.close()


def deleteJob(jobid, workingdir):
    if os.path.isfile(workingdir + "/cancelJobs"):
        os.chdir(workingdir)
        cmd = "./cancelJobs %d" % jobid
    else:
        cmd = "scancel %d" % jobid
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    outerr = p.communicate()
    output = outerr[0].decode()
    err = outerr[1]
    if (p.returncode != 0):
        raise SystemError("Error running '%s', return code is %d. stdout is '%s', stderr is '%s'" %
                          (cmd, p.returncode, output, err))
    else:
        if os.path.isfile(workingdir + "/stdout.txt") != True:
            fstdout = open(workingdir + "/stdout.txt", "x")
            fstdout.write("===Job is deleted by user===")
            fstdout.close()
        if os.path.isfile(workingdir + "/stderr.txt") != True:
            fstderr = open(workingdir + "/stderr.txt", "x")
            fstderr.write("===Job is deleted by user===")
            fstderr.close()


def jobInQueue():
    """Return a list of all of the user's jobs that are in the queue and aren't
    completed or cancelled.  They may be waiting, running, etc.
    """
    cmd = "squeue -u %s -h -o '%%.8i %%T'" % getUsername()
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    outerr = p.communicate()
    output = outerr[0].decode()
    err = outerr[1]
    if (p.returncode != 0):
        raise SystemError("Error running squeue, return code is %d. stderr is %s" % (p.returncode, err))
    if (len(err) != 0):
        raise SystemError("Error running squeue, stderr is %s" % (err))
    output_rows = output.split("\n")
    jobs = []
    for row in output_rows:
        r = row.split()
        if len(r) > 1 and (r[1] != "CANCELLED" and r[1] != "COMPLETED"):
            jobs.append(r[0])
    return jobs

# To do: modify RAxML-Light.sh to accept a --url argument and pass it here, like --account.
# Decide whether to use --email too, maybe just on the last job?  Or ask Mark if he wants
# all the emails?
def submitDirectJob(account, url, email, jobname, commandline):
    global remotejobid
    # Not exactly a general-purpose solution, but for raxml-light we can just add account,
    # email and url arguments to the command line.
    rfile = open(cmdfile, "w")
    rfile.write("#!/bin/sh\n")
    rfile.write(" ".join(commandline))
    rfile.write(" --account %s" % account)
    rfile.write(" --url %s" % url)
    rfile.write(" --email %s" % email)
    rfile.write("\n")
    rfile.close()
    os.chmod(cmdfile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
    # os.chmod(cmdfile, 0744)

    cmd = cmdfile
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output = p.communicate()[0].decode()
    retval = p.returncode
    if retval != 0:
        print("Error submitting job:\n")
        print(output)
        log(statusfile, "submitDirectJob is returning %d.\nStdout/stderr is:%s\n" % (retval, output))
        # When there's a bash syntax error in a script it exits with 2, but if we return 2, we've
        # defined that to mean "too many jobs queued" and cipres will print a special message.
        if (retval == 2):
            retval = 1
        return retval
    log(statusfile, "Job submission stdout/stderr is: %s\n" % output)

    # output should be just the full job id, <jobid>.trestles-fe1.sdsc.edu:
    firstline = output.splitlines()
    if len(firstline) == 1:
        firstline = firstline[0]
    p = re.compile(r"^(\d+).trestles.\S+", re.M)
    m = p.search(output)
    if m != None:
        jobid = m.group(0)
        short_jobid = m.group(1)
        print("jobid=%d" % int(short_jobid))
        remotejobid = short_jobid
        log(statusfile, "JOBID is %s\n" % jobid)
        log("./_JOBINFO.TXT", "\nJOBID=%s\n" % jobid)
        return 0
    print("Error, job submission says: %s" % output)
    log(statusfile, "can't find jobid, submitDirectJob is returning 1\n")
    return 1


# Returns 0 on success, 2 means too many jobs queued.
def submitJob():
    # cmd = "sbatch -L cipres:1 --res=CRES %s 2>> %s" % (runfile, statusfile)
    cmd = "sbatch %s 2>> %s" % (runfile, statusfile)
    log(statusfile, "running: %s\n" % (cmd))
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    output = p.communicate()[0].decode()
    retval = p.returncode
    if retval != 0:
        # Read whatever sbatch wrote to the statusfile and print it to stdout.
        print("Error submitting job, sbatch says:\n")
        f = open(statusfile, "r")
        print(f.read(), "\n\n")
        f.close()
        print(output)
        # When we return 2 it means too many jobs are queued.  qstat returns -226 on abe
        # in this situation ... not sure if that's true here, on trestles, as well.
        # if retval == -226:
        #     retval = 2
        log(statusfile, "submit_job is returning %d\n" % retval)
        return retval
    log(statusfile, "sbatch output is: " + output + "\n" +
        "======================================================================" + "\n")

    # Output from sbatch should be 'Submitted batch job N', where N is the jobid.
    # p = re.compile(r"^(\d+).trestles.\S+", re.M)
    p = re.compile(r"^Submitted batch job (\d+)", re.M)
    m = p.search(output)
    if m != None:
        jobid = m.group(1)
        print("jobid=%d" % int(jobid))
        log(statusfile, "JOBID is %s\n" % jobid)
        log("./_JOBINFO.TXT", "\nJOBID=%s\n" % jobid)
        submitAttributes(jobid)
        return 0
    else:
        print("Error, sbatch says: %s" % output)
        log(statusfile, "can't get jobid, submit_job is returning 1\n")
        return 1


def submitAttributes(jobid):
    try:
        jobinfo = getProperties("./_JOBINFO.TXT")
        if "email" in jobinfo:
            uid = jobinfo["email"]
        else:
            uid = "UNKNOWN@phylo.org"
        submitdate = subprocess.Popen(["date", "+%F %T %:z"], stdout=subprocess.PIPE).communicate()[0].decode()
        cmd = "gateway_submit_attributes -gateway_user '%s' -submit_time '%s' -jobid '%s'" % (
            uid.strip(), submitdate.strip(), jobid)
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        outerr = p.communicate()
        output = outerr[0].decode()
        err = outerr[1]
        if (p.returncode != 0):
            log(statusfile, "%s failed with exit status %d\n" % (cmd, p.returncode))
            log(statusfile, "stdout: %s\n" % (output))
            log(statusfile, "stderr: %s\n" % (err))
        else:
            log(statusfile, "ran %s" % (cmd))
    except Exception as err:
        log(statusfile, "submitAttributes exception")


def splitCommandLine(cmd):
    """Splits cmd at && into individual commands, writing each one to a file named
    lib.cmdfile_I, where I is a number (1, ...).  Returns the list of filenames created.
    """
    str = " ".join(cmd)
    cmdlist = str.split("&&")
    filelist = []
    i = 1
    for cmd in cmdlist:
        cmd = cmd.strip()
        if not len(cmd):
            continue
        filename = "%s_%s" % (cmdfile, i)
        rfile = open(filename, "w")
        rfile.write("#!/bin/sh\n")
        rfile.writelines((cmd, "\n"))
        rfile.close()
        os.chmod(filename, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
        # os.chmod(filename, 0744)
        filelist.append(filename)
        i = i + 1
    return filelist


def getJobInfo(key):
    try:
        jobinfoMap = getProperties("./_JOBINFO.TXT")
        return jobinfoMap[key]
    except Exception as err:
        log(statusfile, "getJobInfo exception")


def sentToCloud(cmdline, runtime):
    try:
        shouldSendToCloud = False
        if len(cmdline) > 0:
            if "_cloud" in cmdline[0]:
                schedulerMap = getProperties("./scheduler.conf")
                if "beastwrapper_" in cmdline[0] and ("_1.8.4_" in cmdline[0] or "_1.10.4_" in cmdline[0] or "_1.8.3_" in cmdline[0] or "_1.10.5.pre_" in cmdline[0]):
                    if int(schedulerMap.get("gpu", 0)) >= 4:
                        shouldSendToCloud = True
                    # if int(runtime) >= (168*60) and (int(schedulerMap.get("threads_per_process", 0)) == 4 or int(schedulerMap.get("threads_per_process", 0)) == 8 or int(schedulerMap.get("threads_per_process", 0)) == 24):
                    #     shouldSendToCloud = True
                if "beast2wrapper_" in cmdline[0] and ("_2.6.1_" in cmdline[0]):
                    if int(schedulerMap.get("gpu", 0)) >= 4:
                        shouldSendToCloud = True
                    # if int(runtime) >= (168*60) and (int(schedulerMap.get("threads_per_process", 0)) == 3 or int(schedulerMap.get("threads_per_process", 0)) == 4 or int(schedulerMap.get("threads_per_process", 0)) == 6 or int(schedulerMap.get("threads_per_process", 0)) == 24):
                    #     shouldSendToCloud = True
        # try:
        #     schedulerMap = getProperties("./scheduler.conf")
        #     if schedulerMap["sent_to_cloud"] == "1":
        #         shouldSendToCloud = True
    except Exception as err:
        shouldSendToCloud = False
        # log(statusfile, "sentToCloud exception")
    return shouldSendToCloud

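# A minimal smoke test (not part of the original module) exercising the helpers
# that don't touch the scheduler.  The command lines, hours value, and sbatch
# reply below are invented for the example.
if __name__ == "__main__":
    print(getToolType(["raxmlHPC-PTHREADS", "-T", "8"]))   # raxml
    print(getToolType(["garli.conf"]))                     # garli
    print(getRuntime({"runhours": "0.5"}))                 # 30 (minutes)
    # The jobid pattern submitJob() looks for, applied to a canned sbatch reply:
    m = re.search(r"^Submitted batch job (\d+)", "Submitted batch job 424242", re.M)
    print(m.group(1) if m else None)                       # 424242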