001    /**
002     * 
003     * Copyright 2004 Protique Ltd
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.transport.jabber;
019    
020    import org.activemq.io.AbstractWireFormat;
021    import org.activemq.io.WireFormat;
022    import org.activemq.io.util.ByteArray;
023    import org.activemq.message.ActiveMQBytesMessage;
024    import org.activemq.message.ActiveMQDestination;
025    import org.activemq.message.ActiveMQMessage;
026    import org.activemq.message.ActiveMQObjectMessage;
027    import org.activemq.message.ActiveMQTextMessage;
028    import org.activemq.message.ConnectionInfo;
029    import org.activemq.message.ConsumerInfo;
030    import org.activemq.message.Packet;
031    import org.activemq.util.IdGenerator;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    import javax.jms.Destination;
036    import javax.jms.JMSException;
037    import javax.xml.namespace.QName;
038    import javax.xml.stream.XMLStreamConstants;
039    import javax.xml.stream.XMLStreamException;
040    import javax.xml.stream.XMLStreamReader;
041    import java.io.DataInput;
042    import java.io.DataOutput;
043    import java.io.IOException;
044    import java.io.PrintWriter;
045    import java.io.Serializable;
046    import java.net.InetAddress;
047    import java.util.Iterator;
048    import java.util.List;
049    import java.util.Map;
050    
051    /**
052     * A wire format which uses XMPP format of messages
053     *
054     * @version $Revision: 1.1 $
055     */
056    public class JabberWireFormat extends AbstractWireFormat {
057        private static final Log log = LogFactory.getLog(JabberWireFormat.class);
058    
059        private static final String NAMESPACE = "http://etherx.jabber.org/streams";
060        private static final String QUEUE_PREFIX = "queue:";
061        private static final String TOPIC_PREFIX = "topic:";
062        private static final String TEMP_QUEUE_PREFIX = "tempQueue:";
063        private static final String TEMP_TOPIC_PREFIX = "tempTopic:";
064    
065        private static final QName STREAM_QNAME = new QName(NAMESPACE, "stream", "stream");
066        private static final QName MESSAGE_QNAME = new QName("jabber:client","message","message");
067        private static final QName AUTH_QNAME = new QName("jabber:iq:auth", "query", "query");
068    
069        private IdGenerator idGenerator = new IdGenerator();
070        private String clientID = idGenerator.generateId();
071        private ConnectionInfo connectionInfo;
072        private PrintWriter writer;
073        private String userName;
074        private boolean validStream = false;
075    
076    
077        public WireFormat copy() {
078            return new JabberWireFormat();
079        }
080    
081        public Packet readPacket(int firstByte, DataInput in) throws IOException {
082            return null;  /** TODO */
083        }
084    
085    
086        /**
087         * Reads a packet from the XML stream
088         * @param reader
089         * @param returnPackets
090         * @throws XMLStreamException
091         * @throws JMSException
092         */
093        public void readPacket(XMLStreamReader reader, List returnPackets) throws XMLStreamException, JMSException {
094            String sessionId = getAttributeValue("id", reader);
095            
096            if (reader.next() == XMLStreamConstants.START_ELEMENT) {
097                
098                QName name = reader.getName();
099                
100                if (!validStream) {
101                    if (name.equals(STREAM_QNAME)) {
102                        validStream = true;
103                    }
104                    else {
105                        String errStr = "Bad initial QName for stream. Received: " + name + " while expecting: "
106                                + STREAM_QNAME;
107                        log.warn(errStr);
108                        throw new JMSException(errStr);
109                    }
110                }
111                else {
112                    QName test = new QName("jabber:iq:auth", "query", "query");
113                    
114                    if (name.equals(AUTH_QNAME)) {
115                        if (reader.hasNext() && reader.next()==XMLStreamConstants.START_ELEMENT){
116                            name = reader.getName();
117                            this.userName = reader.getElementText();
118                            
119                            //skip past the end
120                            if (reader.hasNext()) {
121                                reader.next();
122                            }
123                            if (reader.hasNext() && reader.next()==XMLStreamConstants.START_ELEMENT){
124                                if (sessionId != null){
125                                    writer.println(" <iq id='" + sessionId +"' type='result'/>");
126                                    writer.flush();
127                                 }
128                            }else {
129                                //write back a request for the password
130                                writer.println("<iq id='" + sessionId + "'");
131                                writer.println(" type = 'result'>");
132                                writer.println("<query xmlns='jabber:iq:auth'><username>" + this.userName + "</username><password/><digest/><resource/></query></iq>");
133                                writer.flush();
134                                returnPackets.add(createConnectionInfo());
135                                returnPackets.add(createConsumerPacket());
136                            }
137                        }    
138                    }else if (name.equals(MESSAGE_QNAME)){
139                        Packet pack = readMessage(reader);
140                        if (pack != null){
141                            returnPackets.add(pack);
142                        }
143                    }else {
144                        //general catch all - just say ok ..
145                        if (sessionId != null){
146                           writer.println(" <iq id='" + sessionId +"' type='result'/>");
147                           writer.flush();
148                        }
149                    }
150                }
151            }
152        }
153        
154        private String getAttributeValue(String attributeName, XMLStreamReader reader) {
155            String result = null;
156            for (int i = 0;i < reader.getAttributeCount();i++) {
157                if (reader.getAttributeName(i).toString().equals(attributeName)) {
158                    result = reader.getAttributeValue(i);
159                    break;
160                }
161            }
162            return result;
163        }
164        
165       
166    
167    
168        public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
169            switch (packet.getPacketType()) {
170                case Packet.ACTIVEMQ_MESSAGE:
171                    writeMessage((ActiveMQMessage) packet, "", out);
172                    break;
173    
174                case Packet.ACTIVEMQ_TEXT_MESSAGE:
175                    writeTextMessage((ActiveMQTextMessage) packet, out);
176                    break;
177    
178                case Packet.ACTIVEMQ_BYTES_MESSAGE:
179                    writeBytesMessage((ActiveMQBytesMessage) packet, out);
180                    break;
181    
182                case Packet.ACTIVEMQ_OBJECT_MESSAGE:
183                    writeObjectMessage((ActiveMQObjectMessage) packet, out);
184                    break;
185    
186                case Packet.ACTIVEMQ_CONNECTION_INFO:
187                    
188                case Packet.ACTIVEMQ_MAP_MESSAGE:
189                case Packet.ACTIVEMQ_STREAM_MESSAGE:
190    
191    
192                case Packet.ACTIVEMQ_BROKER_INFO:
193                case Packet.ACTIVEMQ_MSG_ACK:
194                case Packet.CONSUMER_INFO:
195                case Packet.DURABLE_UNSUBSCRIBE:
196                case Packet.INT_RESPONSE_RECEIPT_INFO:
197                case Packet.PRODUCER_INFO:
198                case Packet.RECEIPT_INFO:
199                case Packet.RESPONSE_RECEIPT_INFO:
200                case Packet.SESSION_INFO:
201                case Packet.TRANSACTION_INFO:
202                case Packet.XA_TRANSACTION_INFO:
203                default:
204                    log.debug("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
205            }
206            writer.flush();
207            return null;
208        }
209    
210        /**
211         * Can this wireformat process packets of this version
212         *
213         * @param version the version number to test
214         * @return true if can accept the version
215         */
216        public boolean canProcessWireFormatVersion(int version) {
217            return true;
218        }
219    
220        /**
221         * @return the current version of this wire format
222         */
223        public int getCurrentWireFormatVersion() {
224            return 1;
225        }
226    
227        public PrintWriter getWriter() {
228            return writer;
229        }
230    
231        public void setWriter(PrintWriter writer) {
232            this.writer = writer;
233        }
234    
235        // Implementation methods
236        //-------------------------------------------------------------------------
237        protected Packet createConnectionInfo() {
238            connectionInfo = new ConnectionInfo();
239            connectionInfo.setStarted(true);
240            connectionInfo.setClientId(this.clientID);
241            connectionInfo.setClientVersion("" + getCurrentWireFormatVersion());
242            connectionInfo.setUserName(userName);
243            return connectionInfo;
244        }
245        
246        protected Packet createConsumerPacket(){
247            ConsumerInfo info = new ConsumerInfo();
248            info.setClientId(this.clientID);
249            info.setConsumerNo(0);
250            info.setStarted(true);
251            info.setStartTime(System.currentTimeMillis());
252            info.setDestination(createDestination("chat",this.userName));
253            return info;
254        }
255    
256    
257        
258        
259        protected void initialize() throws IOException {
260            //start the stream - 
261            String hostName = InetAddress.getLocalHost().toString();
262            writer.println("<?xml version='1.0'?>");
263            writer.println("<stream:stream");
264            writer.println("  xmlns='jabber:client'");
265            writer.println("  xml:lang='en'");
266            writer.println("  xmlns:stream='http://etherx.jabber.org/streams'");
267            writer.println("  from='" + hostName + "'");
268            writer.println("  id='" + clientID + "'>");
269            writer.flush();
270       }
271    
272        protected Packet readMessage(XMLStreamReader reader) throws XMLStreamException, JMSException {
273            ActiveMQTextMessage message = new ActiveMQTextMessage();
274            message.setJMSMessageID(idGenerator.generateId());
275            QName name = reader.getName();
276            String to = getAttributeValue("to", reader);
277            String type = getAttributeValue("type",reader);
278                   
279            if (type != null){
280                message.setJMSType(type);
281            }
282                   
283            if (to != null && to.length() > 0) {
284                message.setJMSDestination(createDestination(type,to));
285            }
286            
287            if (this.userName != null && this.userName .length() > 0) {
288                message.setJMSReplyTo(createDestination("chat",this.userName));
289            }
290    
291            while (reader.hasNext()) {
292                switch (reader.nextTag()) {
293                    case XMLStreamConstants.START_ELEMENT:
294                        if (!readElement(reader, message)) {
295                            log.debug("Unknown element: " + reader.getName());
296                        }
297                        break;
298                    case XMLStreamConstants.END_ELEMENT:
299                    case XMLStreamConstants.END_DOCUMENT:
300                        return message;
301                }
302            }
303            return message;
304        }
305    
306        protected boolean readElement(XMLStreamReader reader, ActiveMQTextMessage message) throws JMSException, XMLStreamException {
307            QName name = reader.getName();
308            String localPart = name.getLocalPart();
309            if (localPart.equals("body")) {
310                message.setText(reader.getElementText());
311                return true;
312            }
313            else if (localPart.equals("thread")) {
314                message.setJMSCorrelationID(reader.getElementText());
315                return true;
316            }
317            else {
318                return false;
319            }
320        }
321    
322        protected String readXMLAsText(XMLStreamReader reader) throws XMLStreamException {
323            StringBuffer buffer = new StringBuffer();
324            int elementCount = 0;
325            while (reader.hasNext()) {
326                switch (reader.nextTag()) {
327                    case XMLStreamConstants.START_ELEMENT:
328                        if (elementCount++ > 0) {
329                            writeStartElement(reader);
330                        }
331                        break;
332    
333                    case XMLStreamConstants.CHARACTERS:
334                        buffer.append(reader.getText());
335                        break;
336    
337                    case XMLStreamConstants.END_ELEMENT:
338                        if (--elementCount <= 0) {
339                            return buffer.toString();
340                        }
341                        writeEndElement(reader);
342                        break;
343    
344                    case XMLStreamConstants.END_DOCUMENT:
345                        return buffer.toString();
346                }
347            }
348            return buffer.toString();
349        }
350    
351        protected void writeStartElement(XMLStreamReader reader) {
352            writer.print("<");
353            writeQName(reader.getName());
354            for (int i = 0, size = reader.getNamespaceCount(); i < size; i++) {
355                writer.print("xmlns");
356                String prefix = reader.getNamespacePrefix(i);
357                if (prefix != null && prefix.length() > 0) {
358                    writer.print(":");
359                    writer.print(prefix);
360                }
361                writer.print("='");
362                writer.print(reader.getNamespaceURI(i));
363                writer.print("'");
364            }
365            for (int i = 0, size = reader.getAttributeCount(); i < size; i++) {
366                writer.print("xmlns");
367                writeQName(reader.getAttributeName(i));
368                writer.print("='");
369                writer.print(reader.getAttributeValue(i));
370                writer.print("'");
371            }
372            writer.println(">");
373        }
374    
375        protected void writeEndElement(XMLStreamReader reader) {
376            writer.print("</");
377            writeQName(reader.getName());
378            writer.println(">");
379        }
380    
381        protected void writeQName(QName name) {
382            String prefix = name.getPrefix();
383            if (prefix != null && prefix.length() > 0) {
384                writer.print(prefix);
385                writer.print(":");
386            }
387            writer.print(name.getLocalPart());
388        }
389    
390        protected ActiveMQDestination createDestination(String typeName,String text) {
391            int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
392            if (text.startsWith(TOPIC_PREFIX)) {
393                type = ActiveMQDestination.ACTIVEMQ_TOPIC;
394                text = text.substring(TOPIC_PREFIX.length());
395            }
396            else if (text.startsWith(QUEUE_PREFIX)) {
397                type = ActiveMQDestination.ACTIVEMQ_QUEUE;
398                text = text.substring(QUEUE_PREFIX.length());
399            }
400            else if (text.startsWith(TEMP_QUEUE_PREFIX)) {
401                type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
402                text = text.substring(TEMP_QUEUE_PREFIX.length());
403            }
404            else if (text.startsWith(TEMP_TOPIC_PREFIX)) {
405                type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
406                text = text.substring(TEMP_TOPIC_PREFIX.length());
407            }else {
408                if (typeName != null){
409                    if (typeName.equals("groupchat")){
410                        type = ActiveMQDestination.ACTIVEMQ_TOPIC;
411                    }
412                    //else default is a queue - (assume default typeName is 'chat')
413                }
414            }
415            text = text.trim();
416            if (text.length() == 0) {
417                return null;
418            }
419            return ActiveMQDestination.createDestination(type, text);
420        }
421    
422        protected String toString(Destination destination) {
423            if (destination instanceof ActiveMQDestination) {
424                ActiveMQDestination activeDestination = (ActiveMQDestination) destination;
425                String physicalName = activeDestination.getPhysicalName();
426                switch (activeDestination.getDestinationType()) {
427                    case ActiveMQDestination.ACTIVEMQ_QUEUE:
428                        return QUEUE_PREFIX + physicalName;
429    
430                    case ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE:
431                        return TEMP_QUEUE_PREFIX + physicalName;
432    
433                    case ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC:
434                        return TEMP_TOPIC_PREFIX + physicalName;
435                }
436                return physicalName;
437            }
438            return destination != null ? destination.toString() : "";
439        }
440    
441        protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
442            Serializable object = message.getObject();
443            String text = (object != null) ? object.toString() : "";
444            writeMessage(message, text, out);
445        }
446    
447        protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
448            writeMessage(message, message.getText(), out);
449        }
450    
451        protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
452            ByteArray data = message.getBodyAsBytes();
453            String text = encodeBinary(data.getBuf(), data.getOffset(), data.getLength());
454            writeMessage(message, text, out);
455        }
456    
457        protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
458            String type = getXmppType(message);
459    
460            writer.print("<");
461            writer.print(type);
462            writer.print(" to='");
463            writer.print(toString(message.getJMSDestination()));
464            writer.print("' from='");
465            writer.print(toString(message.getJMSReplyTo()));
466            String messageID = message.getJMSMessageID();
467            if (messageID != null) {
468                writer.print("' id='");
469                writer.print(messageID);
470            }
471    
472            Map properties = message.getProperties();
473            if (properties != null) {
474                for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
475                    Map.Entry entry = (Map.Entry) iter.next();
476                    Object key = entry.getKey();
477                    Object value = entry.getValue();
478                    if (value != null) {
479                        writer.print("' ");
480                        writer.print(key.toString());
481                        writer.print("='");
482                        writer.print(value.toString());
483                    }
484                }
485            }
486    
487            writer.println("'>");
488    
489            String id = message.getJMSCorrelationID();
490            if (id != null) {
491                writer.print("<thread>");
492                writer.print(id);
493                writer.print("</thread>");
494            }
495            writer.print("<body>");
496            writer.print(body);
497            writer.println("</body>");
498            writer.print("</");
499            writer.print(type);
500            writer.println(">");
501        }
502    
503        protected String encodeBinary(byte[] data, int offset, int length) {
504            // TODO
505            throw new RuntimeException("Not implemented yet!");
506        }
507    
508        protected String getXmppType(ActiveMQMessage message) {
509            String type = message.getJMSType();
510            if (type == null) {
511                type = "message";
512            }
513            return type;
514        }
515    }