Package Pyblio :: Package Stores :: Package bsddbstore
[hide private]
[frames] | [no frames]

Source Code for Package Pyblio.Stores.bsddbstore

  1  # This file is part of pybliographer 
  2  #  
  3  # Copyright (C) 1998-2006 Frederic GOBRY 
  4  # Email : gobry@pybliographer.org 
  5  #           
  6  # This program is free software; you can redistribute it and/or 
  7  # modify it under the terms of the GNU General Public License 
  8  # as published by the Free Software Foundation; either version 2  
  9  # of the License, or (at your option) any later version. 
 10  #    
 11  # This program is distributed in the hope that it will be useful, 
 12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 14  # GNU General Public License for more details.  
 15  #  
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA. 
 19  #  
 20   
 21  """ 
 22  Store implementation on top of Berkeley DB (>= 4.1) 
 23   
#   24  This store is suitable for large databases, or for cases where a 
#   25  fast startup time is important. 
 26  """ 
 27   
 28   
 29  # Tables in use: 
 30  #  
 31  # * database/entries [HASH] 
 32  #  
 33  #   key:   string value of an entry key 
 34  #   value: Store.Record as a pickled object 
 35  #  
 36  # * database/meta [HASH] 
 37  #  
 38  #   key:   a meta parameter (next available key,...) 
 39  #   value: its value 
 40  #  
 41  # * database/enum [HASH] 
 42  #  
 43  #   key:   id of the enum 
 44  #   value: pickled dict containing the values 
 45  #  
 46  # * index/full [HASH / DUP] 
 47  #  
 48  #   key:   the indexed value 
 49  #   value: the entry that contains the value 
 50  #  
 51  # * resultset/sets [HASH] 
 52  #  
 53  #   key:   resultset id 
 54  #   value: a serialized boolean numpy array containing the records 
 55   
 56  from gettext import gettext as _ 
 57   
 58  import os, shutil, copy, sys, traceback, string, weakref 
 59  from sets import Set 
 60   
 61  import cPickle as pickle 
 62  import logging 
 63   
 64  log = logging.getLogger('pyblio.stores.bsddb') 
 65   
 66  from Pyblio.Arrays import KeyArray, match_arrays 
 67  from Pyblio.Stores import resultset 
 68   
 69  # Python ships the bsddb module as 'bsddb', whereas when fetched as a 
 70  # standalone package it is named 'bsddb3'. For the moment, we need a 
 71  # version that is not yet shipped. 
 72   
73 -def _numver (txt):
74 v = [ int (x) for x in txt.split ('.') ] 75 v = v + [0] * (5 - len (v)) 76 77 return tuple (v)
78 79 _REQUIRED = (4,3,0,0,0) 80
def _checkver (module):
    """ Validate that *module* is a recent enough bsddb binding.

    Returns the module's 'db' submodule on success, raises
    ImportError when the version is older than _REQUIRED.
    """
    found = _numver (module.__version__)

    if not found >= _REQUIRED:
        raise ImportError ('bsddb is too old (%s instead of %s)' % (found, _REQUIRED))

    return module.db
88 89 try: 90 import bsddb3 91 db = _checkver (bsddb3) 92 93 except ImportError, msg: 94 95 import bsddb 96 db = _checkver (bsddb) 97 98 from Pyblio import Store, Schema, Callback, Attribute, Exceptions, Tools, Query, Sort 99 100 _pl = pickle.loads 101 _ps = pickle.dumps 102 103 # -------------------------------------------------- 104
def _idxadd(_idx, id, words, txn):
    """ Mark id as matching all the words. """
    forward, backward = _idx

    # Deduplicate and store the words UTF-8 encoded, as the DB layer
    # works on byte strings.
    encoded = Set()
    for w in words:
        encoded.add(w.encode('utf-8'))

    for word in encoded:
        # Forward link, from word to record
        arr = KeyArray(s=forward.get(word))
        arr.add(id)
        forward.put(word, arr.tostring())

    # Backward link, from record to the full word set (used for
    # cleanup on delete/update).
    backward.put(str(id), _ps(encoded))
    return
122 123
def _idxdel(_idx, id, txn):
    """ Remove any secondary index belonging to the entry """
    # NOTE(review): the caller's transaction is discarded here, so the
    # index updates below run outside of it.  This looks deliberate
    # (txn is never used afterwards) but should be confirmed — if it
    # is intended, the txn parameter is dead weight.
    txn = None

    sid = str(id)

    # f: forward index (word -> KeyArray of records)
    # b: backward index (record -> pickled word set)
    f, b = _idx

    try:
        bw = _pl(b.get(sid))
    except TypeError:
        # b.get() returned None (no backward entry): _pl raises
        # TypeError, and we fall back to an empty word set.
        bw = Set()

    b.delete(sid)

    for word in bw:
        a = KeyArray(s=f.get(word))
        try:
            # Drop this record from the word's match array; the
            # updated array is only written back when the deletion
            # succeeded.
            del a[id]
            f.put(word, a.tostring())
        except IndexError:
            # Record was not in the array; nothing to rewrite.
            pass

    return
148 149 150 # -------------------------------------------------- 151
class RSDB(Callback.Publisher):
    """ Virtual result set that loops over the full database"""

    def __init__ (self, _db):
        """ Wrap *_db* (a Database instance) and re-publish its
        item-level events under the same names. """
        Callback.Publisher.__init__(self)

        self.id = 0
        self._db = _db

        # Forward the database's item events to our own subscribers.
        _db.register('add-item', self._add)
        _db.register('delete-item', self._delete)
        _db.register('update-item', self._update)
        return

    def itervalues (self):
        """ Yield every record, unpickled, via a DB cursor. """
        c = self._db._db.cursor()
        d = c.first()

        while d:
            # d is a (key, value) pair; the value is a pickled record.
            yield _pl(d[1])
            d = c.next()

        c.close()
        return

    def iterkeys(self):
        """ Yield every record key as a Store.Key. """

        c = self._db._db.cursor()
        d = c.first()

        while d:
            yield Store.Key(d[0])
            d = c.next()

        c.close()
        return

    __iter__ = iterkeys

    def iteritems(self):
        """ Yield (Store.Key, record) pairs for the whole database. """
        c = self._db._db.cursor()
        d = c.first()

        while d:
            yield Store.Key(d[0]), _pl(d[1])
            d = c.next()

        c.close()
        return

    def __len__(self):
        # 'nkeys' comes from the underlying DB's stat() dictionary.
        return self._db._db.stat()['nkeys']

    def view(self, criterion):
        """ Return a resultset.View of this set filtered/sorted by
        *criterion*. """
        return resultset.View(self, criterion)

    def _add(self, k):
        # Relay the database's 'add-item' event.
        self.emit('add-item', k)
        return

    def _delete(self, k):
        # Relay the database's 'delete-item' event.
        self.emit('delete-item', k)
        return

    def _update(self, k):
        # Relay the database's 'update-item' event.
        self.emit('update-item', k)
        return
219 220 # --------------------------------------------------
class ResultSetStore(Store.ResultSetStore):
    """ Persistent store of named result sets, kept in the
    'resultset/sets' hash table.  The (next id, {id: name}) catalog
    lives under the 'rs' key of the database's meta table. """

    def __init__ (self, _db, txn):
        """ Open (creating if needed) the resultset table inside a
        child transaction of *txn*. """
        _db.register('delete', self._on_delete_item)

        # Weak reference: the Database owns us, avoid a cycle.
        self._db = weakref.ref(_db)
        txn = _db._env.txn_begin(parent=txn)
        try:
            self._rs = db.DB(_db._env.e)
            self._rs.open('resultset', 'sets', db.DB_HASH, db.DB_CREATE, txn=txn)
            _db._env.txn_commit(txn)
        except:
            _db._env.txn_abort(txn)
            raise
        return

    def _close(self):
        self._rs.close()
        return

    def _save(self):
        # Flush pending writes to disk.
        self._rs.sync()
        return

    def __getitem__(self, k):
        """ Return the result set with id *k*, or raise KeyError. """
        data = self._rs.get(str(k))
        if data is None:
            raise KeyError("unknown resultset %r" % k)
        contents, name = _pl(data)
        rs = resultset.ResultSet(k, self._db(), contents=contents)
        rs.name = name
        return rs

    def __delitem__(self, k):
        """ Remove result set *k* and unregister it from the meta
        catalog, in a single transaction. """
        _db = self._db()
        txn = _db._env.txn_begin ()
        try:
            self._rs.delete(str(k), txn)
            # get the rs dict
            (last, avail) = _pl(_db._meta.get('rs', txn=txn))
            del avail[k]
            _db._meta.put('rs', _ps ((last, avail)), txn = txn)
        except:
            _db._env.txn_abort(txn)
            raise
        _db._env.txn_commit(txn)
        return

    def iteritems(self):
        """ Yield (id, ResultSet) for every registered result set. """
        (last, avail) = _pl(self._db()._meta.get('rs'))
        for k, name in avail.iteritems():
            yield k, self[k]

    def itervalues(self):
        for k, v in self.iteritems():
            yield v

    def iterkeys(self):
        for k, v in self.iteritems():
            yield k

    __iter__ = itervalues

    def new(self, rsid=None, txn=None):
        """ Create an empty result set """
        # NOTE(review): the new set is only allocated an id here; it
        # is not written to the table until update() is called.
        _db = self._db()
        txn = _db._env.txn_begin(parent=txn)
        try:
            # get the next rs id
            (last, avail) = _pl(_db._meta.get('rs', txn=txn))
            (last, rsid) = Tools.id_make(last, rsid)
            _db._meta.put('rs', _ps((last, avail)), txn=txn)
            rs = resultset.ResultSet(rsid, _db)
        except:
            _db._env.txn_abort(txn)
            raise
        _db._env.txn_commit(txn)
        return rs

    def update(self, result_set, txn=None):
        """ Persist *result_set* (contents and name) and register its
        name in the meta catalog. """
        _db = self._db()
        txn = _db._env.txn_begin(parent=txn)
        try:
            (last, avail) = _pl(_db._meta.get('rs', txn=txn))
            avail[result_set.id] = result_set.name
            _db._meta.put('rs', _ps((last, avail)), txn=txn)

            self._rs.put(str(result_set.id),
                         _ps((result_set._contents,
                              result_set.name)), txn=txn)
        except:
            _db._env.txn_abort(txn)
            raise
        _db._env.txn_commit(txn)

    def _on_delete_item(self, key, txn):
        """ Callback on database 'delete': purge *key* from every
        stored result set. """
        for v in self.itervalues():
            if key in v:
                del v[key]
                self.update(v, txn=txn)
320 321 # -------------------------------------------------- 322
class _TxnEnv(object):
    """ Thin wrapper around a real DBEnv, exposing overridable
    transaction-management methods.  This variant is used when
    transactions are enabled. """

    def __init__(self, *args, **kwargs):
        # 'e' is the real DBEnv; callers may access it directly.
        self.e = db.DBEnv(*args, **kwargs)

    def open(self, *args, **kwargs):
        return self.e.open(*args, **kwargs)

    def txn_begin(self, *args, **kwargs):
        return self.e.txn_begin(*args, **kwargs)

    def txn_commit(self, txn):
        txn.commit()

    def txn_abort(self, txn):
        txn.abort()
341 342
class _NoTxnEnv(_TxnEnv):
    """ DBEnv look-alike whose transaction operations are no-ops.
    Used when transactions are disabled: txn_begin hands back None,
    which the DB calls accept as 'no transaction'. """

    def txn_begin(self, *args, **kwargs):
        return None

    def txn_commit(self, txn):
        pass

    def txn_abort(self, txn):
        pass
355 356 _units = { 357 'k': 1024, 358 'M': 1024 ** 2, 359 'G': 1024 ** 3, 360 } 361 362
class Database(Query.Queryable, Store.Database, Callback.Publisher):
    """ A Pyblio database stored in a Berkeley DB engine """

    def __init__ (self, path, schema=None, create=False, args={}):
        """ Open (or create, when create=True) the database stored in
        the directory *path*.

        Recognized *args* keys: 'transactions' (bool, default True)
        and 'cachesize' (e.g. '10M', default '10M').
        """

        Callback.Publisher.__init__(self)

        self._use_txn = args.get('transactions', True)

        # Instantiate the proper environment (yes, this could be done
        # with an if :-))
        self._env = {True: _TxnEnv,
                     False: _NoTxnEnv}[self._use_txn]()

        cache = args.get('cachesize', '10M')

        # Accept a 'k'/'M'/'G' suffix, otherwise a plain byte count.
        if cache[-1] in _units.keys():
            cache = int(cache[:-1]) * _units[cache[-1]]
        else:
            cache = int(cache)

        # Python 2 integer division: gbytes is the whole-GB part,
        # bytes the remainder, as set_cachesize expects.
        gbytes = cache / _units['G']
        bytes = cache - (gbytes * _units['G'])

        self._env.e.set_cachesize(gbytes, bytes)

        log.debug('environment configured with (%d Gb, %d b) cache, transactions: %s' % (
            gbytes, bytes, str(self._use_txn)))

        if create:
            try:
                os.mkdir(path)

            except OSError, msg:
                raise Store.StoreError(_("cannot create '%s': %s") % (
                    path, msg))

            flag = db.DB_CREATE
            oflag = db.DB_CREATE | db.DB_INIT_MPOOL

            if self._use_txn:
                oflag |= db.DB_INIT_TXN

            self._env.open(path, oflag)

        else:
            flag = db.DB_CREATE
            oflag = db.DB_INIT_MPOOL

            if self._use_txn:
                oflag |= db.DB_INIT_TXN

            self._env.open(path, oflag)

        self._path = path

        txn = self._env.txn_begin()

        try:
            # DB containing the actual entries
            self._db = db.DB(self._env.e)
            self._db.open('database', 'entries', db.DB_HASH, flag, txn=txn)

            # DB with meta informations
            self._meta = db.DB(self._env.e)
            self._meta.open('database', 'meta', db.DB_HASH, flag, txn=txn)

            if create:
                # Seed the meta table: schema, result-set catalog,
                # view catalog, global key array and serial counter.
                self._schema = schema
                self._meta.put('schema', _ps (schema), txn=txn)
                self._meta.put('rs', _ps ((1, {})), txn=txn)
                self._meta.put('view', _ps ((1, {}, {})), txn=txn)
                self._meta.put('full', KeyArray().tostring(), txn=txn)
                self._meta.put('serial', '1', txn=txn)

                self._header_set(None, txn)

            else:
                self._schema = _pl(self._meta.get('schema', txn = txn))

            # Full text indexes

            # Forward index: for each word as a key, return an array
            # of matches
            # NOTE(review): both index DBs are opened without txn= —
            # presumably deliberate, but worth confirming.
            f = db.DB(self._env.e)
            f.open('index', 'f', db.DB_BTREE, flag)

            # Backward index: for each record, list the words it matches
            b = db.DB(self._env.e)
            b.open('index', 'b', db.DB_BTREE, flag)

            self._idx = (f, b)

            # Result sets handler
            self.rs = ResultSetStore(self, txn)
        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        return

    def _header_get(self):
        """ Return the unpickled database header. """
        return _pl(self._meta.get('header'))

    def _header_set(self, header, txn=None):
        """ Store the database header, inside a child transaction of
        *txn* when given. """
        txn = self._env.txn_begin(txn)

        try:
            self._meta.put('header', _ps(header), txn=txn)
        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        return

    header = property(_header_get, _header_set)

    def _schema_get (self):
        return self._schema

    def _schema_set (self, schema, txn = None):
        """ Persist a new schema and cache it on the instance. """

        txn = self._env.txn_begin (txn)
        try:
            self._meta.put ('schema', _ps (schema), txn = txn)

        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        self._schema = schema
        return

    schema = property (_schema_get, _schema_set)

    def save(self):
        """ Flush every table (entries, meta, indexes, result sets)
        to disk. """
        # Flush the databases
        self._db.sync()
        self._meta.sync()

        for i in self._idx:
            i.sync()

        self.rs._save()
        return

    def add(self, val, key=None):
        """ Validate and insert a new record; returns the Store.Key
        allocated for it (or derived from *key* when given). """

        val = self.validate(val)

        # Be careful to always point after the last serial id used.
        txn = self._env.txn_begin()

        try:
            serial = int(self._meta.get('serial', txn=txn))
            full = KeyArray(s=self._meta.get('full', txn=txn))

            serial, key = Tools.id_make(serial, key)
            full.add(key)

            self._meta.put('full', full.tostring(), txn=txn)
            self._meta.put('serial', str(serial), txn=txn)

            key = Store.Key(key)
            val.key = key

            self._insert(key, val, txn)

            # 'add' is emitted inside the txn so subscribers can take
            # part in the transaction; 'add-item' after commit.
            self.emit('add', val, txn)
        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        self.emit('add-item', val)
        return key

    def __setitem__ (self, key, val):
        """ Replace the record stored under an existing *key*. """
        assert self.has_key (key), \
               _('entry %s does not exist') % `key`

        val = self.validate(val)
        val.key = key

        txn = self._env.txn_begin ()

        try:
            # Start by doing the update in the external tables, which
            # might still want to access the previous version
            self.emit('update', key, val, txn)

            _idxdel(self._idx, key, txn)
            self._insert(key, val, txn)

        except:
            etype, value, tb = sys.exc_info ()
            traceback.print_exception (etype, value, tb)

            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        self.emit('update-item', key)
        return

    def __delitem__ (self, key):
        """ Remove the record *key*, its indexes, and its bit in the
        global key array. """
        id = str (key)

        txn = self._env.txn_begin ()

        try:
            # Start by cleaning up dependencies, as they might wish to
            # access the item a last time.
            self.emit('delete', key, txn)

            # Then, remove the index and entry itself
            _idxdel (self._idx, key, txn)
            self._db.delete (id, txn)

            full = KeyArray(s=self._meta.get('full', txn=txn))
            del full[key]
            self._meta.put('full', full.tostring(), txn=txn)

        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        self.emit('delete-item', key)
        return

    def has_key (self, k):
        # NOTE(review): with the default DB flags, get() on a missing
        # key usually returns None instead of raising DBNotFoundError,
        # which would make this always return True — confirm against
        # the bsddb version actually in use.
        id = str (k)
        try:
            self._db.get (id)
        except db.DBNotFoundError:
            return False
        return True

    def _idxadd (self, id, val, txn):
        """ Index record *val* under key *id*: feed every indexable
        token of every attribute to the module-level _idxadd. """
        # We need to insert the current record in both the backward
        # and forward indexes.
        def words():
            for attribs in val.values():
                for attrib in attribs:

                    for idx in attrib.index():
                        yield idx

        # Resolves to the module-level _idxadd (the method name only
        # shadows it as an attribute, not in this scope).
        _idxadd(self._idx, id, words(), txn)
        return

    def _insert(self, key, val, txn):
        """ Index and store (pickled) a record under *key*; returns
        the string form of the key. """
        id = str(key)

        self._idxadd(key, val, txn)

        # Copy before mutating, so the caller's object is untouched.
        val = copy.copy (val)
        val.key = key

        val = _ps(val)

        self._db.put(id, val, txn=txn)
        return id

    def _q_all(self):
        """ Return the KeyArray of all live record keys. """
        return KeyArray(s=self._meta.get('full'))

    def _q_anyword(self, query):
        """ Return the KeyArray of records matching a single word. """
        word = query.word.lower().encode('utf-8')
        return KeyArray(s=self._idx[0].get(word))

    def _q_to_rs(self, res):
        """ Materialize a KeyArray query result as a new ResultSet. """
        rs = self.rs.new()
        rs._from_array(res)
        return rs

    def __getitem__ (self, key):
        # NOTE(review): a missing key makes _pl choke on None rather
        # than raising KeyError — confirm whether callers rely on that.
        return _pl(self._db.get(str(key)))

    def _entries_get(self):
        """ Return a virtual result set over the whole database. """
        return RSDB(self)

    entries = property(_entries_get)

    def index(self):
        # Indexing is maintained incrementally; nothing to do here.
        pass
def dbdestroy(path, nobackup=False):
    """ Remove the database directory *path* entirely.

    Raises ValueError when *path* is not a directory or does not
    look like a pybliographer database.
    """
    # Refuse to delete anything that is not a directory.
    if not os.path.isdir (path):
        raise ValueError ('%s is not a directory' % path)

    # Require the 'database' file that every store directory holds.
    marker = os.path.join (path, 'database')
    if not os.path.exists (marker):
        raise ValueError ('%s is not a pybliographer database' % path)

    shutil.rmtree (path)
    return
670 671
def dbcreate(path, schema, args={}):
    """ Create a fresh database at *path* using *schema*. """
    return Database (create=True, path=path,
                     schema=schema, args=args)
674 675
676 -def dbopen(path, args={}):
677 678 try: 679 return Database(path=path, create=False, args=args) 680 681 except db.DBNoSuchFileError, msg: 682 raise Store.StoreError (_("cannot open '%s': %s") % ( 683 path, msg))
684 685
686 -def dbimport(target, source, args={}):
687 688 db = Database (path=target, schema=None, create=True, args=args) 689 690 691 try: 692 db.xmlread (open (source)) 693 694 except IOError, msg: 695 696 dbdestroy (target) 697 raise Store.StoreError (_("cannot open '%s': %s") % (file, msg)) 698 699 return db
700 701 description = _("Berkeley DB storage") 702