001 /** 002 * 003 * Copyright 2005 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.jabber; 020 import java.io.BufferedInputStream; 021 import java.io.DataInputStream; 022 import java.io.DataOutputStream; 023 import java.io.IOException; 024 import java.io.PrintWriter; 025 import java.net.Socket; 026 import java.net.URI; 027 import java.util.ArrayList; 028 import java.util.List; 029 import javax.jms.JMSException; 030 import javax.xml.stream.XMLInputFactory; 031 import javax.xml.stream.XMLStreamException; 032 import javax.xml.stream.XMLStreamReader; 033 import org.activemq.message.Packet; 034 import org.activemq.transport.TransportStatusEvent; 035 import org.activemq.transport.tcp.TcpBufferedOutputStream; 036 import org.activemq.transport.tcp.TcpTransportChannel; 037 import org.activemq.transport.tcp.TcpTransportServerChannel; 038 import org.apache.commons.logging.Log; 039 import org.apache.commons.logging.LogFactory; 040 import EDU.oswego.cs.dl.util.concurrent.Executor; 041 042 /** 043 * A transport for using Jabber (XMPP) to talk to ActiveMQ 044 * 045 * @version $Revision: 1.1 $ 046 */ 047 public class JabberTransportChannel extends TcpTransportChannel { 048 private static final Log log = LogFactory.getLog(JabberTransportChannel.class); 049 private XMLInputFactory inputFactory; 050 private BufferedInputStream in; 051 052 public JabberTransportChannel() { 053 super(new JabberWireFormat()); 054 } 055 056 public JabberTransportChannel(URI remoteLocation) throws JMSException { 057 super(new JabberWireFormat(), remoteLocation); 058 } 059 060 public JabberTransportChannel(URI remoteLocation, URI localLocation) throws JMSException { 061 super(new JabberWireFormat(), remoteLocation, localLocation); 062 } 063 064 public JabberTransportChannel(TcpTransportServerChannel serverChannel, Socket socket, Executor executor) 065 throws JMSException { 066 super(serverChannel, new JabberWireFormat(), socket, executor); 067 } 068 069 public JabberTransportChannel(Socket socket, Executor executor) throws JMSException { 070 super(new JabberWireFormat(), socket, executor); 071 } 072 073 public void run() { 074 System.out.println("Jabber consumer thread starting"); 075 log.trace("Jabber consumer thread starting"); 076 int count = 0; 077 try { 078 if (inputFactory == null) { 079 inputFactory = XMLInputFactory.newInstance(); 080 } 081 XMLStreamReader reader = inputFactory.createXMLStreamReader(in, "UTF-8"); 082 //initialize dialog 083 getJabberWireFormat().initialize(); 084 List list = new ArrayList(); 085 while (!isClosed()) { 086 list.clear(); 087 if (isServerSide() && ++count > 500) { 088 count = 0; 089 Thread.yield(); 090 } 091 if (!reader.hasNext()) { 092 stop(); 093 break; 094 } 095 getJabberWireFormat().readPacket(reader, list); 096 for (int i = 0;i < list.size();i++) { 097 Packet packet = (Packet) list.get(i); 098 if (packet != null) { 099 doConsumePacket(packet); 100 } 101 } 102 } 103 stop(); 104 } 105 catch (XMLStreamException e) { 106 doClose(e); 107 } 108 catch (JMSException e) { 109 doClose(e); 110 } 111 catch (IOException e) { 112 doClose(e); 113 } 114 } 115 116 public JabberWireFormat getJabberWireFormat() { 117 return (JabberWireFormat) getWireFormat(); 118 } 119 120 protected void initializeStreams() throws IOException { 121 System.out.println("Creating input stream"); 122 this.in = new BufferedInputStream(socket.getInputStream(), 8192); 123 this.dataIn = new DataInputStream(in); 124 System.out.println("creating output stream"); 125 TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 8192); 126 this.dataOut = new DataOutputStream(buffOut); 127 System.out.println("Creating print writer..."); 128 PrintWriter writer = new PrintWriter(socket.getOutputStream()); 129 getJabberWireFormat().setWriter(writer); 130 System.out.println("Firing event"); 131 fireStatusEvent(new TransportStatusEvent(this, TransportStatusEvent.CONNECTED)); 132 } 133 }