001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.transport.vm;
019    
020    import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
021    import EDU.oswego.cs.dl.util.concurrent.Channel;
022    import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    import org.activemq.broker.BrokerConnector;
026    import org.activemq.message.Packet;
027    import org.activemq.message.PacketListener;
028    import org.activemq.transport.TransportChannelListener;
029    import org.activemq.transport.TransportChannelSupport;
030    
031    import javax.jms.JMSException;
032    
033    /**
034     * A VM implementation of a TransportChannel
035     *
036     * @version $Revision: 1.1.1.1 $
037     */
038    public class VmTransportChannel extends TransportChannelSupport implements Runnable {
039    
040        private static final Log log = LogFactory.getLog(VmTransportChannel.class);
041        private static final Object TERMINATE = new Object();
042        private static int lastThreadId = 0; // To number the threads.
043    
044        // properties
045        private Channel sendChannel;
046        private Channel receiveChannel;
047        private int sendCapacity = 10;
048        private int receiveCapacity = 10;
049        private boolean asyncSend = false;
050    
051        // state
052        private SynchronizedBoolean closed;
053        private SynchronizedBoolean started;
054        private Thread thread; //need to change this - and use a thread pool
055        private PacketListener sendListener;
056        private VmTransportChannel clientSide;
057    
058        public VmTransportChannel() {
059            closed = new SynchronizedBoolean(false);
060            started = new SynchronizedBoolean(false);
061        }
062    
063        public VmTransportChannel(Channel sendChannel, Channel receiveChannel) {
064            this();
065            this.sendChannel = sendChannel;
066            this.receiveChannel = receiveChannel;
067        }
068    
069        public VmTransportChannel(int capacity) {
070            this(new BoundedLinkedQueue(capacity), new BoundedLinkedQueue(capacity));
071        }
072    
073        public void start() throws JMSException {
074            if (started.commit(false, true)) {
075                if (isAsyncSend()) {
076                    // lets force the lazy construction
077                    // as we sometimes need to create these early when
078                    // wiring together with a server side channel
079                    getSendChannel();
080                    getReceiveChannel();
081    
082                    thread = new Thread(this, "VM Transport: " + getNextThreadId());
083                    if (isServerSide()) {
084                        thread.setDaemon(true);
085                    }
086                    thread.start();
087                }
088            }
089        }
090    
091        public void stop() {
092            if (closed.commit(false, true)) {
093                super.stop();
094                try {
095                    // to close the channel, lets send a null
096                    if (sendChannel != null) {
097                        sendChannel.put(TERMINATE);
098                    }
099                    if (receiveChannel != null) {
100                        receiveChannel.put(TERMINATE);
101                    }
102    
103                    if (thread != null) {
104                        // lets wait for the receive thread to terminate
105                        thread.join();
106                    }
107                }
108                catch (Exception e) {
109                    log.trace(toString() + " now closed with exception: " + e);
110                }
111            }
112        }
113        
114            public void forceDisconnect() {
115                    throw new IllegalStateException("Disconnection not applicable for VM transport");
116            }
117        
118        /**
119         * Asynchronously send a Packet
120         *
121         * @param packet
122         * @throws JMSException
123         */
124        public void asyncSend(Packet packet) throws JMSException {
125            if (sendChannel != null) {
126                while (true) {
127                    try {
128                        sendChannel.put(packet);
129                        break;
130                    }
131                    catch (InterruptedException e) {
132                        // continue
133                    }
134                }
135            }
136            else {
137                if (sendListener == null) {
138                    if (clientSide != null) {
139                        sendListener = clientSide.createPacketListenerSender();
140                    }
141                }
142                if (sendListener != null) {
143                    sendListener.consume(packet);
144                }
145                else {
146                    throw new JMSException("No sendListener available");
147                }
148            }
149        }
150    
151    
152        public boolean isMulticast() {
153            return false;
154        }
155    
156        /**
157         * reads packets from a Socket
158         */
159        public void run() {
160            while (!closed.get()) {
161                try {
162                    Object answer = receiveChannel.take();
163                    if (answer == TERMINATE) {
164                        log.trace("The socket peer is now closed");
165                        stop();
166                        return;
167                    }
168                    else if (answer != null) {
169                        Packet packet = (Packet) answer;
170                        // we might have just got a packet in but we've already shut down
171                        if (closed.get()) {
172                            break;
173                        }
174                        doConsumePacket(packet);
175                    }
176                }
177                catch (InterruptedException e) {
178                    // continue
179                }
180            }
181        }
182    
183        /**
184         * pretty print for object
185         *
186         * @return String representation of this object
187         */
188        public String toString() {
189            return "VmTransportChannel: " + sendChannel;
190        }
191    
192        /**
193         * Connects the client side transport channel with the broker
194         */
195        public void connect(BrokerConnector brokerConnector) throws JMSException {
196            TransportChannelListener listener = (TransportChannelListener) brokerConnector;
197            VmTransportChannel serverSide = createServerSide();
198            listener.addClient(serverSide);
199            serverSide.start();
200        }
201    
202        /**
203         * Creates the server side version of this client side channel. On the server side
204         * the client's side sendChannel is the receiveChannel and vice versa
205         *
206         * @return
207         */
208        public VmTransportChannel createServerSide() throws JMSException {
209            VmTransportChannel channel = new VmTransportChannel(getReceiveChannel(), getSendChannel());
210            channel.clientSide = this;
211            return channel;
212        }
213    
214        public void setPacketListener(PacketListener listener) {
215            super.setPacketListener(listener);
216            if (clientSide != null) {
217                clientSide.sendListener = listener;
218    
219            }
220        }
221        
222        /**
223         * Can this wireformat process packets of this version
224         * @param version the version number to test
225         * @return true if can accept the version
226         */
227        public boolean canProcessWireFormatVersion(int version){
228            return true;
229        }
230        
231        /**
232         * Does the transport support wire format version info
233         * @return
234         */
235        public boolean doesSupportWireFormatVersioning(){
236            return false;
237        }
238        
239        /**
240         * @return the current version of this wire format
241         */
242        public int getCurrentWireFormatVersion(){
243            return -1;
244        }
245        
246        /**
247         * some transports/wire formats will implement their own fragementation
248         * @return true unless a transport/wire format supports it's own fragmentation
249         */
250        public boolean doesSupportMessageFragmentation(){
251            return false;
252        }
253        
254        
255        /**
256         * Some transports/wireformats will not be able to understand compressed messages
257         * @return true unless a transport/wire format cannot understand compression
258         */
259        public boolean doesSupportMessageCompression(){
260            return false;
261        }
262    
263        // Properties
264        //-------------------------------------------------------------------------
265        public int getReceiveCapacity() {
266            return receiveCapacity;
267        }
268    
269        public void setReceiveCapacity(int receiveCapacity) {
270            this.receiveCapacity = receiveCapacity;
271        }
272    
273        public int getSendCapacity() {
274            return sendCapacity;
275        }
276    
277        public void setSendCapacity(int sendCapacity) {
278            this.sendCapacity = sendCapacity;
279        }
280    
281        public boolean isAsyncSend() {
282            return asyncSend;
283        }
284    
285        public void setAsyncSend(boolean asyncSend) {
286            this.asyncSend = asyncSend;
287        }
288    
289        public Channel getSendChannel() {
290            if (isAsyncSend()) {
291                if (sendChannel == null) {
292                    sendChannel = createChannel(getSendCapacity());
293                }
294            }
295            return sendChannel;
296        }
297    
298        public void setSendChannel(Channel sendChannel) {
299            this.sendChannel = sendChannel;
300        }
301    
302        public Channel getReceiveChannel() {
303            if (isAsyncSend()) {
304                if (receiveChannel == null) {
305                    receiveChannel = createChannel(getReceiveCapacity());
306                }
307            }
308            return receiveChannel;
309        }
310    
311        public void setReceiveChannel(Channel receiveChannel) {
312            this.receiveChannel = receiveChannel;
313        }
314    
315        // Implementation methods
316        //-------------------------------------------------------------------------
317        protected static synchronized int getNextThreadId() {
318            return lastThreadId++;
319        }
320    
321        protected Channel createChannel(int capacity) {
322            return new BoundedLinkedQueue(capacity);
323        }
324    
325        /**
326         * Creates a sender PacketListener which handles any receipts then delegates
327         * to the ultimate PacketListener (typically the JMS client)
328         *
329         * @return
330         */
331        protected PacketListener createPacketListenerSender() {
332            return new PacketListener() {
333                public void consume(Packet packet) {
334                    doConsumePacket(packet, getPacketListener());
335                }
336            };
337        }
338    
339        protected void doClose(Exception ex) {
340            if (!closed.get()) {
341                JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
342                jmsEx.setLinkedException(ex);
343                onAsyncException(jmsEx);
344                stop();
345            }
346        }
347    
348        public PacketListener getSendListener() {
349            return sendListener;
350        }
351    
352        public void setSendListener(PacketListener sendListener) {
353            this.sendListener = sendListener;
354        }
355    }