1
2
3
4
5 """Implementation of sequence motifs (PRIVATE).
6 """
7 from Bio.Seq import Seq
8 from Bio.SubsMat import FreqTable
9 from Bio.Alphabet import IUPAC
10 import math,random
11
13 """
14 A class representing sequence motifs.
15 """
17 self.instances = []
18 self.has_instances=False
19 self.counts = {}
20 self.has_counts=False
21 self.mask = []
22 self._pwm_is_current = False
23 self._pwm = []
24 self._log_odds_is_current = False
25 self._log_odds = []
26 self.alphabet=alphabet
27 self.length=None
28 self.background=dict((n, 1.0/len(self.alphabet.letters)) \
29 for n in self.alphabet.letters)
30 self.beta=1.0
31 self.info=None
32 self.name=""
33
35 if self.length==None:
36 self.length = len
37 elif self.length != len:
38 print "len",self.length,self.instances, len
39 raise ValueError("You can't change the length of the motif")
40
46
64
65
67 """
68 sets the mask for the motif
69
70 The mask should be a string containing asterisks in the position of significant columns and spaces in other columns
71 """
72 self._check_length(len(mask))
73 self.mask=[]
74 for char in mask:
75 if char=="*":
76 self.mask.append(1)
77 elif char==" ":
78 self.mask.append(0)
79 else:
80 raise ValueError("Mask should contain only '*' or ' ' and not a '%s'"%char)
81
82 - def pwm(self,laplace=True):
83 """
84 returns the PWM computed for the set of instances
85
86 if laplace=True (default), pseudocounts equal to self.background multiplied by self.beta are added to all positions.
87 """
88
89 if self._pwm_is_current:
90 return self._pwm
91
92 self._pwm = []
93 for i in xrange(self.length):
94 dict = {}
95
96 for letter in self.alphabet.letters:
97 if laplace:
98 dict[letter]=self.beta*self.background[letter]
99 else:
100 dict[letter]=0.0
101 if self.has_counts:
102
103 for letter in self.alphabet.letters:
104 dict[letter]+=self.counts[letter][i]
105 elif self.has_instances:
106
107 for seq in self.instances:
108
109 dict[seq[i]]+=1
110 self._pwm.append(FreqTable.FreqTable(dict,FreqTable.COUNT,self.alphabet))
111 self._pwm_is_current=1
112 return self._pwm
113
115 """
116 returns the logg odds matrix computed for the set of instances
117 """
118
119 if self._log_odds_is_current:
120 return self._log_odds
121
122 self._log_odds = []
123 pwm=self.pwm(laplace)
124 for i in xrange(self.length):
125 d = {}
126 for a in self.alphabet.letters:
127 d[a]=math.log(pwm[i][a]/self.background[a],2)
128 self._log_odds.append(d)
129 self._log_odds_is_current=1
130 return self._log_odds
131
133 """Method returning the information content of a motif.
134 """
135 res=0
136 pwm=self.pwm()
137 for i in range(self.length):
138 res+=2
139 for a in self.alphabet.letters:
140 if pwm[i][a]!=0:
141 res+=pwm[i][a]*math.log(pwm[i][a],2)
142 return res
143
145 """
146 Computes expected score of motif's instance and its standard deviation
147 """
148 exs=0.0
149 var=0.0
150 pwm=self.pwm()
151 for i in range(self.length):
152 ex1=0.0
153 ex2=0.0
154 for a in self.alphabet.letters:
155 if pwm[i][a]!=0:
156 ex1+=pwm[i][a]*(math.log(pwm[i][a],2)-math.log(self.background[a],2))
157 ex2+=pwm[i][a]*(math.log(pwm[i][a],2)-math.log(self.background[a],2))**2
158 exs+=ex1
159 var+=ex2-ex1**2
160 if st_dev:
161 return exs,math.sqrt(var)
162 else:
163 return exs
164
166 """
167 a generator function, returning found positions of instances of the motif in a given sequence
168 """
169 if not self.has_instances:
170 raise ValueError ("This motif has no instances")
171 for pos in xrange(0,len(sequence)-self.length+1):
172 for instance in self.instances:
173 if instance.tostring()==sequence[pos:pos+self.length].tostring():
174 yield(pos,instance)
175 break
176
177 - def score_hit(self,sequence,position,normalized=0,masked=0):
178 """
179 give the pwm score for a given position
180 """
181 lo=self.log_odds()
182 score = 0.0
183 for pos in xrange(self.length):
184 a = sequence[position+pos]
185 if not masked or self.mask[pos]:
186 try:
187 score += lo[pos][a]
188 except:
189 pass
190 if normalized:
191 if not masked:
192 score/=self.length
193 else:
194 score/=len([x for x in self.mask if x])
195 return score
196
197 - def search_pwm(self,sequence,normalized=0,masked=0,threshold=0.0,both=True):
198 """
199 a generator function, returning found hits in a given sequence with the pwm score higher than the threshold
200 """
201 if both:
202 rc = self.reverse_complement()
203
204 sequence=sequence.tostring().upper()
205 for pos in xrange(0,len(sequence)-self.length+1):
206 score = self.score_hit(sequence,pos,normalized,masked)
207 if score > threshold:
208 yield (pos,score)
209 if both:
210 rev_score = rc.score_hit(sequence,pos,normalized,masked)
211 if rev_score > threshold:
212 yield (-pos,rev_score)
213
215 """
216 return the similarity score based on pearson correlation for the given motif against self.
217
218 We use the Pearson's correlation of the respective probabilities.
219 """
220
221 if self.alphabet != motif.alphabet:
222 raise ValueError("Cannot compare motifs with different alphabets")
223
224 max_p=-2
225 for offset in range(-self.length+1,motif.length):
226 if offset<0:
227 p = self.dist_pearson_at(motif,-offset)
228 else:
229 p = motif.dist_pearson_at(self,offset)
230
231 if max_p<p:
232 max_p=p
233 max_o=-offset
234 return 1-max_p,max_o
235
237 sxx = 0
238 sxy = 0
239 sx = 0
240 sy = 0
241 syy = 0
242 norm=max(self.length,offset+motif.length)
243
244 for pos in range(max(self.length,offset+motif.length)):
245 for l in self.alphabet.letters:
246 xi = self[pos][l]
247 yi = motif[pos-offset][l]
248 sx = sx + xi
249 sy = sy + yi
250 sxx = sxx + xi * xi
251 syy = syy + yi * yi
252 sxy = sxy + xi * yi
253
254 norm *= len(self.alphabet.letters)
255 s1 = (sxy - sx*sy*1.0/norm)
256 s2 = (norm*sxx - sx*sx*1.0)*(norm*syy- sy*sy*1.0)
257 return s1/math.sqrt(s2)
258
260 """
261 A similarity measure taking into account a product probability of generating overlaping instances of two motifs
262 """
263 max_p=0.0
264 for offset in range(-self.length+1,other.length):
265 if offset<0:
266 p = self.dist_product_at(other,-offset)
267 else:
268 p = other.dist_product_at(self,offset)
269 if max_p<p:
270 max_p=p
271 max_o=-offset
272 return 1-max_p/self.dist_product_at(self,0),max_o
273
282
284 r"""Calculates the DPQ distance measure between motifs.
285
286 It is calculated as a maximal value of DPQ formula (shown using LaTeX
287 markup, familiar to mathematicians):
288
289 \sqrt{\sum_{i=1}^{alignment.len()} \sum_{k=1}^alphabet.len() \
290 \{ m1[i].freq(alphabet[k])*log_2(m1[i].freq(alphabet[k])/m2[i].freq(alphabet[k])) +
291 m2[i].freq(alphabet[k])*log_2(m2[i].freq(alphabet[k])/m1[i].freq(alphabet[k]))
292 }
293
294 over possible non-spaced alignemts of two motifs. See this reference:
295
296 D. M Endres and J. E Schindelin, "A new metric for probability
297 distributions", IEEE transactions on Information Theory 49, no. 7
298 (July 2003): 1858-1860.
299 """
300
301 min_d=float("inf")
302 min_o=-1
303 d_s=[]
304 for offset in range(-self.length+1,other.length):
305
306 if offset<0:
307 d = self.dist_dpq_at(other,-offset)
308 overlap = self.length+offset
309 else:
310 d = other.dist_dpq_at(self,offset)
311 overlap = other.length-offset
312 overlap = min(self.length,other.length,overlap)
313 out = self.length+other.length-2*overlap
314
315
316 d = (d/(out+overlap))*(2*overlap+out)/(2*overlap)
317
318 d_s.append((offset,d))
319 if min_d> d:
320 min_d=d
321 min_o=-offset
322 return min_d,min_o
323
325 """
326 calculates the dist_dpq measure with a given offset.
327
328 offset should satisfy 0<=offset<=len(self)
329 """
330 def dpq (f1,f2,alpha):
331 s=0
332 for n in alpha.letters:
333 avg=(f1[n]+f2[n])/2
334 s+=f1[n]*math.log(f1[n]/avg,2)+f2[n]*math.log(f2[n]/avg,2)
335 return math.sqrt(s)
336
337 s=0
338 for i in range(max(self.length,offset+other.length)):
339 f1=self[i]
340 f2=other[i-offset]
341 s+=dpq(f1,f2,self.alphabet)
342 return s
343
345 """Reads the motif from the stream (in AlignAce format).
346
347 the self.alphabet variable must be set beforehand.
348 If the last line contains asterisks it is used for setting mask
349 """
350
351 while 1:
352 ln = stream.readline()
353 if "*" in ln:
354 self.set_mask(ln.strip("\n\c"))
355 break
356 self.add_instance(Seq(ln.strip(),self.alphabet))
357
359 """ string representation of a motif.
360 """
361 str = ""
362 for inst in self.instances:
363 str = str + inst.tostring() + "\n"
364
365 if masked:
366 for i in xrange(self.length):
367 if self.mask[i]:
368 str = str + "*"
369 else:
370 str = str + " "
371 str = str + "\n"
372 return str
373
375 """return the length of a motif
376
377 Please use this method (i.e. invoke len(m)) instead of refering to the m.length directly.
378 """
379 if self.length==None:
380 return 0
381 else:
382 return self.length
383
385 """
386 writes the motif to the stream
387 """
388
389 stream.write(self.__str__())
390
391
392
394 """
395 FASTA representation of motif
396 """
397 if not self.has_instances:
398 self.make_instances_from_counts()
399 str = ""
400 for i,inst in enumerate(self.instances):
401 str = str + ">instance%d\n"%i + inst.tostring() + "\n"
402
403 return str
404
406 """
407 Gives the reverse complement of the motif
408 """
409 res = Motif()
410 if self.has_instances:
411 for i in self.instances:
412 res.add_instance(i.reverse_complement())
413 else:
414 res.has_counts=True
415 res.counts["A"]=self.counts["T"][:]
416 res.counts["T"]=self.counts["A"][:]
417 res.counts["G"]=self.counts["C"][:]
418 res.counts["C"]=self.counts["G"][:]
419 res.counts["A"].reverse()
420 res.counts["C"].reverse()
421 res.counts["G"].reverse()
422 res.counts["T"].reverse()
423 res.length=self.length
424 res.mask = self.mask
425 return res
426
427
429 """
430 reads the motif from Jaspar .pfm file
431
432 The instances are fake, but the pwm is accurate.
433 """
434 return self._from_horiz_matrix(stream,letters="ACGT",make_instances=make_instances)
435
456
458 """reads a horizontal count matrix from stream and fill in the counts.
459 """
460 if letters==None:
461 letters=self.alphabet.letters
462 self.counts = {}
463 self.has_counts=True
464
465 for i in letters:
466 ln = stream.readline().strip().split()
467
468 if ln[0]==i:
469 ln=ln[1:]
470
471 try:
472 self.counts[i]=map(int,ln)
473 except ValueError:
474 self.counts[i]=map(float,ln)
475
476
477 s = sum(self.counts[nuc][0] for nuc in letters)
478 l = len(self.counts[letters[0]])
479 self.length=l
480 self.set_mask("*"*l)
481 if make_instances==True:
482 self.make_instances_from_counts()
483 return self
484
485
487 """Creates "fake" instances for a motif created from a count matrix.
488
489 In case the sums of counts are different for different columnes, the
490 shorter columns are padded with background.
491 """
492 alpha="".join(self.alphabet.letters)
493
494 col=[]
495 self.has_instances=True
496 self.instances=[]
497 s = sum(map(lambda nuc: self.counts[nuc][0],self.alphabet.letters))
498 for i in range(self.length):
499 col.append("")
500 for n in self.alphabet.letters:
501 col[i] = col[i]+ (n*(self.counts[n][i]))
502 if len(col[i])<s:
503 print "WARNING, column too short",len(col[i]),s
504 col[i]+=(alpha*s)[:(s-len(col[i]))]
505
506
507 for i in range(s):
508 inst=""
509 for j in range(self.length):
510 inst+=col[j][i]
511
512 inst=Seq(inst,self.alphabet)
513 self.add_instance(inst)
514 return self.instances
515
517 """Creates the count matrix for a motif with instances.
518
519 """
520
521
522 counts={}
523 for a in self.alphabet.letters:
524 counts[a]=[]
525 self.has_counts=True
526 s = len(self.instances)
527 for i in range(self.length):
528 ci = dict((a,0) for a in self.alphabet.letters)
529 for inst in self.instances:
530 ci[inst[i]]+=1
531 for a in self.alphabet.letters:
532 counts[a].append(ci[a])
533 self.counts=counts
534 return counts
535
537 """
538 reads the motif from Jaspar .sites file
539
540 The instances and pwm are OK.
541 """
542
543 while True:
544 ln = stream.readline()
545 if ln=="" or ln[0]!=">":
546 break
547
548 ln=stream.readline().strip()
549 i=0
550 while ln[i]==ln[i].lower():
551 i+=1
552 inst=""
553 while i<len(ln) and ln[i]==ln[i].upper():
554 inst+=ln[i]
555 i+=1
556 inst=Seq(inst,self.alphabet)
557 self.add_instance(inst)
558
559 self.set_mask("*"*len(inst))
560 return self
561
562
564 """Returns the probability distribution over symbols at a given position, padding with background.
565
566 If the requested index is out of bounds, the returned distribution comes from background.
567 """
568 if index in range(self.length):
569 return self.pwm()[index]
570 else:
571 return self.background
572
574 """Returns the consensus sequence of a motif.
575 """
576 res=""
577 for i in range(self.length):
578 max_f=0
579 max_n="X"
580 for n in sorted(self[i]):
581 if self[i][n]>max_f:
582 max_f=self[i][n]
583 max_n=n
584 res+=max_n
585 return Seq(res,self.alphabet)
586
588 """returns the least probable pattern to be generated from this motif.
589 """
590 res=""
591 for i in range(self.length):
592 min_f=10.0
593 min_n="X"
594 for n in sorted(self[i]):
595 if self[i][n]<min_f:
596 min_f=self[i][n]
597 min_n=n
598 res+=min_n
599 return Seq(res,self.alphabet)
600
602 """Maximal possible score for this motif.
603
604 returns the score computed for the consensus sequence.
605 """
606 return self.score_hit(self.consensus(),0)
607
609 """Minimal possible score for this motif.
610
611 returns the score computed for the anticonsensus sequence.
612 """
613 return self.score_hit(self.anticonsensus(),0)
614
615 - def weblogo(self,fname,format="PNG",**kwds):
616 """
617 uses the Berkeley weblogo service to download and save a weblogo of itself
618
619 requires an internet connection.
620 The parameters from **kwds are passed directly to the weblogo server.
621 """
622 import urllib
623 import urllib2
624 al= self._to_fasta()
625 url = 'http://weblogo.berkeley.edu/logo.cgi'
626 values = {'sequence' : al,
627 'format' : format,
628 'logowidth' : '18',
629 'logoheight' : '5',
630 'logounits' : 'cm',
631 'kind' : 'AUTO',
632 'firstnum' : "1",
633 'command' : 'Create Logo',
634 'smallsamplecorrection' : "on",
635 'symbolsperline' : 32,
636 'res' : '96',
637 'res_units' : 'ppi',
638 'antialias' : 'on',
639 'title' : '',
640 'barbits' : '',
641 'xaxis': 'on',
642 'xaxis_label' : '',
643 'yaxis': 'on',
644 'yaxis_label' : '',
645 'showends' : 'on',
646 'shrink' : '0.5',
647 'fineprint' : 'on',
648 'ticbits' : '1',
649 'colorscheme' : 'DEFAULT',
650 'color1' : 'green',
651 'color2' : 'blue',
652 'color3' : 'red',
653 'color4' : 'black',
654 'color5' : 'purple',
655 'color6' : 'orange',
656 'color1' : 'black',
657 }
658 for k,v in kwds.iteritems():
659 values[k]=str(v)
660
661 data = urllib.urlencode(values)
662 req = urllib2.Request(url, data)
663 response = urllib2.urlopen(req)
664 f=open(fname,"w")
665 im=response.read()
666
667 f.write(im)
668 f.close()
669
670
672 """Write the representation of a motif in TRANSFAC format
673 """
674 res="XX\nTY Motif\n"
675 try:
676 res+="ID %s\n"%self.name
677 except:
678 pass
679 res+="BF undef\nP0"
680 for a in self.alphabet.letters:
681 res+=" %s"%a
682 res+="\n"
683 if not self.has_counts:
684 self.make_counts_from_instances()
685 for i in range(self.length):
686 if i<9:
687 res+="0%d"%(i+1)
688 else:
689 res+="%d"%(i+1)
690 for a in self.alphabet.letters:
691 res+=" %d"%self.counts[a][i]
692 res+="\n"
693 res+="XX\n"
694 return res
695
697 """Return string representation of the motif as a matrix.
698
699 """
700 if letters==None:
701 letters=self.alphabet.letters
702 self._pwm_is_current=False
703 pwm=self.pwm(laplace=False)
704 res=""
705 for i in range(self.length):
706 res+="\t".join([str(pwm[i][a]) for a in letters])
707 res+="\n"
708 return res
709
731
736
756
758 """Matrix of log-odds scores for a nucleotide sequence.
759
760 scans a nucleotide sequence and returns the matrix of log-odds
761 scores for all positions.
762
763 - the result is a one-dimensional list or numpy array
764 - the sequence can only be a DNA sequence
765 - the search is performed only on one strand
766 """
767 if self.alphabet!=IUPAC.unambiguous_dna:
768 raise ValueError("Wrong alphabet! Use only with DNA motifs")
769 if seq.alphabet!=IUPAC.unambiguous_dna:
770 raise ValueError("Wrong alphabet! Use only with DNA sequences")
771
772 seq = seq.tostring()
773
774
775 try:
776 import _pwm
777 except ImportError:
778
779 return self._pwm_calculate(seq)
780
781
782
783 logodds=[[y[1] for y in sorted(x.items())] for x in self.log_odds()]
784 return _pwm.calculate(seq, logodds)
785
787 logodds = self.log_odds()
788 m = len(logodds)
789 s = len(sequence)
790 n = s - m + 1
791 result = [None] * n
792 for i in xrange(n):
793 score = 0.0
794 for j in xrange(m):
795 c = sequence[i+j]
796 temp = logodds[j].get(c)
797 if temp==None:
798 break
799 score += temp
800 else:
801 result[i] = score
802 return result
803