001    /** 
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.service.impl;
019    
020    import org.activemq.service.QueueList;
021    import org.activemq.service.QueueListEntry;
022    import org.activemq.util.FastInputStream;
023    import org.activemq.util.FastOutputStream;
024    import org.activemq.util.JMSExceptionHelper;
025    
026    import javax.jms.JMSException;
027    import java.io.ByteArrayInputStream;
028    import java.io.ByteArrayOutputStream;
029    import java.io.DataInputStream;
030    import java.io.DataOutputStream;
031    import java.io.IOException;
032    import java.io.Serializable;
033    
034    /**
035     * A base class which is useful for implementation inheritence when implementing
036     * a persistent QueueList
037     *
038     * @version $Revision: 1.1.1.1 $
039     */
040    public abstract class QueueListSupport implements QueueList {
041        protected static final Long HEAD_KEY = new Long(0);
042    
043        public static class Header implements Serializable {
044            private static final long serialVersionUID = 64734383295040L;
045    
046            public Long headKey;
047            public Long tailKey;
048            public long lastKeyValue;
049            public int size;
050    
051            public byte[] asBytes() throws IOException {
052                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
053                DataOutputStream out = new DataOutputStream(buffer);
054                out.writeLong(unwrapLong(headKey));
055                out.writeLong(unwrapLong(tailKey));
056                out.writeLong(lastKeyValue);
057                out.writeInt(size);
058                return buffer.toByteArray();
059            }
060    
061            public void fromBytes(byte[] data) throws IOException {
062                DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
063                this.headKey = wrapLong(in.readLong());
064                this.tailKey = wrapLong(in.readLong());
065                this.lastKeyValue = in.readLong();
066                this.size = in.readInt();
067            }
068        }
069    
070        public static class Node implements Serializable, QueueListEntry {
071            private static final long serialVersionUID = 4609474001468609536L;
072    
073            public Long previousKey;
074            public Long nextKey;
075            public Object value;
076    
077            // not stored, but cached when read from the table
078            public transient Long key;
079    
080            public Object getElement() {
081                return value;
082            }
083    
084            public byte[] asBytes() throws IOException {
085                FastOutputStream buffer = new FastOutputStream();
086                DataOutputStream out = new DataOutputStream(buffer);
087                out.writeLong(unwrapLong(previousKey));
088                out.writeLong(unwrapLong(nextKey));
089                out.writeUTF((String) value);
090                return buffer.toByteArray();
091            }
092    
093            public void fromBytes(byte[] data) throws IOException {
094                DataInputStream in = new DataInputStream(new FastInputStream(data));
095                this.previousKey = wrapLong(in.readLong());
096                this.nextKey = wrapLong(in.readLong());
097                this.value = in.readUTF();
098            }
099        }
100    
101        public Object getFirst() throws JMSException {
102            try {
103                Long key = getHeader().headKey;
104                if (key != null) {
105                    Node node = getNode(key);
106                    if (node != null) {
107                        return node.getElement();
108                    }
109                }
110                return null;
111            }
112            catch (IOException e) {
113                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
114            }
115        }
116    
117        public Object getLast() throws JMSException {
118            try {
119                Long key = getHeader().tailKey;
120                if (key != null) {
121                    Node node = getNode(key);
122                    if (node != null) {
123                        return node.getElement();
124                    }
125                }
126                return null;
127            }
128            catch (IOException e) {
129                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
130            }
131        }
132    
133        public Object removeFirst() throws JMSException {
134            try {
135                Header header = getHeader();
136                Long key = header.headKey;
137                if (key != null) {
138                    Node node = getNode(key);
139                    if (node != null) {
140                        doRemoveNode(node);
141                        header.headKey = node.nextKey;
142                        --header.size;
143                        updateHeader(header);
144                        return node.getElement();
145                    }
146                }
147                return null;
148            }
149            catch (IOException e) {
150                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
151            }
152        }
153    
154        public Object removeLast() throws JMSException {
155            try {
156                Header header = getHeader();
157                Long key = header.tailKey;
158                if (key != null) {
159                    Node node = getNode(key);
160                    if (node != null) {
161                        doRemoveNode(node);
162                        header.tailKey = node.previousKey;
163                        --header.size;
164                        updateHeader(header);
165                        return node.getElement();
166                    }
167                }
168                return null;
169            }
170            catch (IOException e) {
171                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
172            }
173        }
174    
175        public QueueListEntry addFirst(Object value) throws JMSException {
176            try {
177                Header header = getHeader();
178                Node node = createNode();
179                node.value = value;
180                Long nextKey = header.headKey;
181                node.nextKey = nextKey;
182                Long key = createKey(header);
183                node.key = key;
184                updateNode(node);
185                updateNextNode(nextKey, key);
186                header.headKey = key;
187                if (header.tailKey == null) {
188                    header.tailKey = key;
189                }
190                header.size++;
191                updateHeader(header);
192                return node;
193            }
194            catch (IOException e) {
195                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
196            }
197        }
198    
199        public QueueListEntry addLast(Object value) throws JMSException {
200            try {
201                Header header = getHeader();
202                return doAddLast(value, header);
203            }
204            catch (IOException e) {
205                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
206            }
207        }
208    
209        public boolean contains(Object value) throws JMSException {
210            return indexOf(value) != -1;
211        }
212    
213        public int size() throws JMSException {
214            try {
215                return getHeader().size;
216            }
217            catch (IOException e) {
218                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
219            }
220        }
221    
222        public boolean isEmpty() throws JMSException {
223            int size = size();
224            return size == 0;
225        }
226    
227        public QueueListEntry add(Object value) throws JMSException {
228            return addLast(value);
229        }
230    
231        public Object get(int index) throws JMSException {
232            try {
233                Node node = getNode(index);
234                if (node != null) {
235                    return node.value;
236                }
237                return null;
238            }
239            catch (IOException e) {
240                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
241            }
242        }
243    
244        public Object set(int index, Object element) throws JMSException {
245            try {
246                Node node = getNode(index);
247                if (node != null) {
248                    Object previous = node.value;
249                    node.value = element;
250                    updateNode(node);
251                    return previous;
252                }
253                return null;
254            }
255            catch (IOException e) {
256                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
257            }
258        }
259    
260        public void add(int index, Object element) throws JMSException {
261            if (index == 0) {
262                addFirst(element);
263            }
264            else {
265                try {
266                    Header header = getHeader();
267                    Node nextNode = getNode(header, index);
268                    doAddBefore(header, nextNode, element);
269                }
270                catch (IOException e) {
271                    throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
272                }
273            }
274        }
275    
276        public Object remove(int index) throws JMSException {
277            try {
278                Node node = getNode(index);
279                if (node != null) {
280                    removeNode(node);
281                }
282                return null;
283            }
284            catch (IOException e) {
285                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
286            }
287        }
288    
289        public int indexOf(Object value) throws JMSException {
290            try {
291                Header header = getHeader();
292                Long key = header.headKey;
293                for (int i = 0; key != null; i++) {
294                    Node node = getNode(key);
295                    if (node == null) {
296                        break;
297                    }
298                    if (value == node.value || (value != null && value.equals(node.value))) {
299                        return i;
300                    }
301                    key = node.nextKey;
302                }
303                return -1;
304            }
305            catch (IOException e) {
306                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
307            }
308        }
309    
310        public int lastIndexOf(Object value) throws JMSException {
311            try {
312                Header header = getHeader();
313                Long key = header.tailKey;
314                for (int i = header.size - 1; key != null; i--) {
315                    Node node = getNode(key);
316                    if (node == null) {
317                        break;
318                    }
319                    if (value == node.value || (value != null && value.equals(node.value))) {
320                        return i;
321                    }
322                    key = node.previousKey;
323                }
324                return -1;
325            }
326            catch (IOException e) {
327                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
328            }
329        }
330    
331        public QueueListEntry getFirstEntry() throws JMSException {
332            try {
333                return getNode(getHeader().headKey);
334            }
335            catch (IOException e) {
336                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
337            }
338        }
339    
340        public QueueListEntry getLastEntry() throws JMSException {
341            try {
342                return getNode(getHeader().tailKey);
343            }
344            catch (IOException e) {
345                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
346            }
347        }
348    
349        public QueueListEntry getNextEntry(QueueListEntry entry) throws JMSException {
350            Node node = (Node) entry;
351            try {
352                return getNode(node.nextKey);
353            }
354            catch (IOException e) {
355                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
356            }
357        }
358    
359        public QueueListEntry getPrevEntry(QueueListEntry entry) throws JMSException {
360            Node node = (Node) entry;
361            try {
362                return getNode(node.previousKey);
363            }
364            catch (IOException e) {
365                throw JMSExceptionHelper.newJMSException("Failed to read from table: " + e, e);
366            }
367        }
368    
369        public QueueListEntry addBefore(Object value, QueueListEntry entry) throws JMSException {
370            try {
371                return doAddBefore(getHeader(), (Node) entry, value);
372            }
373            catch (IOException e) {
374                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
375            }
376        }
377    
378        public void remove(QueueListEntry entry) throws JMSException {
379            try {
380                removeNode((Node) entry);
381            }
382            catch (IOException e) {
383                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
384            }
385        }
386    
387        public Object[] toArray() throws JMSException {
388            try {
389                Header header = getHeader();
390                int size = header.size;
391                if (size == 0) {
392                    return EMPTY_ARRAY;
393                }
394                Long key = header.headKey;
395                Object[] answer = new Object[size];
396                for (int i = 0; i < size && key != null; i++) {
397                    Node node = getNode(key);
398                    if (node != null) {
399                        answer[i] = node.value;
400                        key = node.nextKey;
401                    }
402                }
403                return answer;
404            }
405            catch (IOException e) {
406                throw JMSExceptionHelper.newJMSException("Failed to write to table: " + e, e);
407            }
408        }
409    
410        public void rotate() throws JMSException {
411            // TODO could tune this by just swizzling pointers
412            Object obj = removeFirst();
413            if (obj != null) {
414                addLast(obj);
415            }
416        }
417    
418        protected Long createKey(Header header) throws IOException, JMSException {
419            long value = header.lastKeyValue;
420            while (true) {
421                if (value == Long.MAX_VALUE) {
422                    value = 1;
423                }
424                else {
425                    value++;
426                }
427                Long key = new Long(value);
428                if (getNode(key) == null) {
429                    header.lastKeyValue = value;
430                    return key;
431                }
432            }
433        }
434    
435        protected boolean removeNode(Node node) throws IOException, JMSException {
436            Header header = null;
437            boolean updatedPrevious = false;
438            if (node.previousKey != null) {
439                // lets point the previous node to our next node
440                Node previousNode = getNode(node.previousKey);
441                if (previousNode != null) {
442                    previousNode.nextKey = node.nextKey;
443                    updateNode(previousNode);
444                    updatedPrevious = true;
445                }
446            }
447            if (!updatedPrevious) {
448                // lets update the head record
449                header = getHeader();
450                header.headKey = node.nextKey;
451            }
452    
453            boolean updatedNext = false;
454            if (node.nextKey != null) {
455                // lets point the next node to our previous node
456                Node nextNode = getNode(node.nextKey);
457                if (nextNode != null) {
458                    nextNode.previousKey = node.previousKey;
459                    updateNode(nextNode);
460                    updatedNext = true;
461                }
462            }
463            if (!updatedNext) {
464                // lets update the tail record
465                header = getHeader();
466                header.tailKey = node.previousKey;
467            }
468            return true;
469        }
470    
471        /**
472         * Looks up the header object, lazily creating one if the current table is empty
473         *
474         * @return
475         * @throws java.io.IOException
476         */
477        protected abstract Header getHeader() throws IOException, JMSException;
478    
479        /**
480         * Writes the header back to disk after its been changed
481         *
482         * @param header
483         * @throws java.io.IOException
484         */
485        protected abstract void updateHeader(Header header) throws IOException, JMSException;
486    
487        /**
488         * Updates the node
489         *
490         * @param node
491         * @throws java.io.IOException
492         */
493        protected abstract void updateNode(Node node) throws IOException, JMSException;
494    
495        protected abstract Node getNode(Long key) throws IOException, JMSException;
496    
497        protected Node getNode(int index) throws IOException, JMSException {
498            Header header = getHeader();
499            return getNode(header, index);
500        }
501    
502        protected Node getNode(Header header, int index) throws IOException, JMSException {
503            Node node = null;
504            int middle = header.size / 2;
505            if (index > middle) {
506                // lets navigate backwards
507                Long key = header.tailKey;
508                for (int i = header.size; i > index && key != null; i--) {
509                    node = getNode(key);
510                    if (node != null) {
511                        key = node.previousKey;
512                    }
513                }
514            }
515            else {
516                Long key = header.headKey;
517                for (int i = 0; i <= index && key != null; i++) {
518                    node = getNode(key);
519                    if (node != null) {
520                        key = node.nextKey;
521                    }
522                }
523            }
524            return node;
525        }
526    
527        protected Node doAddLast(Object value, Header header) throws IOException, JMSException {
528            Node node = createNode();
529            Long key = createKey(header);
530            node.key = key;
531            node.value = value;
532            Long previousKey = header.tailKey;
533            node.previousKey = previousKey;
534            updateNode(node);
535            updatePreviousNode(previousKey, key);
536            header.tailKey = key;
537            if (header.headKey == null) {
538                header.headKey = key;
539            }
540            header.size++;
541            updateHeader(header);
542            return node;
543        }
544    
545        protected void updateNextNode(Long nextKey, Long key) throws IOException, JMSException {
546            if (nextKey != null) {
547                Node nextNode = getNode(nextKey);
548                if (nextNode == null) {
549                    throw new IOException("Missing node for key: " + nextKey);
550                }
551                nextNode.previousKey = key;
552                updateNode(nextNode);
553            }
554        }
555    
556        protected void updatePreviousNode(Long previousKey, Long key) throws IOException, JMSException {
557            if (previousKey != null) {
558                Node previousNode = getNode(previousKey);
559                if (previousNode == null) {
560                    throw new IOException("Missing previous node for key: " + previousKey);
561                }
562                previousNode.nextKey = key;
563                updateNode(previousNode);
564            }
565        }
566    
567        protected Node doAddBefore(Header header, Node nextNode, Object element) throws JMSException, IOException {
568            if (nextNode == null) {
569                return doAddLast(element, header);
570            }
571            else {
572                // lets add before this nextNode
573                Long key = createKey(header);
574                Node node = createNode();
575                node.value = element;
576                node.key = key;
577                Long previousKey = nextNode.previousKey;
578                node.previousKey = previousKey;
579                node.nextKey = nextNode.key;
580                nextNode.previousKey = key;
581                header.size++;
582    
583                updateNode(node);
584                updateNode(nextNode);
585                updatePreviousNode(previousKey, key);
586                updateHeader(header);
587                return node;
588            }
589        }
590    
591        protected abstract void doRemoveNode(Node node) throws IOException, JMSException;
592    
593        protected static Long wrapLong(long value) {
594            // lets use 0 for null
595            if (value == 0) {
596                return null;
597            }
598            // TODO use a cache?
599            return new Long(value);
600        }
601    
602        protected static long unwrapLong(Long key) {
603            if (key != null) {
604                return key.longValue();
605            }
606            // lets use 0 for null
607            return 0;
608        }
609    
610        protected Node createNode() {
611            return new Node();
612        }
613    }