1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 r"""indexerconnection.py: A connection to the search engine for indexing.
19
20 """
21 __docformat__ = "restructuredtext en"
22
23 import _checkxapian
24 import cPickle
25 import xapian
26
27 from datastructures import *
28 import errors
29 from fieldactions import *
30 import fieldmappings
31 import memutils
32 from replaylog import log
33
35 """A connection to the search engine for indexing.
36
37 """
38
40 """Create a new connection to the index.
41
42 There may only be one indexer connection for a particular database open
43 at a given time. Therefore, if a connection to the database is already
44 open, this will raise a xapian.DatabaseLockError.
45
46 If the database doesn't already exist, it will be created.
47
48 """
49 self._index = log(xapian.WritableDatabase, indexpath, xapian.DB_CREATE_OR_OPEN)
50 self._indexpath = indexpath
51
52
53 self._field_actions = {}
54 self._field_mappings = fieldmappings.FieldMappings()
55 self._facet_hierarchy = {}
56 self._facet_query_table = {}
57 self._next_docid = 0
58 self._config_modified = False
59 self._load_config()
60
61
62
63 self._mem_buffered = 0
64 self.set_max_mem_use()
65
67 """Set the maximum memory to use.
68
69 This call allows the amount of memory to use to buffer changes to be
70 set. This will affect the speed of indexing, but should not result in
71 other changes to the indexing.
72
73 Note: this is an approximate measure - the actual amount of memory used
74 max exceed the specified amount. Also, note that future versions of
75 xapian are likely to implement this differently, so this setting may be
76 entirely ignored.
77
78 The absolute amount of memory to use (in bytes) may be set by setting
79 max_mem. Alternatively, the proportion of the available memory may be
80 set by setting max_mem_proportion (this should be a value between 0 and
81 1).
82
83 Setting too low a value will result in excessive flushing, and very
84 slow indexing. Setting too high a value will result in excessive
85 buffering, leading to swapping, and very slow indexing.
86
87 A reasonable default for max_mem_proportion for a system which is
88 dedicated to indexing is probably 0.5: if other tasks are also being
89 performed on the system, the value should be lowered.
90
91 """
92 if self._index is None:
93 raise errors.IndexerError("IndexerConnection has been closed")
94 if max_mem is not None and max_mem_proportion is not None:
95 raise errors.IndexerError("Only one of max_mem and "
96 "max_mem_proportion may be specified")
97
98 if max_mem is None and max_mem_proportion is None:
99 self._max_mem = None
100
101 if max_mem_proportion is not None:
102 physmem = memutils.get_physical_memory()
103 if physmem is not None:
104 max_mem = int(physmem * max_mem_proportion)
105
106 self._max_mem = max_mem
107
109 """Store the configuration for the database.
110
111 Currently, this stores the configuration in a file in the database
112 directory, so changes to it are not protected by transactions. When
113 support is available in xapian for storing metadata associated with
114 databases. this will be used instead of a file.
115
116 """
117 assert self._index is not None
118
119 config_str = cPickle.dumps((
120 self._field_actions,
121 self._field_mappings.serialise(),
122 self._facet_hierarchy,
123 self._facet_query_table,
124 self._next_docid,
125 ), 2)
126 log(self._index.set_metadata, '_xappy_config', config_str)
127
128 self._config_modified = False
129
131 """Load the configuration for the database.
132
133 """
134 assert self._index is not None
135
136 config_str = log(self._index.get_metadata, '_xappy_config')
137 if len(config_str) == 0:
138 return
139
140 try:
141 (self._field_actions, mappings, self._facet_hierarchy, self._facet_query_table, self._next_docid) = cPickle.loads(config_str)
142 except ValueError:
143
144 (self._field_actions, mappings, self._next_docid) = cPickle.loads(config_str)
145 self._facet_hierarchy = {}
146 self._facet_query_table = {}
147 self._field_mappings = fieldmappings.FieldMappings(mappings)
148
149 self._config_modified = False
150
152 """Allocate a new ID.
153
154 """
155 while True:
156 idstr = "%x" % self._next_docid
157 self._next_docid += 1
158 if not self._index.term_exists('Q' + idstr):
159 break
160 self._config_modified = True
161 return idstr
162
164 """Add an action to be performed on a field.
165
166 Note that this change to the configuration will not be preserved on
167 disk until the next call to flush().
168
169 """
170 if self._index is None:
171 raise errors.IndexerError("IndexerConnection has been closed")
172 if fieldname in self._field_actions:
173 actions = self._field_actions[fieldname]
174 else:
175 actions = FieldActions(fieldname)
176 self._field_actions[fieldname] = actions
177 actions.add(self._field_mappings, fieldtype, **kwargs)
178 self._config_modified = True
179
181 """Clear all actions for the specified field.
182
183 This does not report an error if there are already no actions for the
184 specified field.
185
186 Note that this change to the configuration will not be preserved on
187 disk until the next call to flush().
188
189 """
190 if self._index is None:
191 raise errors.IndexerError("IndexerConnection has been closed")
192 if fieldname in self._field_actions:
193 del self._field_actions[fieldname]
194 self._config_modified = True
195
197 """Get a list of field names which have actions defined.
198
199 """
200 if self._index is None:
201 raise errors.IndexerError("IndexerConnection has been closed")
202 return self._field_actions.keys()
203
205 """Process an UnprocessedDocument with the settings in this database.
206
207 The resulting ProcessedDocument is returned.
208
209 Note that this processing will be automatically performed if an
210 UnprocessedDocument is supplied to the add() or replace() methods of
211 IndexerConnection. This method is exposed to allow the processing to
212 be performed separately, which may be desirable if you wish to manually
213 modify the processed document before adding it to the database, or if
214 you want to split processing of documents from adding documents to the
215 database for performance reasons.
216
217 """
218 if self._index is None:
219 raise errors.IndexerError("IndexerConnection has been closed")
220 result = ProcessedDocument(self._field_mappings)
221 result.id = document.id
222 context = ActionContext(self._index)
223
224 for field in document.fields:
225 try:
226 actions = self._field_actions[field.name]
227 except KeyError:
228
229 continue
230 actions.perform(result, field.value, context)
231
232 return result
233
235 """Get an estimate of the bytes used by the terms in a document.
236
237 (This is a very rough estimate.)
238
239 """
240 count = 0
241 for item in xapdoc.termlist():
242
243
244 count += len(item.term) * 2
245
246
247
248 count += 8
249
250
251
252 return count * 5
253
def add(self, document):
    """Add a new document to the search engine index.

    If the document has an id set, and the id already exists in
    the database, an IndexerError will be raised.  Use the replace()
    method instead if you wish to overwrite documents.

    Returns the id of the newly added document (making up a new
    unique ID if no id was set).

    The supplied document may be an instance of UnprocessedDocument
    (in which case it is processed first), or an instance of
    ProcessedDocument.

    """
    if self._index is None:
        raise errors.IndexerError("IndexerConnection has been closed")
    if not hasattr(document, '_doc'):
        # Not yet processed - process it with this connection's settings.
        document = self.process(document)

    # Allocate an ID if the document has none; otherwise require that
    # the supplied ID is not already present in the index.
    orig_id = document.id
    if orig_id is None:
        docid = self._allocate_id()
        document.id = docid
    else:
        docid = orig_id
        if self._index.term_exists('Q' + docid):
            raise errors.IndexerError("Document ID of document supplied to add() is not unique.")

    xapdoc = document.prepare()
    self._index.add_document(xapdoc)

    # Track an approximate count of buffered bytes, and flush once the
    # configured memory budget is exceeded.
    if self._max_mem is not None:
        self._mem_buffered += self._get_bytes_used_by_doc_terms(xapdoc)
        if self._mem_buffered > self._max_mem:
            self.flush()

    # Restore the caller's document to its original (unset) id so the
    # input object is left unmodified.
    if docid != orig_id:
        document.id = orig_id
    return docid
296
298 """Replace a document in the search engine index.
299
300 If the document does not have a id set, an exception will be
301 raised.
302
303 If the document has a id set, and the id does not already
304 exist in the database, this method will have the same effect as add().
305
306 """
307 if self._index is None:
308 raise errors.IndexerError("IndexerConnection has been closed")
309 if not hasattr(document, '_doc'):
310
311 document = self.process(document)
312
313
314 id = document.id
315 if id is None:
316 raise errors.IndexerError("No document ID set for document supplied to replace().")
317
318 xapdoc = document.prepare()
319 self._index.replace_document('Q' + id, xapdoc)
320
321 if self._max_mem is not None:
322 self._mem_buffered += self._get_bytes_used_by_doc_terms(xapdoc)
323 if self._mem_buffered > self._max_mem:
324 self.flush()
325
327 """Make a synonym key (ie, the term or group of terms to store in
328 xapian).
329
330 """
331 if field is not None:
332 prefix = self._field_mappings.get_prefix(field)
333 else:
334 prefix = ''
335 original = original.lower()
336
337 return ' '.join((prefix + word for word in original.split(' ')))
338
def add_synonym(self, original, synonym, field=None,
                original_field=None, synonym_field=None):
    """Add a synonym to the index.

    - `original` is the word or words which will be synonym expanded in
      searches (if multiple words are specified, each word should be
      separated by a single space).
    - `synonym` is a synonym for `original`.
    - `field` is the field which the synonym is specific to.  If no field
      is specified, the synonym will be used for searches which are not
      specific to any particular field.
    - `original_field`: if supplied, overrides `field` for the original
      term or terms only.
    - `synonym_field`: if supplied, overrides `field` for the synonym
      only.

    """
    if self._index is None:
        raise errors.IndexerError("IndexerConnection has been closed")
    # Each side of the synonym pair may be scoped to its own field;
    # both default to `field` when not given explicitly.
    if original_field is None:
        original_field = field
    if synonym_field is None:
        synonym_field = field
    key = self._make_synonym_key(original, original_field)
    value = self._make_synonym_key(synonym, synonym_field)
    self._index.add_synonym(key, value)
363
365 """Remove a synonym from the index.
366
367 - `original` is the word or words which will be synonym expanded in
368 searches (if multiple words are specified, each word should be
369 separated by a single space).
370 - `synonym` is a synonym for `original`.
371 - `field` is the field which this synonym is specific to. If no field
372 is specified, the synonym will be used for searches which are not
373 specific to any particular field.
374
375 """
376 if self._index is None:
377 raise errors.IndexerError("IndexerConnection has been closed")
378 key = self._make_synonym_key(original, field)
379 self._index.remove_synonym(key, synonym.lower())
380
382 """Remove all synonyms for a word (or phrase).
383
384 - `field` is the field which this synonym is specific to. If no field
385 is specified, the synonym will be used for searches which are not
386 specific to any particular field.
387
388 """
389 if self._index is None:
390 raise errors.IndexerError("IndexerConnection has been closed")
391 key = self._make_synonym_key(original, field)
392 self._index.clear_synonyms(key)
393
395 """Raise an error if facet is not a declared facet field.
396
397 """
398 for action in self._field_actions[facet]._actions:
399 if action == FieldActions.FACET:
400 return
401 raise errors.IndexerError("Field %r is not indexed as a facet" % facet)
402
404 """Add a subfacet-facet relationship to the facet hierarchy.
405
406 Any existing relationship for that subfacet is replaced.
407
408 Raises a KeyError if either facet or subfacet is not a field,
409 and an IndexerError if either facet or subfacet is not a facet field.
410 """
411 if self._index is None:
412 raise errors.IndexerError("IndexerConnection has been closed")
413 self._assert_facet(facet)
414 self._assert_facet(subfacet)
415 self._facet_hierarchy[subfacet] = facet
416 self._config_modified = True
417
419 """Remove any existing facet hierarchy relationship for a subfacet.
420
421 """
422 if self._index is None:
423 raise errors.IndexerError("IndexerConnection has been closed")
424 if subfacet in self._facet_hierarchy:
425 del self._facet_hierarchy[subfacet]
426 self._config_modified = True
427
429 """Get a list of subfacets of a facet.
430
431 """
432 if self._index is None:
433 raise errors.IndexerError("IndexerConnection has been closed")
434 return [k for k, v in self._facet_hierarchy.iteritems() if v == facet]
435
436 FacetQueryType_Preferred = 1;
437 FacetQueryType_Never = 2;
439 """Set the association between a query type and a facet.
440
441 The value of `association` must be one of
442 IndexerConnection.FacetQueryType_Preferred,
443 IndexerConnection.FacetQueryType_Never or None. A value of None removes
444 any previously set association.
445
446 """
447 if self._index is None:
448 raise errors.IndexerError("IndexerConnection has been closed")
449 if query_type is None:
450 raise errors.IndexerError("Cannot set query type information for None")
451 self._assert_facet(facet)
452 if query_type not in self._facet_query_table:
453 self._facet_query_table[query_type] = {}
454 if association is None:
455 if facet in self._facet_query_table[query_type]:
456 del self._facet_query_table[query_type][facet]
457 else:
458 self._facet_query_table[query_type][facet] = association;
459 if self._facet_query_table[query_type] == {}:
460 del self._facet_query_table[query_type]
461 self._config_modified = True
462
464 """Get the set of facets associated with a query type.
465
466 Only those facets associated with the query type in the specified
467 manner are returned; `association` must be one of
468 IndexerConnection.FacetQueryType_Preferred or
469 IndexerConnection.FacetQueryType_Never.
470
471 If the query type has no facets associated with it, None is returned.
472
473 """
474 if self._index is None:
475 raise errors.IndexerError("IndexerConnection has been closed")
476 if query_type not in self._facet_query_table:
477 return None
478 facet_dict = self._facet_query_table[query_type]
479 return set([facet for facet, assoc in facet_dict.iteritems() if assoc == association])
480
505
519
521 """Delete a document from the search engine index.
522
523 If the id does not already exist in the database, this method
524 will have no effect (and will not report an error).
525
526 """
527 if self._index is None:
528 raise errors.IndexerError("IndexerConnection has been closed")
529 self._index.delete_document('Q' + id)
530
532 """Apply recent changes to the database.
533
534 If an exception occurs, any changes since the last call to flush() may
535 be lost.
536
537 """
538 if self._index is None:
539 raise errors.IndexerError("IndexerConnection has been closed")
540 if self._config_modified:
541 self._store_config()
542 self._index.flush()
543 self._mem_buffered = 0
544
546 """Close the connection to the database.
547
548 It is important to call this method before allowing the class to be
549 garbage collected, because it will ensure that any un-flushed changes
550 will be flushed. It also ensures that the connection is cleaned up
551 promptly.
552
553 No other methods may be called on the connection after this has been
554 called. (It is permissible to call close() multiple times, but
555 only the first call will have any effect.)
556
557 If an exception occurs, the database will be closed, but changes since
558 the last call to flush may be lost.
559
560 """
561 if self._index is None:
562 return
563 try:
564 self.flush()
565 finally:
566
567
568
569
570
571
572
573 self._index = None
574 self._indexpath = None
575 self._field_actions = None
576 self._config_modified = False
577
579 """Count the number of documents in the database.
580
581 This count will include documents which have been added or removed but
582 not yet flushed().
583
584 """
585 if self._index is None:
586 raise errors.IndexerError("IndexerConnection has been closed")
587 return self._index.get_doccount()
588
590 """Get an iterator which returns all the ids in the database.
591
592 The unqiue_ids are currently returned in binary lexicographical sort
593 order, but this should not be relied on.
594
595 """
596 if self._index is None:
597 raise errors.IndexerError("IndexerConnection has been closed")
598 return PrefixedTermIter('Q', self._index.allterms())
599
601 """Get the document with the specified unique ID.
602
603 Raises a KeyError if there is no such document. Otherwise, it returns
604 a ProcessedDocument.
605
606 """
607 if self._index is None:
608 raise errors.IndexerError("IndexerConnection has been closed")
609 postlist = self._index.postlist('Q' + id)
610 try:
611 plitem = postlist.next()
612 except StopIteration:
613
614 raise KeyError('Unique ID %r not found' % id)
615 try:
616 postlist.next()
617 raise errors.IndexerError("Multiple documents "
618 "found with same unique ID")
619 except StopIteration:
620
621 pass
622
623 result = ProcessedDocument(self._field_mappings)
624 result.id = id
625 result._doc = self._index.get_document(plitem.docid)
626 return result
627
629 """Get an iterator over the synonyms.
630
631 - `prefix`: if specified, only synonym keys with this prefix will be
632 returned.
633
634 The iterator returns 2-tuples, in which the first item is the key (ie,
635 a 2-tuple holding the term or terms which will be synonym expanded,
636 followed by the fieldname specified (or None if no fieldname)), and the
637 second item is a tuple of strings holding the synonyms for the first
638 item.
639
640 These return values are suitable for the dict() builtin, so you can
641 write things like:
642
643 >>> conn = IndexerConnection('foo')
644 >>> conn.add_synonym('foo', 'bar')
645 >>> conn.add_synonym('foo bar', 'baz')
646 >>> conn.add_synonym('foo bar', 'foo baz')
647 >>> dict(conn.iter_synonyms())
648 {('foo', None): ('bar',), ('foo bar', None): ('baz', 'foo baz')}
649
650 """
651 if self._index is None:
652 raise errors.IndexerError("IndexerConnection has been closed")
653 return SynonymIter(self._index, self._field_mappings, prefix)
654
656 """Get an iterator over the facet hierarchy.
657
658 The iterator returns 2-tuples, in which the first item is the
659 subfacet and the second item is its parent facet.
660
661 The return values are suitable for the dict() builtin, for example:
662
663 >>> conn = IndexerConnection('db')
664 >>> conn.add_field_action('foo', FieldActions.FACET)
665 >>> conn.add_field_action('bar', FieldActions.FACET)
666 >>> conn.add_field_action('baz', FieldActions.FACET)
667 >>> conn.add_subfacet('foo', 'bar')
668 >>> conn.add_subfacet('baz', 'bar')
669 >>> dict(conn.iter_subfacets())
670 {'foo': 'bar', 'baz': 'bar'}
671
672 """
673 if self._index is None:
674 raise errors.IndexerError("IndexerConnection has been closed")
675 if 'facets' in _checkxapian.missing_features:
676 raise errors.IndexerError("Facets unsupported with this release of xapian")
677 return self._facet_hierarchy.iteritems()
678
680 """Get an iterator over query types and their associated facets.
681
682 Only facets associated with the query types in the specified manner
683 are returned; `association` must be one of IndexerConnection.FacetQueryType_Preferred
684 or IndexerConnection.FacetQueryType_Never.
685
686 The iterator returns 2-tuples, in which the first item is the query
687 type and the second item is the associated set of facets.
688
689 The return values are suitable for the dict() builtin, for example:
690
691 >>> conn = IndexerConnection('db')
692 >>> conn.add_field_action('foo', FieldActions.FACET)
693 >>> conn.add_field_action('bar', FieldActions.FACET)
694 >>> conn.add_field_action('baz', FieldActions.FACET)
695 >>> conn.set_facet_for_query_type('type1', 'foo', conn.FacetQueryType_Preferred)
696 >>> conn.set_facet_for_query_type('type1', 'bar', conn.FacetQueryType_Never)
697 >>> conn.set_facet_for_query_type('type1', 'baz', conn.FacetQueryType_Never)
698 >>> conn.set_facet_for_query_type('type2', 'bar', conn.FacetQueryType_Preferred)
699 >>> dict(conn.iter_facet_query_types(conn.FacetQueryType_Preferred))
700 {'type1': set(['foo']), 'type2': set(['bar'])}
701 >>> dict(conn.iter_facet_query_types(conn.FacetQueryType_Never))
702 {'type1': set(['bar', 'baz'])}
703
704 """
705 if self._index is None:
706 raise errors.IndexerError("IndexerConnection has been closed")
707 if 'facets' in _checkxapian.missing_features:
708 raise errors.IndexerError("Facets unsupported with this release of xapian")
709 return FacetQueryTypeIter(self._facet_query_table, association)
710
712 """Iterate through all the terms with a given prefix.
713
714 """
716 """Initialise the prefixed term iterator.
717
718 - `prefix` is the prefix to return terms for.
719 - `termiter` is a xapian TermIterator, which should be at its start.
720
721 """
722
723
724
725
726
727
728
729
730 assert(len(prefix) == 1)
731
732 self._started = False
733 self._prefix = prefix
734 self._prefixlen = len(prefix)
735 self._termiter = termiter
736
739
741 """Get the next term with the specified prefix.
742
743 """
744 if not self._started:
745 term = self._termiter.skip_to(self._prefix).term
746 self._started = True
747 else:
748 term = self._termiter.next().term
749 if len(term) < self._prefixlen or term[:self._prefixlen] != self._prefix:
750 raise StopIteration
751 return term[self._prefixlen:]
752
753
755 """Iterate through a list of synonyms.
756
757 """
def __init__(self, index, field_mappings, prefix):
    """Initialise the synonym iterator.

    - `index`: the xapian database to read the synonym keys from.
    - `field_mappings`: the FieldMappings object used to map term
      prefixes back to field names when iterating.
    - `prefix`: only synonym keys with this prefix will be returned.

    """
    self._index = index
    self._field_mappings = field_mappings
    # Underlying xapian iterator over the stored synonym keys.
    self._syniter = self._index.synonym_keys(prefix)
769
772
774 """Get the next synonym.
775
776 """
777 synkey = self._syniter.next()
778 pos = 0
779 for char in synkey:
780 if char.isupper(): pos += 1
781 else: break
782 if pos == 0:
783 fieldname = None
784 terms = synkey
785 else:
786 prefix = synkey[:pos]
787 fieldname = self._field_mappings.get_fieldname_from_prefix(prefix)
788 terms = ' '.join((term[pos:] for term in synkey.split(' ')))
789 synval = tuple(self._index.synonyms(synkey))
790 return ((terms, fieldname), synval)
791
793 """Iterate through all the query types and their associated facets.
794
795 """
def __init__(self, facet_query_table, association):
    """Initialise the query type facet iterator.

    - `facet_query_table`: mapping from query type to a dict of
      facet -> association values.
    - `association`: only facets associated with each query type in
      this manner are returned (must be one of
      IndexerConnection.FacetQueryType_Preferred or
      IndexerConnection.FacetQueryType_Never).

    """
    # Lazy iterator over (query_type, facet_dict) pairs; consumed by
    # next().  (Python 2 dict API, matching the rest of this module.)
    self._table_iter = facet_query_table.iteritems()
    self._association = association
807
810
812 """Get the next (query type, facet set) 2-tuple.
813
814 """
815 query_type, facet_dict = self._table_iter.next()
816 facet_list = [facet for facet, association in facet_dict.iteritems() if association == self._association]
817 if len(facet_list) == 0:
818 return self.next()
819 return (query_type, set(facet_list))
820
if __name__ == '__main__':
    # Run the doctests embedded in this module's docstrings.
    import doctest, sys
    doctest.testmod(sys.modules[__name__])
824