Package Bio :: Package PopGen :: Package FDist :: Module Async
[hide private]
[frames] | [no frames]

Source Code for Module Bio.PopGen.FDist.Async

  1  # Copyright 2007 by Tiago Antao <tiagoantao@gmail.com>.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6   
  7  """ 
  8  This module allows for asynchronous execution of FDist and 
  9    splitting of loads. 
 10   
 11  FDistAsync allows for the asynchronous execution of FDist. 
 12   
 13  SplitFDist splits a single FDist execution into several, taking advantage 
 14      of multi-core architectures. 
 15   
 16  """ 
 17   
 18  import os 
 19  import thread 
 20  from time import sleep 
 21  from Bio.PopGen.Async import Local 
 22  from Bio.PopGen.FDist.Controller import FDistController 
 23   
class FDistAsync(FDistController):
    """Asynchronous FDist execution.

    Exposes FDistController.run_fdist through a
    run_job(parameters, input_files) entry point whose arguments
    come from a dictionary.
    """

    def __init__(self, fdist_dir="", ext=None):
        """Constructor.

        Parameters:
        fdist_dir - Where fdist can be found, if = "", then it
                    should be on the path.
        ext       - Extension of binary names (e.g. nothing on Unix,
                    ".exe" on Windows).
        """
        FDistController.__init__(self, fdist_dir, ext)

    def run_job(self, parameters, input_files):
        """Runs FDist asynchronously.

        Gets typical FDist parameters from a dictionary and makes a
        "normal" call. This is run, normally, inside a separate thread.

        Required keys: 'npops', 'nsamples', 'fst', 'sample_size'.
        Optional keys (default): 'mut' (0), 'num_sims' (20000),
        'data_dir' ('.'). input_files is not used.

        Returns a (fst, output_files) tuple. fst is whatever
        run_fdist returned; output_files maps 'out.dat' to the file
        data_dir/out.dat opened for reading. The file object is
        returned open; it is not closed here.
        """
        # A missing required key raises KeyError before fdist is run,
        # because every argument is evaluated ahead of the call.
        data_dir = parameters.get('data_dir', '.')
        result = self.run_fdist(parameters['npops'],
                                parameters['nsamples'],
                                parameters['fst'],
                                parameters['sample_size'],
                                parameters.get('mut', 0),
                                parameters.get('num_sims', 20000),
                                data_dir)
        out_dat = open(data_dir + os.sep + 'out.dat', 'r')
        return result, {'out.dat': out_dat}
class SplitFDist:
    """Splits a FDist run.

    The idea is to split a certain number of simulations in smaller
    numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
    allows to run simulations in parallel, thus taking advantage
    of multi-core CPUs.

    Each SplitFDist object can only be used to run a single FDist
    simulation.
    """

    def __init__(self, report_fun=None,
                 num_thr=2, split_size=1000, fdist_dir='', ext=None):
        """Constructor.

        Parameters:
        report_fun - Function that is called when a single packet is
                     run, it should have a single parameter: Fst.
        num_thr    - Number of desired threads, typically the number
                     of cores.
        split_size - Size that a full simulation will be split in.
        fdist_dir  - Where fdist can be found, if = "", then it
                     should be on the path.
        ext        - Binary extension name (e.g. nothing on Unix, '.exe'
                     on Windows).
        """
        local = Local.Local(num_thr)
        local.hooks['fdist'] = FDistAsync(fdist_dir, ext)
        # The runner stays in an attribute named "async" for backward
        # compatibility. "async" is a reserved word from Python 3.7 on,
        # so it is written/read via setattr/getattr (see _local) instead
        # of attribute syntax, which would be a SyntaxError there.
        setattr(self, 'async', local)
        self.report_fun = report_fun
        self.split_size = split_size

    def _local(self):
        """Returns the Local.Local runner stored in the 'async' attribute."""
        return getattr(self, 'async')

    @staticmethod
    def _packet_sizes(num_sims, split_size):
        """Returns the number of simulations to run in each packet.

        Every packet holds split_size simulations except possibly the
        last one, which holds the remainder, so the sizes always add up
        to num_sims (e.g. 2500, 1000 -> [1000, 1000, 500]). A
        non-positive num_sims yields no packets.
        """
        if num_sims <= 0:
            return []
        full, rest = divmod(num_sims, split_size)
        sizes = [split_size] * full
        if rest:
            sizes.append(rest)
        return sizes

    def _merge_out_dat(self, part_out):
        """Appends a packet's out.dat content to data_dir/out.dat.

        part_out is the open file object produced for the packet; it is
        always closed, even if writing fails.
        """
        try:
            merged = open(os.path.join(self.data_dir, 'out.dat'), 'a')
            try:
                merged.writelines(part_out.readlines())
            finally:
                merged.close()
        finally:
            part_out.close()

    @staticmethod
    def _remove_part_dir(part_dir):
        """Deletes a packet's data directory and the files inside it."""
        for name in os.listdir(part_dir):
            os.remove(os.path.join(part_dir, name))
        os.rmdir(part_dir)

    def monitor(self, poll_interval=1):
        """Monitors and reports (using report_fun) execution.

        Every time a partial simulation ends, its out.dat is appended
        to data_dir/out.dat, its data directory is removed and
        report_fun (if any) is called with its Fst. Returns once no
        job is waiting, running or done-but-unprocessed.

        IMPORTANT: monitor calls can be concurrent with other events,
        ie, a task might end while report_fun is being called. This
        means report_fun should take into account that other events
        might be happening while it is running (it can call
        acquire/release if necessary). There might be races when
        reporting.

        Parameters:
        poll_interval - Seconds to sleep between polls (default 1).
        """
        local = self._local()
        lock = local.access_ds
        while True:
            sleep(poll_interval)
            lock.acquire()
            try:
                # Snapshot the ids: done can grow while we process them.
                finished_ids = list(local.done.keys())
            finally:
                lock.release()
            for job_id in finished_ids:
                lock.acquire()
                try:
                    fst, files = local.done[job_id]
                    del local.done[job_id]
                    self._merge_out_dat(files['out.dat'])
                finally:
                    lock.release()
                self._remove_part_dir(self.parts[job_id])
                if self.report_fun:
                    self.report_fun(fst)
            lock.acquire()
            try:
                all_finished = (len(local.waiting) == 0 and
                                len(local.running) == 0 and
                                len(local.done) == 0)
            finally:
                # Released before leaving the loop; the lock used to
                # stay held forever after the last packet was reported.
                lock.release()
            if all_finished:
                break

    def acquire(self):
        """Allows the external acquisition of the lock."""
        self._local().access_ds.acquire()

    def release(self):
        """Allows the external release of the lock."""
        self._local().access_ds.release()

    def run_fdist(self, npops, nsamples, fst, sample_size,
                  mut=0, num_sims=20000, data_dir='.'):
        """Runs FDist.

        Parameters can be seen on FDistController.run_fdist.

        It will split a single execution in packets of split_size
        simulations (plus one smaller packet for any remainder), each
        with its own data directory data_dir/<index>, and start a
        thread running monitor. You can only run a FDist case at a
        time with a given object.
        """
        local = self._local()
        self.parts = {}
        self.data_dir = data_dir
        packet_sizes = self._packet_sizes(num_sims, self.split_size)
        for index, packet_size in enumerate(packet_sizes):
            full_path = os.path.join(data_dir, str(index))
            try:
                os.mkdir(full_path)
            except OSError:
                # Reusing an existing directory is fine; anything else
                # (permissions, missing parent, ...) is a real error.
                if not os.path.isdir(full_path):
                    raise
            job_id = local.run_program('fdist', {
                'npops': npops,
                'nsamples': nsamples,
                'fst': fst,
                'sample_size': sample_size,
                'mut': mut,
                'num_sims': packet_size,
                'data_dir': full_path,
            }, {})
            self.parts[job_id] = full_path
        thread.start_new_thread(self.monitor, ())