"""
Store implementation on top of Berkeley DB (>= 4.1)

This store is suitable for large databases, or for cases where
startup time is the more important concern.
"""

from gettext import gettext as _

import os, shutil, copy, sys, traceback, string, weakref
from sets import Set

import cPickle as pickle
import logging

log = logging.getLogger('pyblio.stores.bsddb')

from Pyblio.Arrays import KeyArray, match_arrays
from Pyblio.Stores import resultset


# (assumed helper name; parses a dotted version string into a 5-tuple that can
#  be compared against _REQUIRED)
def _parse_version (txt):
    v = [ int (x) for x in txt.split ('.') ]
    v = v + [0] * (5 - len (v))

    return tuple (v)

_REQUIRED = (4,3,0,0,0)
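
# A minimal sketch of _checkver(), on which the import logic below relies. The
# body is an assumption: compare the candidate module's version string against
# _REQUIRED and hand back its low-level 'db' namespace, which the rest of this
# module uses (db.DB, db.DBEnv, db.DB_HASH, ...).
def _checkver (module):
    if _parse_version (module.__version__) < _REQUIRED:
        raise ImportError ('the available bsddb module is too old')
    return module.db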

try:
    import bsddb3
    db = _checkver (bsddb3)

except ImportError, msg:

    import bsddb
    db = _checkver (bsddb)

from Pyblio import Store, Schema, Callback, Attribute, Exceptions, Tools, Query, Sort

_pl = pickle.loads
_ps = pickle.dumps
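
# Index layout used by _idxadd() and _idxdel() below: a forward BTree ('f')
# maps every indexed word to a packed KeyArray of the entry ids containing it,
# while a backward BTree ('b') maps an entry id to the pickled Set of words it
# was indexed under, so that an entry can later be unindexed without a scan.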


# (assumed signature, matching the call in Database._idxadd)
def _idxadd (_idx, id, words, txn):
    """ Mark id as matching all the words. """
    sid = str(id)

    f, b = _idx

    bw = Set([w.encode('utf-8') for w in words])

    for word in bw:

        a = KeyArray(s=f.get(word))
        a.add(id)

        f.put(word, a.tostring())

    b.put(sid, _ps(bw))
    return

# (assumed signature, matching the calls in the update and delete paths)
def _idxdel (_idx, id, txn):
    """ Remove any secondary index belonging to the entry """
    txn = None   # the 'f' and 'b' databases are opened non-transactionally

    sid = str(id)

    f, b = _idx

    try:
        bw = _pl(b.get(sid))
    except TypeError:
        bw = Set()

    b.delete(sid)

    for word in bw:
        a = KeyArray(s=f.get(word))
        try:
            del a[id]
            f.put(word, a.tostring())
        except IndexError:
            pass

    return
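
# RSDB is the "virtual" result set spanning the whole database: instead of
# materializing a key list it walks the main 'entries' database with a cursor,
# and it forwards item-level change notifications to whoever listens to it.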


class RSDB(Callback.Publisher):
    """ Virtual result set that loops over the full database """

    # (assumed method name; yields every stored entry)
    def itervalues (self):

        c = self._db._db.cursor()
        d = c.first()

        while d:
            yield _pl(d[1])
            d = c.next()

        c.close()
        return

    # (method name implied by the __iter__ alias below)
    def iterkeys (self):

        c = self._db._db.cursor()
        d = c.first()

        while d:
            yield Store.Key(d[0])
            d = c.next()

        c.close()
        return

    __iter__ = iterkeys

    # (assumed method name; yields (key, entry) pairs)
    def iteritems (self):

        c = self._db._db.cursor()
        d = c.first()

        while d:
            yield Store.Key(d[0]), _pl(d[1])
            d = c.next()

        c.close()
        return

    # (assumed method name; the body reports the number of stored keys)
    def __len__ (self):
        return self._db._db.stat()['nkeys']

    def view(self, criterion):

    # (assumed handler names; these three forward change notifications to the
    #  listeners attached to this result set)
    def _on_add_item (self, k):
        self.emit('add-item', k)
        return

    def _on_delete_item (self, k):
        self.emit('delete-item', k)
        return

    def _on_update_item (self, k):
        self.emit('update-item', k)
        return


# (class statement and constructor signature assumed; Database.__init__
#  creates this store as ResultSetStore(self, txn))
class ResultSetStore(Callback.Publisher):

    def __init__ (self, _db, txn=None):
        _db.register('delete', self._on_delete_item)

        self._db = weakref.ref(_db)
        txn = _db._env.txn_begin(parent=txn)
        try:
            self._rs = db.DB(_db._env.e)
            self._rs.open('resultset', 'sets', db.DB_HASH, db.DB_CREATE, txn=txn)
            _db._env.txn_commit(txn)
        except:
            _db._env.txn_abort(txn)
            raise
        return

    # (assumed method name; releases the underlying result set database)
    def _close (self):
        self._rs.close()
        return

    # (method name taken from the call in Database.save)
    def _save (self):
        self._rs.sync()
        return

    # (assumed method name; fetches a stored result set by id)
    def __getitem__ (self, k):
        data = self._rs.get(str(k))
        if data is None:
            raise KeyError("unknown resultset %r" % k)
        contents, name = _pl(data)
        rs = resultset.ResultSet(k, self._db(), contents=contents)
        rs.name = name
        return rs

    # (assumed method name; drops a stored result set and unregisters it from
    #  the 'rs' meta record)
    def __delitem__ (self, k):
        _db = self._db()
        txn = _db._env.txn_begin ()
        try:
            self._rs.delete(str(k), txn)

            (last, avail) = _pl(_db._meta.get('rs', txn=txn))
            del avail[k]
            _db._meta.put('rs', _ps ((last, avail)), txn = txn)
        except:
            _db._env.txn_abort(txn)
            raise
        _db._env.txn_commit(txn)
        return
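
    # The 'rs' record of the meta database holds a (last id, {id: name}) pair:
    # new() draws fresh result set ids from it through Tools.id_make(), and
    # update() keeps the id to name mapping in sync with what is pickled into
    # the 'resultset' file.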

    # (assumed method name; yields (id, ResultSet) pairs for the stored sets)
    def iteritems (self):
        (last, avail) = _pl(self._db()._meta.get('rs'))
        for k, name in avail.iteritems():
            yield k, self[k]

    # (method name implied by the __iter__ alias below and by the use in
    #  _on_delete_item)
    def itervalues (self):
        for k, v in self.iteritems():
            yield v

    # (assumed method name; yields the ids of the stored sets)
    def iterkeys (self):
        for k, v in self.iteritems():
            yield k

    __iter__ = itervalues

    def new(self, rsid=None, txn=None):
        """ Create an empty result set """
        _db = self._db()
        txn = _db._env.txn_begin(parent=txn)
        try:

            (last, avail) = _pl(_db._meta.get('rs', txn=txn))
            (last, rsid) = Tools.id_make(last, rsid)
            _db._meta.put('rs', _ps((last, avail)), txn=txn)
            rs = resultset.ResultSet(rsid, _db)
        except:
            _db._env.txn_abort(txn)
            raise
        _db._env.txn_commit(txn)
        return rs

    def update(self, result_set, txn=None):
        _db = self._db()
        txn = _db._env.txn_begin(parent=txn)
        try:
            (last, avail) = _pl(_db._meta.get('rs', txn=txn))
            avail[result_set.id] = result_set.name
            _db._meta.put('rs', _ps((last, avail)), txn=txn)

            self._rs.put(str(result_set.id),
                         _ps((result_set._contents,
                              result_set.name)), txn=txn)
        except:
            _db._env.txn_abort(txn)
            raise
        _db._env.txn_commit(txn)

    # (handler name from the registration in __init__; the 'delete' signal is
    #  emitted by the database with (key, txn))
    def _on_delete_item (self, key, txn=None):
        for v in self.itervalues():
            if key in v:
                del v[key]
                self.update(v, txn=txn)


class _TxnEnv:
    """ I pretend to be a DBEnv, with overloadable txn management
    functions. I work when transactions are enabled. """

    # (constructor signature assumed from its body)
    def __init__ (self, *args, **kargs):
        self.e = db.DBEnv(*args, **kargs)

    def open(self, *args, **kargs):
        return self.e.open(*args, **kargs)
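
    # Database drives this wrapper through txn_begin(), txn_abort() and
    # txn_commit().  A minimal sketch of those hooks, assuming they forward
    # directly to the underlying DBEnv transaction objects:
    def txn_begin (self, parent=None):
        return self.e.txn_begin (parent)

    def txn_abort (self, txn):
        txn.abort ()
        return

    def txn_commit (self, txn):
        txn.commit ()
        return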


# (base class assumed: reusing _TxnEnv's DBEnv handling while neutralizing the
#  transaction hooks matches how Database.__init__ picks between the two)
class _NoTxnEnv (_TxnEnv):
    """ I pretend to be a DBEnv, with overloadable txn management
    functions. I work when transactions are disabled. """
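
    # A minimal sketch of the disabled transaction hooks, assuming they are
    # plain no-ops so that every database call simply receives txn=None:
    def txn_begin (self, parent=None):
        return None

    def txn_abort (self, txn):
        return

    def txn_commit (self, txn):
        return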

_units = {
    'k': 1024,
    'M': 1024 ** 2,
    'G': 1024 ** 3,
    }


class Database(Query.Queryable, Store.Database, Callback.Publisher):
    """ A Pyblio database stored in a Berkeley DB engine """

    def __init__ (self, path, schema=None, create=False, args={}):

        Callback.Publisher.__init__(self)

        self._use_txn = args.get('transactions', True)

        self._env = {True: _TxnEnv,
                     False: _NoTxnEnv}[self._use_txn]()

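
        # 'cachesize' may be given as a plain number of bytes or with a k/M/G
        # suffix; e.g. args={'cachesize': '32M'} asks Berkeley DB for a 32 MiB
        # cache, split below into the (GBytes, bytes) pair expected by
        # DBEnv.set_cachesize().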
        cache = args.get('cachesize', '10M')

        if cache[-1] in _units.keys():
            cache = int(cache[:-1]) * _units[cache[-1]]
        else:
            cache = int(cache)

        gbytes = cache / _units['G']
        bytes = cache - (gbytes * _units['G'])

        self._env.e.set_cachesize(gbytes, bytes)

        log.debug('environment configured with (%d GB + %d bytes) cache, transactions: %s' % (
            gbytes, bytes, str(self._use_txn)))

        if create:
            try:
                os.mkdir(path)

            except OSError, msg:
                raise Store.StoreError(_("cannot create '%s': %s") % (
                    path, msg))

            flag = db.DB_CREATE
            oflag = db.DB_CREATE | db.DB_INIT_MPOOL

            if self._use_txn:
                oflag |= db.DB_INIT_TXN

            self._env.open(path, oflag)

        else:
            flag = db.DB_CREATE
            oflag = db.DB_INIT_MPOOL

            if self._use_txn:
                oflag |= db.DB_INIT_TXN

            self._env.open(path, oflag)

        self._path = path

        txn = self._env.txn_begin()

        try:

            # main storage: the entries themselves and the database metadata
            self._db = db.DB(self._env.e)
            self._db.open('database', 'entries', db.DB_HASH, flag, txn=txn)

            self._meta = db.DB(self._env.e)
            self._meta.open('database', 'meta', db.DB_HASH, flag, txn=txn)

            if create:
                self._schema = schema
                self._meta.put('schema', _ps (schema), txn=txn)
                self._meta.put('rs', _ps ((1, {})), txn=txn)
                self._meta.put('view', _ps ((1, {}, {})), txn=txn)
                self._meta.put('full', KeyArray().tostring(), txn=txn)
                self._meta.put('serial', '1', txn=txn)

                self._header_set(None, txn)

            else:
                self._schema = _pl(self._meta.get('schema', txn = txn))

            # secondary word indexes: forward ('f') and backward ('b')
            f = db.DB(self._env.e)
            f.open('index', 'f', db.DB_BTREE, flag)

            b = db.DB(self._env.e)
            b.open('index', 'b', db.DB_BTREE, flag)

            self._idx = (f, b)

            self.rs = ResultSetStore(self, txn)
        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        return

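
    # On-disk layout created above: the environment directory holds a
    # 'database' file with the 'entries' and 'meta' sub-databases, an 'index'
    # file with the forward ('f') and backward ('b') word indexes, and a
    # 'resultset' file whose 'sets' sub-database is managed by ResultSetStore.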
    # (accessor names taken from the property declaration below; the extra txn
    #  argument of the setter matches the call in __init__)
    def _header_get (self):
        return _pl(self._meta.get('header'))

    def _header_set (self, header, txn=None):
        txn = self._env.txn_begin(txn)

        try:
            self._meta.put('header', _ps(header), txn=txn)
        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        return

    header = property(_header_get, _header_set)

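
    # A minimal sketch of the schema getter used by the property below,
    # assuming it simply hands back the schema loaded in __init__ (the
    # matching _schema_set is more involved and is not sketched):
    def _schema_get (self):
        return self._schema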

    schema = property (_schema_get, _schema_set)

    # (assumed method name; the body flushes every underlying Berkeley DB file)
    def save (self):

        self._db.sync()
        self._meta.sync()

        for i in self._idx:
            i.sync()

        self.rs._save()
        return

    def add(self, val, key=None):

        val = self.validate(val)

        txn = self._env.txn_begin()

        try:
            serial = int(self._meta.get('serial', txn=txn))
            full = KeyArray(s=self._meta.get('full', txn=txn))

            serial, key = Tools.id_make(serial, key)
            full.add(key)

            self._meta.put('full', full.tostring(), txn=txn)
            self._meta.put('serial', str(serial), txn=txn)

            key = Store.Key(key)
            val.key = key

            self._insert(key, val, txn)

            self.emit('add', val, txn)
        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        self.emit('add-item', val)
        return key

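
    # 'serial' in the meta database is the id counter fed to Tools.id_make(),
    # and 'full' is a packed KeyArray of every live key: add() updates both
    # inside the entry's transaction, and deletion drops the key from 'full'.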
    # (assumed method name: dictionary-style assignment, matching the
    #  __getitem__ and has_key accessors below)
    def __setitem__ (self, key, val):
        assert self.has_key (key), \
               _('entry %s does not exist') % `key`

        val = self.validate(val)
        val.key = key

        txn = self._env.txn_begin ()

        try:

            self.emit('update', key, val, txn)

            _idxdel(self._idx, key, txn)
            self._insert(key, val, txn)

        except:
            etype, value, tb = sys.exc_info ()
            traceback.print_exception (etype, value, tb)

            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        self.emit('update-item', key)
        return

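
    # Deletion first emits 'delete' inside the transaction, which lets
    # listeners such as ResultSetStore prune the key from their own records,
    # and only then removes the word-index entries, the record itself and its
    # bit in the 'full' KeyArray.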
    # (assumed method name: dictionary-style deletion)
    def __delitem__ (self, key):
        id = str (key)

        txn = self._env.txn_begin ()

        try:

            self.emit('delete', key, txn)

            _idxdel (self._idx, key, txn)
            self._db.delete (id, txn)

            full = KeyArray(s=self._meta.get('full', txn=txn))
            del full[key]
            self._meta.put('full', full.tostring(), txn=txn)

        except:
            self._env.txn_abort(txn)
            raise

        self._env.txn_commit(txn)
        self.emit('delete-item', key)
        return

    # (name taken from the assertion in the update path above)
    def has_key (self, k):
        id = str (k)
        try:
            self._db.get (id)
        except db.DBNotFoundError:
            return False

        return True

    # (assumed signature, matching the call in _insert)
    def _idxadd (self, id, val, txn):

        def words():
            for attribs in val.values():
                for attrib in attribs:

                    for idx in attrib.index():
                        yield idx

        _idxadd(self._idx, id, words(), txn)
        return

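
    # _insert() pickles a *copy* of the entry with its key attached, so that
    # what is later unpickled from the database already knows its own key.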
    # (assumed signature, matching the calls in the add and update paths)
    def _insert (self, key, val, txn):
        id = str(key)

        self._idxadd(key, val, txn)

        val = copy.copy (val)
        val.key = key

        val = _ps(val)

        self._db.put(id, val, txn=txn)
        return id

    # (assumed method name: dictionary-style access to a single entry)
    def __getitem__ (self, key):
        return _pl(self._db.get(str(key)))

    entries = property(_entries_get)



# (assumed name and signature for the module-level "remove this database"
#  entry point)
def dbdestroy (path, args={}):

    if not os.path.isdir (path):
        raise ValueError ('%s is not a directory' % path)

    if not os.path.exists (os.path.join (path, 'database')):
        raise ValueError ('%s is not a pybliographer database' % path)

    shutil.rmtree (path)
    return


# (assumed name and signature for the module-level "open an existing database"
#  entry point)
def dbopen (path, args={}):

    try:
        return Database(path=path, create=False, args=args)

    except db.DBNoSuchFileError, msg:
        raise Store.StoreError (_("cannot open '%s': %s") % (
            path, msg))


description = _("Berkeley DB storage")