#!/usr/bin/python import datetime import getpass import os import re import shutil import smtplib import socket import subprocess import sys from email.mime.text import MIMEText if "ST_AUTH" in os.environ: del os.environ['ST_AUTH'] if "ST_USER" in os.environ: del os.environ['ST_USER'] if "ST_KEY" in os.environ: del os.environ['ST_KEY'] class CipresError(Exception): pass def call_swift(*arguments): # swift_args = ["/usr/local/bin/swift", "-A", "https://cloud.sdsc.edu/auth/v1.0", "-U", "cipres:mmiller", "-K", "caslpefL"] swift_args = ["/usr/bin/swift", "--os-auth-url", "https://identity.cloud.sdsc.edu:5000/v3", "--auth-version", "3", "--os-project-name", "cipres", "--os-username", "mmiller@ucsd.edu", "--os-password", "onTzMvJ4"] for arg in arguments: swift_args.append(arg) process = subprocess.Popen(swift_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (out, err) = process.communicate() if err: raise CipresError(err) return out def read_job_file(dirname): pair_pattern = re.compile("\\s*=\\s*") job_file = open("%s/_JOBINFO.TXT" % (dirname)) job_handle = None username = None user_email = None try: while True: container = job_file.readline(); if not container: break pair = pair_pattern.split(container, 1) if len(pair) == 2: if pair[0] == "JobHandle": job_handle = pair[1].rstrip() elif pair[0] == "email": user_email = pair[1].rstrip() index = user_email.find("@") username = user_email[:index] finally: job_file.close() if job_handle is None or username is None or user_email is None: raise CipresError("_JOBINFO.TXT is corrupted") return job_handle, username, user_email def container_exists(dirname): swift_output = call_swift("list") if re.search("\\b" + dirname + "\\b", swift_output): return True return False def upload_files(source_dir): archive_dir = "/projects/ps-ngbt/zipfiles" new_containers = [ ] uploads = [ ] errors = [ ] os.chdir(source_dir) job_dirs = os.listdir(".") for dirname in job_dirs: try: job_handle, username, user_email = read_job_file(dirname) job_dir = "%s/%s" % (source_dir, dirname) user_archive_dir = "%s/%s" % (archive_dir, username) archive_file = "%s.zip" % (job_handle) output_file = "%s/%s" % (user_archive_dir, archive_file) upload_name = "" new_user_container = False if not container_exists(username): call_swift("post", "-r", ".r:*,.rlistings", "-m", "Web-Listings: true", username) new_user_container = True if not os.path.exists(user_archive_dir): os.mkdir(user_archive_dir, 0775) subprocess.check_output(["jar", "cvMf", output_file, dirname], stderr=subprocess.STDOUT) os.chdir(user_archive_dir) if os.path.getsize(archive_file) < 4294967296: call_swift("upload", "-c", username, archive_file) upload_name = "file %s" % (archive_file) else: os.remove(archive_file) index = job_handle.rfind("-") result_dir = "Job_%s" % (job_handle[index + 1:index + 5]) if os.path.exists(result_dir): shutil.rmtree(result_dir) os.mkdir(result_dir, 0775) os.chdir(job_dir) job_files = os.listdir(".") for filename in job_files: if os.path.getsize(filename) > 805306368: output_file = "%s/%s/%s.zip" % (user_archive_dir, result_dir, filename) subprocess.check_output(["jar", "cvMf", output_file, filename], stderr=subprocess.STDOUT) os.remove(filename) os.chdir(source_dir) output_file = "%s/%s/%s" % (user_archive_dir, result_dir, archive_file) subprocess.check_output(["jar", "cvMf", output_file, dirname], stderr=subprocess.STDOUT) os.chdir(user_archive_dir) call_swift("upload", "-c", username, result_dir) upload_name = "directory %s" % (result_dir) if new_user_container: new_containers.append("Directory %s created for user %s" % (username, user_email)) uploads.append("The %s was written to directory %s for user %s" % (upload_name, username, user_email)) shutil.rmtree(job_dir) except subprocess.CalledProcessError as process_err: errors.append("Upload for %s failed: %s\n" % (dirname, process_err.output)) except (IOError, OSError, CipresError) as err: errors.append("Upload for %s failed: %s\n" % (dirname, str(err))) os.chdir(source_dir) result = "From %s:\n" % (source_dir) if len(new_containers) > 0: result += "\n" result += "\n".join(new_containers) result += "\n" if len(uploads) > 0: result += "\n" result += "\n".join(uploads) result += "\n" if len(errors) > 0: result += "\n" result += "\n".join(errors) result += "\n" return result def already_running(script_name): command = "ps ux | grep %s | grep -v grep" % (script_name) process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) (out, err) = process.communicate() if err: raise CipresError(err) process_info = out.splitlines() if len(process_info) < 2: return None pid = os.getpid() pidlist = None for line in process_info: fields = line.split() if int(fields[1]) != pid: if pidlist == None: pidlist = fields[1] else: pidlist += "," pidlist += fields[1] command = "ps ux -p %s -ww --forest" % (pidlist) process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) (out, err) = process.communicate() if err: raise CipresError(err) return out def send_message(body): username = getpass.getuser() hostname = socket.getfqdn() sender = "%s@%s" % (username, hostname) reciever = "mmiller@sdsc.edu" message = MIMEText(body) message["Subject"] = "large file upload activity" message["From"] = "%s <%s>" % (username, sender) message["To"] = "Mark Miller <" + reciever + ">" mail_server = smtplib.SMTP("outbound.ucsd.edu") try: mail_server.sendmail(sender, [ reciever ], message.as_string()) finally: mail_server.quit() def main(argv): instance_info = already_running(argv[0]) if instance_info: body = "%s is already running:\n\n%s" % (argv[0], instance_info) send_message(body) return comet_result = upload_files("/archive/science/ngbt/backend/comet_workspace/MANUAL") gordon_result = upload_files("/archive/science/ngbt/backend/gordon_workspace/MANUAL") tscc_result = upload_files("/archive/science/ngbt/backend/tscc_workspace/MANUAL") today = datetime.date.today().strftime("%B %d, %Y") body = "Upload activity for %s:\n\n\n" % (today) if len(comet_result) > 0: body += "%s\n\n" % (comet_result) if len(tscc_result) > 0: body += "%s\n\n" % (tscc_result) if len(gordon_result) > 0: body += gordon_result send_message(body) if __name__ == "__main__": sys.exit(main(sys.argv))