001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2008 Sun Microsystems, Inc.
026     */
027    
028    
029    package org.opends.server.backends.jeb.importLDIF;
030    
031    import org.opends.server.types.Entry;
032    import org.opends.server.backends.jeb.Index;
033    import org.opends.server.backends.jeb.EntryID;
034    import com.sleepycat.je.Transaction;
035    import com.sleepycat.je.DatabaseException;
036    import com.sleepycat.je.DatabaseEntry;
037    import com.sleepycat.je.dbi.MemoryBudget;
038    import static org.opends.server.loggers.ErrorLogger.logError;
039    import org.opends.messages.Message;
040    import static org.opends.messages.JebMessages.*;
041    import java.util.*;
042    import java.util.concurrent.locks.ReentrantLock;
043    
044    
045    /**
046     * Manages a shared cache among worker threads that caches substring
047     * key/value pairs to avoid DB cache access. Once the cache is above it's
048     * memory usage limit, it will start slowly flushing keys (similar to the
049     * JEB eviction process) until it is under the limit.
050     */
051    
052    public class BufferManager {
053    
054      //Memory usage counter.
055      private long memoryUsage=0;
056    
057      //Memory limit.
058      private long memoryLimit;
059    
060      //Next element in the cache to start flushing at during next flushAll cycle.
061      private KeyHashElement nextElem;
062    
063      //Extra bytes to flushAll.
064      private final int extraBytes  = 1024 * 1024;
065    
066      //Counters for statistics, total is number of accesses, hit is number of
067      //keys found in cache.
068      private long total=0, hit=0;
069    
070      //Actual map used to buffer keys.
071      private TreeMap<KeyHashElement, KeyHashElement> elementMap =
072                            new TreeMap<KeyHashElement, KeyHashElement>();
073    
074      //The current backup map being used.
075      private int currentMap = 1;
076    
077      //Reference to use when the maps are switched.
078      private TreeMap<KeyHashElement, KeyHashElement> backupMap;
079    
080      //The two backup maps to insert into if the main element map is being used.
081      private TreeMap<KeyHashElement, KeyHashElement> backupMap2 =
082                            new TreeMap<KeyHashElement, KeyHashElement>();
083      private TreeMap<KeyHashElement, KeyHashElement> backupMap1 =
084                            new TreeMap<KeyHashElement, KeyHashElement>();
085    
086      //Overhead values determined from using JHAT. They appear to be the same
087      //for both 32 and 64 bit machines. Close enough.
088      private final static int TREEMAP_ENTRY_OVERHEAD = 29;
089      private final static int KEY_ELEMENT_OVERHEAD = 32;
090    
091      //Lock used to get main element map.
092      private ReentrantLock lock = new ReentrantLock();
093    
094      //Object to synchronize on if backup maps are being written.
095      private Object backupSynchObj = new Object();
096    
097      /**
098       * Create buffer manager instance.
099       *
100       * @param memoryLimit The memory limit.
101       * @param importThreadCount  The count of import worker threads.
102       */
103      public BufferManager(long memoryLimit, int importThreadCount) {
104        this.memoryLimit = memoryLimit;
105        this.nextElem = null;
106        this.backupMap = backupMap1;
107      }
108    
109      /**
110       * Insert an entry ID into the buffer using the both the specified index and
111       * entry to build a key set. Will flush the buffer if over the memory limit.
112       *
113       * @param index  The index to use.
114       * @param entry The entry used to build the key set.
115       * @param entryID The entry ID to insert into the key set.
116       * @param txn A transaction.
117       * @param keySet Keyset hash to store the keys in.
118       * @throws DatabaseException If a problem happened during a flushAll cycle.
119       */
120    
121      void insert(Index index, Entry entry,
122                  EntryID entryID, Transaction txn, Set<byte[]> keySet)
123              throws DatabaseException {
124    
125        keySet.clear();
126        index.indexer.indexEntry(entry, keySet);
127        if(!lock.tryLock()) {
128          insertBackupMap(keySet, index, entryID);
129          return;
130        }
131        insertKeySet(keySet, index, entryID, elementMap, true);
132        if(!backupMap.isEmpty()) {
133           mergeMap();
134        }
135        //If over the memory limit, flush some keys from the cache to make room.
136        if(memoryUsage > memoryLimit) {
137          flushUntilUnderLimit();
138        }
139        lock.unlock();
140      }
141    
142      /**
143       * Insert an entry ID into buffer using specified id2children and id2subtree
144       * indexes.
145       *
146       * @param id2children The id2children index to use.
147       * @param id2subtree The id2subtree index to use.
148       * @param entry The entry used to build the key set.
149       * @param entryID The entry ID to insert into the key set.
150       * @param txn  A transaction.
151       * @param childKeySet id2children key set hash to use.
152       * @param subKeySet subtree key set hash to use.
153       * @throws DatabaseException If a problem occurs during processing.
154       */
155      void insert(Index id2children, Index id2subtree, Entry entry,
156                  EntryID entryID, Transaction txn, Set<byte[]> childKeySet,
157                  Set<byte[]> subKeySet) throws DatabaseException {
158        childKeySet.clear();
159        id2children.indexer.indexEntry(entry, childKeySet);
160        subKeySet.clear();
161        id2subtree.indexer.indexEntry(entry, subKeySet);
162        if(!lock.tryLock()) {
163          insertBackupMap(childKeySet, id2children, subKeySet, id2subtree, entryID);
164          return;
165        }
166        insertKeySet(childKeySet, id2children, entryID, elementMap, true);
167        insertKeySet(subKeySet, id2subtree, entryID, elementMap, true);
168        lock.unlock();
169      }
170    
171      /**
172       * Insert into a backup tree if can't get a lock on the main table.
173       * @param childrenKeySet The id2children keyset to use.
174       * @param id2children The id2children index to use.
175       * @param subtreeKeySet The subtree keyset to use.
176       * @param id2subtree The id2subtree index to use.
177       * @param entryID The entry ID to insert into the key set.
178       */
179      void insertBackupMap(Set<byte[]> childrenKeySet, Index id2children,
180                        Set<byte[]> subtreeKeySet,
181                        Index id2subtree, EntryID entryID) {
182        synchronized(backupSynchObj) {
183          insertKeySet(childrenKeySet, id2children, entryID, backupMap,  false);
184          insertKeySet(subtreeKeySet, id2subtree, entryID, backupMap,  false);
185        }
186      }
187    
188    
189      /**
190       * Insert specified keyset, index and entry ID into the backup map.
191       *
192       * @param keySet The keyset to use.
193       * @param index The index to use.
194       * @param entryID The entry ID to use.
195       */
196      void insertBackupMap(Set<byte[]> keySet, Index index, EntryID entryID) {
197           synchronized(backupSynchObj) {
198             insertKeySet(keySet, index, entryID, backupMap,  false);
199        }
200      }
201    
202    
203      /**
204       * Merge the backup map with the element map after switching the backup
205       * map reference to an empty map.
206       */
207      void mergeMap() {
208        TreeMap<KeyHashElement, KeyHashElement> tmpMap;
209        synchronized(backupSynchObj) {
210          tmpMap = backupMap;
211          if(currentMap == 1) {
212             backupMap = backupMap2;
213             tmpMap = backupMap1;
214             currentMap = 2;
215          } else {
216             backupMap = backupMap1;
217             tmpMap = backupMap2;
218             currentMap = 1;
219          }
220        }
221        TreeSet<KeyHashElement>  tSet =
222                new TreeSet<KeyHashElement>(tmpMap.keySet());
223        for (KeyHashElement elem : tSet) {
224          total++;
225          if(!elementMap.containsKey(elem)) {
226            elementMap.put(elem, elem);
227            memoryUsage += TREEMAP_ENTRY_OVERHEAD + elem.getMemorySize();
228          } else {
229            KeyHashElement curElem = elementMap.get(elem);
230            if(curElem.isDefined() || curElem.getIndex().getMaintainCount()) {
231              int oldSize = curElem.getMemorySize();
232              curElem.merge(elem);
233              memoryUsage += (curElem.getMemorySize() - oldSize);
234              hit++;
235            }
236          }
237        }
238        tmpMap.clear();
239      }
240    
241      /**
242       * Insert a keySet into the element map using the provided index and entry ID.
243       * @param keySet The key set to add to the map.
244       * @param index  The index that eventually will contain the entry IDs.
245       * @param entryID The entry ID to add to the entry ID set.
246       * @param map The map to add the keys to
247       * @param trackStats <CODE>True</CODE> if memory and usage should be tracked.
248       */
249      private void insertKeySet(Set<byte[]> keySet, Index index, EntryID entryID,
250                                TreeMap<KeyHashElement, KeyHashElement> map,
251                                boolean trackStats) {
252        KeyHashElement elem = new KeyHashElement();
253        int entryLimit = index.getIndexEntryLimit();
254        for(byte[] key : keySet) {
255          elem.reset(key, index);
256          if(trackStats) {
257            total++;
258          }
259          if(!map.containsKey(elem)) {
260            KeyHashElement newElem = new KeyHashElement(key, index, entryID);
261            map.put(newElem, newElem);
262            if(trackStats) {
263              memoryUsage += TREEMAP_ENTRY_OVERHEAD + newElem.getMemorySize();
264            }
265          } else {
266            KeyHashElement curElem = map.get(elem);
267            if(curElem.isDefined() || index.getMaintainCount()) {
268              int oldSize = curElem.getMemorySize();
269              curElem.addEntryID(entryID, entryLimit);
270              if(trackStats) {
271                memoryUsage += (curElem.getMemorySize() - oldSize);
272                hit++;
273              }
274            }
275          }
276        }
277      }
278    
279      /**
280       * Flush the buffer to DB until the buffer is under the memory limit.
281       *
282       * @throws DatabaseException If a problem happens during an index insert.
283       */
284      private void flushUntilUnderLimit() throws DatabaseException {
285        Iterator<KeyHashElement> iter;
286        if(nextElem == null) {
287          iter = elementMap.keySet().iterator();
288        } else {
289          iter = elementMap.tailMap(nextElem).keySet().iterator();
290        }
291        DatabaseEntry dbEntry = new DatabaseEntry();
292        DatabaseEntry entry = new DatabaseEntry();
293        while((memoryUsage + extraBytes) > memoryLimit) {
294          if(iter.hasNext()) {
295            KeyHashElement curElem = iter.next();
296            //Never flush undefined elements.
297            if(curElem.isDefined()) {
298              int oldSize = curElem.getMemorySize();
299              Index index = curElem.getIndex();
300              dbEntry.setData(curElem.getKey());
301              index.insert(null, dbEntry, curElem.getIDSet(), entry);
302              if(curElem.isDefined()) {
303                 memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
304                 iter.remove();
305              } else {
306                //Went undefined don't remove the element, just substract the
307                //memory size difference.
308                memoryUsage -= (oldSize - curElem.getMemorySize());
309              }
310            }
311          } else {
312            //Wrapped around, start at the first element.
313            nextElem = elementMap.firstKey();
314            iter = elementMap.keySet().iterator();
315          }
316        }
317        //Start at this element next flushAll cycle.
318        nextElem = iter.next();
319      }
320    
321      /**
322       * Called from main thread to prepare for final buffer flush at end of
323       * ldif load.
324       */
325      void prepareFlush() {
326        Message msg =
327               NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
328        logError(msg);
329      }
330    
331      /**
332       * Writes all of the buffer elements to DB. The specific id is used to
333       * share the buffer among the worker threads so this function can be
334       * multi-threaded.
335       *
336       * @throws DatabaseException If an error occurred during the insert.
337       */
338      void flushAll() throws DatabaseException {
339        mergeMap();
340        TreeSet<KeyHashElement>  tSet =
341                new TreeSet<KeyHashElement>(elementMap.keySet());
342        DatabaseEntry dbEntry = new DatabaseEntry();
343        DatabaseEntry entry = new DatabaseEntry();
344        for (KeyHashElement curElem : tSet) {
345          Index index = curElem.getIndex();
346          dbEntry.setData(curElem.getKey());
347          index.insert(null, dbEntry, curElem.getIDSet(), entry);
348        }
349      }
350    
351      /**
352       *  Class used to represent an element in the buffer.
353       */
354      class KeyHashElement implements Comparable {
355    
356        //Bytes representing the key.
357        private  byte[] key;
358    
359        //Hash code returned from the System.identityHashCode method on the index
360        //object.
361        private int indexHashCode;
362    
363        //Index related to the element.
364        private Index index;
365    
366        //The set of IDs related to the key.
367        private ImportIDSet importIDSet;
368    
369        //Used to speed up lookup.
370        private int keyHashCode;
371    
372        /**
373         * Empty constructor for use when the element is being reused.
374         */
375        public KeyHashElement() {}
376    
377        /**
378         * Reset the element. Used when the element is being reused.
379         *
380         * @param key The new key to reset to.
381         * @param index The new index to reset to.
382         */
383        public void reset(byte[] key, Index index) {
384          this.key = key;
385          this.index = index;
386          this.indexHashCode = System.identityHashCode(index);
387          this.keyHashCode = Arrays.hashCode(key);
388          if(this.importIDSet != null) {
389            this.importIDSet.reset();
390          }
391        }
392    
393        /**
394         * Create instance of an element for the specified key and index, the add
395         * the specified entry ID to the ID set.
396         *
397         * @param key The key.
398         * @param index The index.
399         * @param entryID The entry ID to start off with.
400         */
401        public KeyHashElement(byte[] key, Index index, EntryID entryID) {
402          this.key = key;
403          this.index = index;
404          //Use the integer set for right now. This is good up to 2G number of
405          //entries. There is also a LongImportSet, but it currently isn't used.
406          this.importIDSet = new IntegerImportIDSet(entryID);
407          //Used if there when there are conflicts if two or more indexes have
408          //the same key.
409          this.indexHashCode = System.identityHashCode(index);
410          this.keyHashCode = Arrays.hashCode(key);
411        }
412    
413        /**
414         * Add an entry ID to the set.
415         *
416         * @param entryID  The entry ID to add.
417         * @param entryLimit The entry limit
418         */
419        void addEntryID(EntryID entryID, int entryLimit) {
420          importIDSet.addEntryID(entryID, entryLimit, index.getMaintainCount());
421        }
422    
423        /**
424         * Return the index.
425         *
426         * @return The index.
427         */
428        Index getIndex(){
429          return index;
430        }
431    
432        /**
433         * Return the key.
434         *
435         * @return The key.
436         */
437        byte[] getKey() {
438          return key;
439        }
440    
441        /**
442         * Return value of the key hash code.
443         *
444         * @return The key hash code value.
445         */
446        int getKeyHashCode() {
447          return keyHashCode;
448        }
449    
450        /**
451         * Return the ID set.
452          * @return The import ID set.
453         */
454        ImportIDSet getIDSet() {
455          return importIDSet;
456        }
457    
458        /**
459         * Return if the ID set is defined or not.
460         *
461         * @return <CODE>True</CODE> if the ID set is defined.
462         */
463        boolean isDefined() {
464          return importIDSet.isDefined();
465        }
466    
467        /**
468         * Compare the bytes of two keys.  The is slow, only use if the hashcode
469         * had a collision.
470         *
471         * @param a  Key a.
472         * @param b  Key b.
473         * @return  0 if the keys are equal, -1 if key a is less than key b, 1 if
474         *          key a is greater than key b.
475         */
476        private int compare(byte[] a, byte[] b) {
477          int i;
478          for (i = 0; i < a.length && i < b.length; i++) {
479            if (a[i] > b[i]) {
480              return 1;
481            }
482            else if (a[i] < b[i]) {
483              return -1;
484            }
485          }
486          if (a.length == b.length) {
487            return 0;
488          }
489          if (a.length > b.length){
490            return 1;
491          }
492          else {
493            return -1;
494          }
495        }
496    
497        /**
498         * Compare two element keys. First check the precomputed hashCode. If
499         * the hashCodes are equal, do a second byte per byte comparision in case
500         * there was a  collision.
501         *
502         * @param elem The element to compare.
503         * @return  0 if the keys are equal, -1 if key a is less than key b, 1 if
504         *          key a is greater than key b.
505         */
506        private int compare(KeyHashElement elem) {
507          if(keyHashCode == elem.getKeyHashCode()) {
508            return compare(key, elem.key);
509          } else {
510            if(keyHashCode < elem.getKeyHashCode()) {
511              return -1;
512            } else {
513              return 1;
514            }
515          }
516        }
517    
518        /**
519         * Compare the specified object to the current object. If the keys are
520         * equal, then the indexHashCode value is used as a tie-breaker.
521         *
522         * @param o The object representing a KeyHashElement.
523         * @return 0 if the objects are equal, -1 if the current object is less
524         *         than the specified object, 1 otherwise.
525         */
526        public int compareTo(Object o) {
527          if (o == null) {
528            throw new NullPointerException();
529          }
530          KeyHashElement inElem = (KeyHashElement) o;
531          int keyCompare = compare(inElem);
532          if(keyCompare == 0) {
533            if(indexHashCode == inElem.indexHashCode) {
534              return 0;
535            } else if(indexHashCode < inElem.indexHashCode) {
536              return -1;
537            } else {
538              return 1;
539            }
540          } else {
541            return keyCompare;
542          }
543        }
544    
545        /**
546         * Return the current total memory size of the element.
547         * @return The memory size estimate of a KeyHashElement.
548         */
549        int getMemorySize() {
550          return  KEY_ELEMENT_OVERHEAD +
551                  MemoryBudget.byteArraySize(key.length) +
552                  importIDSet.getMemorySize();
553        }
554    
555        /**
556         * Merge the specified element with this element.
557         * @param e The element to merge.
558         */
559        public void merge(KeyHashElement e) {
560          importIDSet.merge(e.importIDSet, e.getIndex().getIndexEntryLimit(),
561                  e.getIndex().getMaintainCount());
562        }
563      }
564    }