#!/usr/bin/env python # import test_comet_lib as lib import expanse_lib as lib import sys import os, stat import textwrap class Submission(object): def __init__(self, argv): #COMMAND LINE PARSING import argparse parser = argparse.ArgumentParser() parser.add_argument('--account', metavar="ACCOUNT", type=str, default=lib.account, help="The account string to use when submitting jobs. Default is read from config files.") parser.add_argument('--url', metavar="URL", dest="URL", type=str, help="Notification URL") try: cmdline_options, cmd = parser.parse_known_args(argv) cmd = cmd[1:] if not ('--' in cmd) else cmd[cmd.index('--')+1:] except Exception as e: print("There was a problem submitting your job") print(e) sys.exit(1) myrt = lib.getRuntime(lib.getProperties("scheduler.conf")) self.sentToCloud = lib.sentToCloud(cmd, myrt) if len(cmd) > 0 : if "_or_" in cmd[0]: if self.sentToCloud: cmd[0] = "/projects/ps-ngbt/home/cipres/ngbw/aws/scripts/" + cmd[0].replace("_or_", "_") else: cmd[0] = cmd[0].replace("_or_cloud", "") else: #if "_comet" in cmd[0]: # if "_comet.centos7" in cmd[0]: cmd[0] = cmd[0].replace("_comet.centos7 ", "_expanse ") # else: cmd[0] = cmd[0].replace("_comet ", "_expanse ") cmd[0] = cmd[0].replace("_centos7 ", "_expanse ") self.cmdline = cmd self.account = cmdline_options.account self.url = cmdline_options.URL self.tooltype = lib.getToolType(self.cmdline) self.scheduler_info = lib.schedulerInfo(lib.getProperties("scheduler.conf"), self.tooltype, self.cmdline, self.sentToCloud) if int(self.scheduler_info["gpu"]) > 0 : cmd[0] = cmd[0].replace("_expanse", "_expanse.gpu") if self.sentToCloud and "-threads" in cmd[0] : items = cmd[0].split(" ") i = 0 j = -1 k = -1 while i < len(items) : if items[i] == "-threads" : j = i if items[i] == "-instances" : k = i i += 1 if j>0 and (j+1) < len(items) : if k>0 and (k+1) < len(items) and (items[j+1] == items[k+1]) : items[k+1] = str(self.scheduler_info["threads_per_process"]) items[j+1] = str(self.scheduler_info["threads_per_process"]) cmd[0] = " ".join(items) self.cmdline = cmd def createEpilog(self): rfile = open(lib.epilogue, "w") if self.sentToCloud == True : text = """#!/bin/bash date +'%%s %%a %%b %%e %%R:%%S %%Z %%Y' > %s/term.txt squeue -j $SLURM_JOB_ID -l >> %s/term.txt echo "This file created by srun of: $*" >> %s/term.txt instance_id=`curl -s http://169.254.169.254/latest/meta-data/instance-id` EC2_AVAIL_ZONE=`curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone` EC2_REGION=`echo ${EC2_AVAIL_ZONE} | sed 's/[a-z]$//'` VOLUME_ID=`aws ec2 --region=${EC2_REGION} describe-volumes --filter Name=attachment.instance-id,Values=${instance_id} --query Volumes[].VolumeId --output text` aws ec2 delete-tags --region=${EC2_REGION} --resources ${instance_id} --tags Key=CI-UserID Key=CI-UserName Key=CI-JobHandle Key=CI-RemoteJobID Key=CI-TaskID aws ec2 delete-tags --region=${EC2_REGION} --resources ${VOLUME_ID} --tags Key=CI-UserID Key=CI-UserName Key=CI-JobHandle Key=CI-RemoteJobID Key=CI-TaskID """ % (lib.jobdir, lib.jobdir, lib.jobdir) else: text = """#!/bin/bash date +'%%s %%a %%b %%e %%R:%%S %%Z %%Y' > %s/term.txt squeue -j $SLURM_JOB_ID -l >> %s/term.txt echo "This file created by srun of: $*" >> %s/term.txt """ % (lib.jobdir, lib.jobdir, lib.jobdir) rfile.write(textwrap.dedent(text)); rfile.close(); #os.chmod(lib.epilogue, 0744); os.chmod(lib.epilogue, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH); def createCommandFiles(self): cmdfileList = None if self.isSimpleWorkflow(): cmdfileList = lib.splitCommandLine(self.cmdline) else: # Write the command line to a file, batch_command.cmdline. rfile = open(lib.cmdfile, "w") rfile.write("#!/bin/sh\n") ### rfile.write("ulimit -f %d\n" % (lib.max_file_size)) rfile.writelines((" ".join(self.cmdline), "\n")) rfile.close() #os.chmod(lib.cmdfile, 0744) os.chmod(lib.cmdfile, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH) cmdfileList = [] cmdfileList.append(lib.cmdfile) return cmdfileList def submitBatchScript(self): # If this is a "direct" run type job we don't need to create a qsub script, we'll just run batch_ommand.cmdline. # The command is responsible for creating done.txt and running an epilog script to create term.txt. if self.scheduler_info["is_direct"]: return lib.submitDirectJob(account, url, lib.email, lib.jobname, cmdline) self.createEpilog() self.cmdfileList = self.createCommandFiles() scheduler_stdout_txt = "#SBATCH -o _scheduler_stdout.txt" scheduler_stderr_txt = "#SBATCH -e _scheduler_stderr.txt" stdout_txt = "1>stdout.txt" stdout_txt_2 = "1>>stdout.txt" stderr_txt = "2>stderr.txt" stderr_txt_2 = "2>>stderr.txt" try: if int(self.scheduler_info["no_stdout_saved"]) == 1: stdout_txt = "1>/dev/null" stdout_txt_2 = "1>>/dev/null" #scheduler_stdout_txt = "##SBATCH -o _scheduler_stdout.txt" except Exception as e: print("no_stdout_saved not found") try: if int(self.scheduler_info["no_stderr_saved"]) == 1: stderr_txt = "2>/dev/null" stderr_txt_2 = "2>>/dev/null" #scheduler_stderr_txt = "##SBATCH -e _scheduler_stderr.txt" except Exception as e: print("no_stderr_saved not found") # Create the qsub script rfile = open(lib.runfile, "w") rfile.write("#!/bin/sh\n") if (self.sentToCloud): qos = "amzn" else: qos = self.scheduler_info["queue"] + "-cipres" if self.scheduler_info["queue"] == "compute": qos= "cipres" if self.scheduler_info["queue"] == "large-shared": qos= "large-shared-normal" if (self.sentToCloud): if int(self.scheduler_info["gpu"]) > 0 : text = """ #SBATCH --licenses=cipres:1 #SBATCH --res=cipresres #SBATCH -p %s #SBATCH --qos %s #SBATCH --job-name %s #SBATCH --exclusive #SBATCH --time=00:%d:00 %s %s #SBATCH --export=ALL #SBATCH --mail-user="%s" #SBATCH --mail-type="all" #SBATCH -A %s """ % (self.scheduler_info["queue"], qos, lib.jobname, self.scheduler_info["runtime"], scheduler_stdout_txt, scheduler_stderr_txt, lib.email, self.account) else : if self.scheduler_info["threads_per_process"] <= 4 : vcpu = 8 ctype = 2 nmem = 1 if self.scheduler_info["threads_per_process"] > 4 and self.scheduler_info["threads_per_process"] <= 8 : vcpu = 16 ctype = 4 nmem = 2 if (self.scheduler_info["threads_per_process"] > 8) : vcpu = 48 ctype = 12 nmem = 6 text = """ #SBATCH --licenses=cipres:1 #SBATCH --res=cipresres #SBATCH --no-requeue #SBATCH --nodes=1-1 #SBATCH --cpus-per-task=1 #SBATCH --license=cipres:1 #SBATCH --partition=%s #SBATCH --qos=%s #SBATCH --job-name %s #SBATCH --time=00:%d:00 %s %s #SBATCH --export=ALL #SBATCH --mail-user="%s" #SBATCH --mail-type="all" #SBATCH -A %s #SBATCH --gres=vcpu:c5:%d #SBATCH --constraint=c5.%dxlarge #SBATCH --mem=%d """ % (self.scheduler_info["queue"], qos, lib.jobname, self.scheduler_info["runtime"], scheduler_stdout_txt, scheduler_stderr_txt,lib.email, self.account, vcpu, ctype, 15000*nmem ) # """ % (self.scheduler_info["queue"], qos, lib.jobname, self.scheduler_info["runtime"],scheduler_stdout_txt, scheduler_stderr_txt, lib.email, self.account, self.scheduler_info["threads_per_process"]*2, self.scheduler_info["threads_per_process"]/2, 15000*self.scheduler_info["threads_per_process"]/4 ) # """ % (self.scheduler_info["queue"], qos, lib.jobname, self.scheduler_info["runtime"], lib.email, self.account, self.scheduler_info["threads_per_process"]*2, self.scheduler_info["threads_per_process"]/2, self.scheduler_info["threads_per_process"], 15000*self.scheduler_info["threads_per_process"]/4 ) else: text = """ #SBATCH --licenses=cipres:1 #SBATCH --res=cipresres #SBATCH -p %s #SBATCH --qos %s #SBATCH --job-name %s #SBATCH --time=00:%d:00 %s %s # TODO: make sure umask works, not sure what the corresponding SBATCH cmd is. #PBS -W umask=0007 #SBATCH --export=ALL #SBATCH --mail-user="%s" #SBATCH --mail-type="all" #SBATCH -A %s """ % (self.scheduler_info["queue"], qos, lib.jobname, self.scheduler_info["runtime"], scheduler_stdout_txt, scheduler_stderr_txt,lib.email, self.account) rfile.write(textwrap.dedent(text)) # if self.scheduler_info["gpu"] > 0 : # # TODO: make sure this comes out right. # text = "#SBATCH -N %d\n" % (self.scheduler_info["nodes"]) + \ # "#SBATCH --gres=gpu:p100:%d\n" % (self.scheduler_info["gpu"]) + \ # "#SBATCH --ntasks-per-node=%d\n" % (7 * self.scheduler_info["gpu"]) # else: # # --ntasks-per-node is the (number_mpi_processes * threads_per_process)/nodes = cores_used_per_node # text = "#SBATCH -N %d\n#SBATCH --ntasks-per-node=%d\n" % (self.scheduler_info["nodes"], self.scheduler_info["cores_used_per_node"]) ######################################## # Modified by Tony on 01/18/2019 per # Wayne Pfeiffer's request. # # If scheduler_info["gpu"] is exactly 1, # add "#SBATCH --mem=25G" to the variable text. # # If scheduler_info["gpu"] is greater than 1, # do NOT add "#SBATCH --mem=25G" to the variable text. # # If scheduler_info["gpu"] is missing/not defined, # use scheduler_info["nodes"] and scheduler_info["cores_used_per_node"]. ######################################## cpuCnt = 1 if self.scheduler_info["gpu"] > 0 : cpuCnt = 4 * self.scheduler_info["gpu"] #if self.scheduler_info["gpu"] == 1 : text = "#SBATCH -N %d\n" % (self.scheduler_info["nodes"]) if (self.sentToCloud): text = text + "#SBATCH --gres=gpu:AMZN_V100:%d\n" % (self.scheduler_info["gpu"]) text = text + "#SBATCH --constraint=p3.%dxlarge\n" % (2*self.scheduler_info["gpu"]) else: text = text + "#SBATCH --gpus=%d\n" % (self.scheduler_info["gpu"]) #if self.sentToCloud == False: text = text + "#SBATCH --ntasks-per-node=%d\n" % (10 * self.scheduler_info["gpu"]) #if self.sentToCloud == False and self.scheduler_info["mem"] == "none": # text = text + "#SBATCH --mem=25G\n" #else: # text = "#SBATCH -N %d\n" % (self.scheduler_info["nodes"]) # if (self.sentToCloud): # text = text + "#SBATCH --gres=gpu:AMZN_V100:%d\n" % (self.scheduler_info["gpu"]) # text = text + "#SBATCH --constraint=p3.%dxlarge\n" % (2*self.scheduler_info["gpu"]) # else: # text = text + "#SBATCH --gpus=%d\n" % (self.scheduler_info["gpu"]) # if self.sentToCloud == False : # text = text + "#SBATCH --ntasks-per-node=%d\n" % (10 * self.scheduler_info["gpu"]) else: # --ntasks-per-node is the (number_mpi_processes * threads_per_process)/nodes = cores_used_per_node if self.sentToCloud == False : text = "#SBATCH -N %d\n#SBATCH --ntasks-per-node=%d\n" % (self.scheduler_info["nodes"], self.scheduler_info["cores_used_per_node"]) else : # cpuCnt = 1 text = "" if self.scheduler_info["cpus-per-task"] != 1: if self.sentToCloud == False: text = text + "#SBATCH --cpus-per-task=%d\n" % (self.scheduler_info["cpus-per-task"]) if self.scheduler_info["mem"] != "none": if self.sentToCloud == False: text = text + "#SBATCH --mem=" + self.scheduler_info["mem"] + "\n" # if self.scheduler_info["gpu"] == 1 : # text = "#SBATCH -N %d\n" % (self.scheduler_info["nodes"]) + \ # "#SBATCH --gres=gpu:p100:%d\n" % (self.scheduler_info["gpu"]) + \ # "#SBATCH --ntasks-per-node=%d\n" % (7 * self.scheduler_info["gpu"]) + \ # "#SBATCH --mem=25G\n" # elif self.scheduler_info["gpu"] > 1 : # text = "#SBATCH -N %d\n" % (self.scheduler_info["nodes"]) + \ # "#SBATCH --gres=gpu:p100:%d\n" % (self.scheduler_info["gpu"]) + \ # "#SBATCH --ntasks-per-node=%d\n" % (7 * self.scheduler_info["gpu"]) # else: # # --ntasks-per-node is the (number_mpi_processes * threads_per_process)/nodes = cores_used_per_node # text = "#SBATCH -N %d\n#SBATCH --ntasks-per-node=%d\n" % (self.scheduler_info["nodes"], self.scheduler_info["cores_used_per_node"]) rfile.write(textwrap.dedent(text)) text = "module load slurm\n" rfile.write(textwrap.dedent(text)) # if (lib.sentToCloud(self.cmdline)): # singularity_env = "export SINGULARITYENV_LD_LIBRARY_PATH=\"/opt/beagle/2.1.gnu/lib:/opt/beast/1.8.4/lib\"\n" + \ # "export SINGULARITYENV_PREPEND_PATH=\"/opt/beast/1.8.4/bin\"\n" # rfile.write(textwrap.dedent(singularity_env)) if self.sentToCloud == True : text = "\ninstance_id=`curl -s http://169.254.169.254/latest/meta-data/instance-id`\n" + \ "EC2_AVAIL_ZONE=`curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone`\n" + \ "EC2_REGION=`echo ${EC2_AVAIL_ZONE} | sed 's/[a-z]$//'`\n" + \ "VOLUME_ID=`aws ec2 --region=${EC2_REGION} describe-volumes --filter Name=attachment.instance-id,Values=${instance_id} --query Volumes[].VolumeId --output text`\n" # "echo ${VOLUME_ID} >> ./test_file.txt\n" userid = lib.getJobInfo("User ID") username = lib.getJobInfo("User Name") jobhandle = lib.getJobInfo("JobHandle") taskid = lib.getJobInfo("Task ID") bucketname = lib.getJobInfo("BucketName") if userid is not None: text = text + "aws ec2 create-tags --region=${EC2_REGION} --resources ${instance_id} --tags Key=CI-UserID,Value=" + userid + "\n" text = text + "aws ec2 create-tags --region=${EC2_REGION} --resources ${VOLUME_ID} --tags Key=CI-UserID,Value=" + userid + "\n" if username is not None: text = text + "aws ec2 create-tags --region=${EC2_REGION} --resources ${instance_id} --tags Key=CI-UserName,Value=" + username + "\n" text = text + "aws ec2 create-tags --region=${EC2_REGION} --resources ${VOLUME_ID} --tags Key=CI-UserName,Value=" + username + "\n" if jobhandle is not None: text = text + "aws ec2 create-tags --region=${EC2_REGION} --resources ${instance_id} --tags Key=CI-JobHandle,Value=" + jobhandle + "\n" text = text + "aws ec2 create-tags --region=${EC2_REGION} --resources ${VOLUME_ID} --tags Key=CI-JobHandle,Value=" + jobhandle + "\n" if taskid is not None: text = text + "aws ec2 create-tags --region=${EC2_REGION} --resources ${instance_id} --tags Key=CI-TaskID,Value=" + taskid + "\n" text = text + "aws ec2 create-tags --region=${EC2_REGION} --resources ${VOLUME_ID} --tags Key=CI-TaskID,Value=" + taskid + "\n" # if bucketname is not None: # text = text + "aws s3 sync s3://" + bucketname + " . \n " rfile.write(textwrap.dedent(text)) remotejobcmd = "\nremotejobid=`grep JOBID _JOBINFO.TXT |awk -F '=' '{print($2)}'`\n" + \ "aws ec2 create-tags --region=${EC2_REGION} --resources ${instance_id} --tags Key=CI-RemoteJobID,Value=${remotejobid}\n" + \ "aws ec2 create-tags --region=${EC2_REGION} --resources ${VOLUME_ID} --tags Key=CI-RemoteJobID,Value=${remotejobid}\n\n" rfile.write(textwrap.dedent(remotejobcmd)) # text = "EC2_NAME=`aws ec2 describe-tags --region ${EC2_REGION} --filters Name=resource-id,Values=${instance_id} Name=key,Values=Name --output text | cut -f5`" nodeinfocmd = "date >> ./_NODEINFO.TXT\n" + \ "echo 'instance id: '${instance_id} >> ./_NODEINFO.TXT\n" + \ "echo 'hostname' >> ./_NODEINFO.TXT\n" + \ "hostname 2>&1 >> ./_NODEINFO.TXT\n" + \ "echo 'lscpu' >> ./_NODEINFO.TXT\n" + \ "lscpu 2>&1 >> ./_NODEINFO.TXT\n" + \ "echo '-----------------------' >> ./_NODEINFO.TXT\n" + \ "echo 'lstopo-no-graphics' >> ./_NODEINFO.TXT\n" + \ "lstopo-no-graphics 2>&1 >> ./_NODEINFO.TXT\n" + \ "echo '-----------------------' >> ./_NODEINFO.TXT\n" + \ "echo 'nvidia-smi' >> ./_NODEINFO.TXT\n" + \ "nvidia-smi 2>&1 >> ./_NODEINFO.TXT\n" + \ "echo '-----------------------' >> ./_NODEINFO.TXT\n" + \ "echo 'singularity --version' >> ./_NODEINFO.TXT\n" + \ "singularity --version 2>&1 >> ./_NODEINFO.TXT\n" + \ "echo '-----------------------' >> ./_NODEINFO.TXT\n" + \ "echo 'singularity buildcfg' >> ./_NODEINFO.TXT \n" + \ "singularity buildcfg 2>&1 >> ./_NODEINFO.TXT\n" + \ "echo '-----------------------' >> ./_NODEINFO.TXT\n" # for cc in self.cmdline: # ccitems = cc.split(" ") # for c in ccitems: # if c.endswith('img'): # nodeinfocmd = nodeinfocmd + "echo 'singularity inspect' >> ./_NODEINFO.TXT\n" + \ # "singularity inspect " + c + " 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------' >> ./_NODEINFO.TXT\n" # break beaglecmd = "" for cc in self.cmdline: ccitems = cc.split(" ") for cm in ccitems: # beaglecmd = beaglecmd + cm + " " if cm == 'infile.xml': break beaglecmd = beaglecmd + cm + " " beaglecmd = beaglecmd + " -beagle_info" nodeinfocmd = nodeinfocmd + "echo 'beagle info' >> ./_NODEINFO.TXT\n" + \ beaglecmd + " 2>&1 >> ./_NODEINFO.TXT\n" + \ "echo '-------------------' >> ./_NODEINFO.TXT\n" rfile.write(textwrap.dedent(nodeinfocmd)) # nodeinfocmd = "" # for cmd in self.cmdline: # nodeinfocmd = nodeinfocmd + "echo '" + cmd +"' >> ./_NODEINFO.TXT\n" # rfile.write(textwrap.dedent(nodeinfocmd)) if self.scheduler_info["remove_mv2_param"] == 1: text = """ cd %s curl %s\&status=START export CIPRES_THREADSPP=%d export CIPRES_NP=%d date +'%%s %%a %%b %%e %%R:%%S %%Z %%Y' > start.txt """ % (lib.jobdir, self.url, # curl int(self.scheduler_info["threads_per_process"]), # CIPRES_THREADSPP int(self.scheduler_info["mpi_processes"])) # CIPRES_NP else: text = """ cd %s source /etc/profile.d/modules.sh export MV2_CPU_MAPPING=`genmap -tpr %d` echo "MV2_CPU_MAPPING=$MV2_CPU_MAPPING" curl %s\&status=START export CIPRES_THREADSPP=%d export CIPRES_NP=%d date +'%%s %%a %%b %%e %%R:%%S %%Z %%Y' > start.txt """ % (lib.jobdir, int(self.scheduler_info["threads_per_process"]), # genmap -tpr self.url, # curl int(self.scheduler_info["threads_per_process"]), # CIPRES_THREADSPP int(self.scheduler_info["mpi_processes"])) # CIPRES_NP rfile.write(textwrap.dedent(text)) if self.isSimpleWorkflow(): if (len(self.cmdfileList) > 1): thereIsMore = "&&" else: thereIsMore = "" if (self.sentToCloud): # if int(self.scheduler_info["gpu"]) >= 1 : text = """ srun -u --epilog=%s -N %d -n %d %s %s %s %s """ % ( lib.epilogue, # srun --epilog int(self.scheduler_info["nodes"]), # srun -N int(self.scheduler_info["mpi_processes"]), # srun -n self.cmdfileList[0], stdout_txt, stderr_txt, thereIsMore ) # else : # text = """ # srun -u --epilog=%s %s 1>stdout.txt 2>stderr.txt %s # """ % ( lib.epilogue, # srun --epilog # self.cmdfileList[0], # thereIsMore # ) else : text = """ srun -u --epilog=%s -N %d -n %d --cpus-per-task=%d --mpi=pmi2 --kill-on-bad-exit=1 %s %s %s %s """ % ( lib.epilogue, # srun --epilog int(self.scheduler_info["nodes"]), # srun -N int(self.scheduler_info["mpi_processes"]), # srun -n int(self.scheduler_info["threads_per_process"]), # srun --cpus-per-task self.cmdfileList[0], stdout_txt, stderr_txt, thereIsMore ) rfile.write(textwrap.dedent(text)) # Write the rest of the commands. Starting with the 2nd one (we already did the first above). count = 2 for f in self.cmdfileList[1:] : if (len(self.cmdfileList) > count): thereIsMore = "&&" else: thereIsMore = "" if (self.sentToCloud): # if int(self.scheduler_info["gpu"]) >= 1 : text = """ srun -u --epilog=%s -N 1 -n 1 --mpi=pmi2 %s %s %s """ % ( lib.epilogue,f, stdout_txt_2, stderr_txt_2, thereIsMore) # else : # text = """ # srun -u --epilog=%s --mpi=pmi2 %s 1>>stdout.txt 2>>stderr.txt %s # """ % ( lib.epilogue,f, thereIsMore) else : text = """ srun -u --epilog=%s -N 1 -n 1 --cpus-per-task=%d --mpi=pmi2 --kill-on-bad-exit=1 %s %s %s %s """ % ( lib.epilogue, int(self.scheduler_info["threads_per_process"]), f, stdout_txt_2, stderr_txt_2, thereIsMore) rfile.write(textwrap.dedent(text)) count = count + 1 elif self.isMpiComplexWorkflow(): if (self.sentToCloud): # if int(self.scheduler_info["gpu"]) >= 1 : text = """ export PBS_NODEFILE=`/usr/bin/generate_pbs_nodefile` cp $PBS_NODEFILE __hostfile.txt srun -u --epilog=%s -N 1 -n 1 %s %s %s """ % ( lib.epilogue, lib.cmdfile, stdout_txt, stderr_txt) # else : # text = """ # export PBS_NODEFILE=`/usr/bin/generate_pbs_nodefile` # cp $PBS_NODEFILE __hostfile.txt # srun -u --epilog=%s %s 1>stdout.txt 2>stderr.txt # """ % ( lib.epilogue, lib.cmdfile) else : text = """ export PBS_NODEFILE=`/usr/bin/generate_pbs_nodefile` cp $PBS_NODEFILE __hostfile.txt srun -u --epilog=%s -N 1 -n 1 --cpus-per-task=1 --kill-on-bad-exit=1 %s %s %s """ % ( lib.epilogue, lib.cmdfile, stdout_txt, stderr_txt) rfile.write(textwrap.dedent(text)) else : if (self.sentToCloud): # if int(self.scheduler_info["gpu"]) >= 1 : text = """ srun -u --epilog=%s -N %d -n %d %s %s %s %s """ % ( lib.epilogue, # srun --epilog int(self.scheduler_info["nodes"]), # srun -N int(self.scheduler_info["mpi_processes"]), # srun -n "", lib.cmdfile, # cmd to run (this is the %s, right before ">") stdout_txt, stderr_txt) # else : # text = """ # srun -u --epilog=%s -N 1 -n 1 %s %s 1>stdout.txt 2>stderr.txt # """ % ( lib.epilogue, # srun --epilog # "", # lib.cmdfile) # cmd to run (this is the %s, right before ">") else : text = """ srun -u --epilog=%s -N %d -n %d %s --kill-on-bad-exit=1 %s %s %s """ % ( lib.epilogue, # srun --epilog int(self.scheduler_info["nodes"]), # srun -N int(self.scheduler_info["mpi_processes"]), # srun -n "--mpi=pmi2", lib.cmdfile, # cmd to run (this is the %s, right before ">") stdout_txt, stderr_txt) rfile.write(textwrap.dedent(text)) rfile.write("\nretval=$?\n") rfile.write("chmod a+r *.*\n") rfile.write("date +'%s %a %b %e %R:%S %Z %Y' > done.txt\n") rfile.write('echo "retval=$retval">> done.txt\n') rfile.write("curl %s\&status=DONE\n" % self.url) # if lib.sentToCloud(self.cmdline) == True : # text = "aws ec2 delete-tags --region=${EC2_REGION} --resources ${instance_id} ${VOLUME_ID} --tags Key=CI-UserID Key=CI-JobHandle Key=CI-RemoteJobID Key=CI-TaskID\n" # rfile.write(textwrap.dedent(text)) #for ccc in self.cmdline: # rfile.write(ccc) # remotejobcmd = "\nremotejobid=`grep JOBID _JOBINFO.TXT |awk -F '=' '{print($2)}'`\n" + \ # "aws ec2 create-tags --region=${EC2_REGION} --resources ${instance_id} --tags Key=CI-RemoteJobID,Value=${remotejobid}\n\n" # rfile.write(textwrap.dedent(remotejobcmd)) # nodeinfocmd = "echo 'lscpu' >> ./_NODEINFO.TXT\n" + \ # "lscpu 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------------' >> ./_NODEINFO.TXT\n" + \ # "echo 'lstopo-no-graphics' >> ./_NODEINFO.TXT\n" + \ # "lstopo-no-graphics 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------------' >> ./_NODEINFO.TXT\n" + \ # "echo 'nvidia-smi' >> ./_NODEINFO.TXT\n" + \ # "nvidia-smi 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------------' >> ./_NODEINFO.TXT\n" + \ # "echo 'singularity --version' >> ./_NODEINFO.TXT\n" + \ # "singularity --version 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------------' >> ./_NODEINFO.TXT\n" + \ # "echo 'singularity buildcfg' >> ./_NODEINFO.TXT \n" + \ # "singularity buildcfg 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------------' >> ./_NODEINFO.TXT\n" #cmds = self.cmdline.split(" ") # for cc in self.cmdline: # ccitems = cc.split(" ") # for c in ccitems: # if c.endswith('img'): # nodeinfocmd = nodeinfocmd + "echo 'singularity inspect' >> ./_NODEINFO.TXT\n" + \ # "singularity inspect " + c + " 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------' >> ./_NODEINFO.TXT\n" ## break # # beaglecmd = "" # for cc in self.cmdline: # ccitems = cc.split(" ") # for cm in ccitems: # beaglecmd = beaglecmd + cm + " " # if cm == 'beast': # break # beaglecmd = beaglecmd + " -beagle_info" # nodeinfocmd = nodeinfocmd + "echo 'beagle info' >> ./_NODEINFO.TXT\n" + \ # beaglecmd + " 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-------------------' >> ./_NODEINFO.TXT\n" # # rfile.write(textwrap.dedent(nodeinfocmd)) rfile.write("exit $retval") rfile.close() lib.log("./_JOBINFO.TXT","\nChargeFactor=%f\ncores=%i" % ( self.scheduler_info.get('ChargeFactor',1.0), self.scheduler_info['cores_used']) ) # print "made it to end" # sys.exit(0) #if lib.is_cloudeligible == "true" : # rfile = open(lib.nodeinfofile, "w") # nodeinfocmd = "echo 'lscpu'\n" + \ # "lscpu 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------------\n" + \ # "echo 'lstopo-no-graphics'\n" + \ # "lstopo-no-graphics 2>&1 >> ./_NODEINFO.TXT\n" + \ # "echo '-----------------------\n" + \ # "echo 'nvidia-smi'\n" + \ # "nvidia-sm\n" + \ # "echo '-----------------------\n" + \ # "echo 'singularity --version'\n" + \ # "singularity --version\n" + \ # "echo '-----------------------\n" + \ # "echo 'singularity buildcfg'\n" + \ # "singularity buildcfg\n" + \ # "echo '-----------------------\n" # rfile.write(textwrap.dedent(nodeinfocmd)) ret = lib.submitJob() #ret = 0 #if lib.is_cloudeligible == "true" : # remotejobid = lib.remotejobid # if remotejobid is not None: # cmd = "instance_id=`curl -s http://169.254.169.254/latest/meta-data/instance-id`\n" + \ # "EC2_AVAIL_ZONE=`curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone`\n" + \ # "EC2_REGION=`echo ${EC2_AVAIL_ZONE} | sed 's/[a-z]$//'`\n" + \ # rfile = open(lib.runfile, "w") # remoteidcmd = "aws ec2 create-tags --region=${EC2_REGION} --resources ${instance_id} --tags Key=CI-RemoteJobID,Value=" + remotejobid + "\n" # rfile.close() # os.system(cmd) return ret def isWorkflow(self): return self.isSimpleWorkflow() or self.isMpiComplexWorkflow() def isSimpleWorkflow(self): return self.scheduler_info["workflow_type"] == "simple" def isMpiComplexWorkflow(self): return self.scheduler_info["workflow_type"] == "mpi_complex" def main(argv=None): """ Usage is: submit.py [--account ] [--url ] -- Run from the working dir of the job which must contain (in addition to the job files) a file named scheduler.conf with scheduler properties for the job. , if present, gives the project to charge the job to. Url is the url of the submitting website including the taskid parameter. Returns 0 with "jobid=" on stdout if job submitted ok Returns 1 with multiline error message on stdout if error. Returns 2 for the specific error of queue limit exceeded. """ submission = Submission(argv) submission.submitBatchScript() if __name__ == "__main__": sys.exit(main())