#!/usr/bin/env python """ """ import re import sys import os class FastaExtract: debug = False def __init__(self, filename, fixedLineLength = True, smallSequences = True): self.filename = filename self.indexFilename = os.path.basename(filename + ".idx") if FastaExtract.debug: print "indexFilename is " + self.indexFilename self.fp = None self.index = {} if FastaExtract.debug and os.path.exists(self.indexFilename): if FastaExtract.debug: print "reading existing index" self.__readIndex() else: if FastaExtract.debug: print "creating new index" self.__makeIndex() if FastaExtract.debug: print "storing index" self.__writeIndex() self.fixedLineLength = fixedLineLength if self.fixedLineLength: self.__setFixedLineLength() def close(self): if self.fp: self.fp.close() self.fp = None self.index = {} def __setFixedLineLength(self): self.fp.seek(0, 0) line = self.fp.readline(); self.start = self.fp.tell(); line = self.fp.readline(); line = line.rstrip("\n"); self.lineLength = len(line); self.fp.seek(0, 0) def __readIndex(self): fp = open(self.indexFilename, "r") for line in fp: fields = line.split() self.index[fields[0]] = (int(fields[1]), int(fields[2])) fp.close() # print self.index self.fp = open(self.filename, "r") def __writeIndex(self): fp = open(self.indexFilename, "w") for item in self.index: fp.write("%s\t%d\t%d\n" % (item, self.index[item][0], self.index[item][1])) fp.close() def __makeIndex(self): self.fp = open(self.filename, "r") expression = re.compile(r">(\S+).*") contigId = None sequenceStart = 0 while True: line = self.fp.readline() # EOF handling if not line: if contigId: if contigId in self.index: raise Exception("key %s is not unique in %s" % (contigId, self.filename)) self.index[contigId] = (start, self.fp.tell()) break match = expression.match(line) if match: if contigId: if contigId in self.index: raise Exception("key %s is not unique in %s" % (contigId, self.filename)) self.index[contigId] = (start, self.fp.tell() - 1) contigId = match.group(1) start = self.fp.tell() self.fp.seek(0, 0) # print self.index def seqLength(self, indexEntry): return (indexEntry[1] - indexEntry[0]) + 1 def getSequence1Base(self, id, offset, length): return self.getSequence(id, offset - 1, length) def getSequence(self, id, offset, length): if self.fixedLineLength: return self.__getSequenceFixedLineLen(id, offset, length) return self.__getSequenceVariableLineLen(id, offset, length) # Don't want to do this longer sequences but it's ok with the contig file def __getSequenceVariableLineLen(self, id, offset, length): # read in the whole sequence self.fp.seek(self.index[id][0], 0) buffer = self.fp.read(self.seqLength(self.index[id])) # Get rid of the newlines (maybe look for other whitespace too?) retval = buffer.replace("\n", "") # Return just the requested segment return retval[offset: (offset + length)] def __getSequenceFixedLineLen(self, id, offset, length): start = offset + int(offset / self.lineLength) + self.index[id][0] self.fp.seek(start, 0) got = total = 0 result = [] while got < length and (total < self.seqLength(self.index[id])) : ch = self.fp.read(1) total += 1 if (ch == '\n'): continue result.append(ch) got += 1 return ''.join(result) def complement(seq): complement = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A', 'N': 'N'} complseq = [complement[base] for base in seq] return complseq def reverse_complement(seq): seq = list(seq) seq.reverse() return ''.join(complement(seq)) def testit(fe): s = fe.getSequence("1", 0, 5) print "1, 0 for 5: %s" % s s = fe.getSequence("3", 0, 5) print "3, 0 for 5: %s" % s s = fe.getSequence("3", 47, 10) print "3, 47 for 10: %s" % s def main(argv=None): # If debug is true and test.fa.idx doesn't exist, the first # time we create FastaExtract we'll write out the idx and the 2nd # time we'll read it in and use it instead of creating a new one. FastaExtract.debug = True # fe = FastaExtract("test.fa") fe = FastaExtract("test.fa", fixedLineLength = False) testit(fe) fe = FastaExtract("test.fa", fixedLineLength = False) testit(fe) print reverse_complement("ACT") if __name__ == "__main__": sys.exit(main())