001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.transport.activeio; 020 021 import java.io.IOException; 022 023 import javax.jms.JMSException; 024 025 import org.activeio.AcceptListener; 026 import org.activeio.AsyncChannel; 027 import org.activeio.AsyncChannelServer; 028 import org.activeio.Channel; 029 import org.activeio.adapter.SynchToAsynchChannelAdapter; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 import org.activemq.io.WireFormat; 033 import org.activemq.transport.TransportServerChannelSupport; 034 import org.activemq.util.JMSExceptionHelper; 035 036 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 037 038 /** 039 * Binds to a well known port and listens for Sockets ... 040 * 041 * @version $Revision: 1.1.1.1 $ 042 */ 043 public class ActiveIOTransportServerChannel extends TransportServerChannelSupport implements AcceptListener { 044 private static final Log log = LogFactory.getLog(ActiveIOTransportServerChannel.class); 045 046 private final WireFormat wireFormat; 047 048 private final AsyncChannelServer server; 049 050 private final SynchronizedBoolean closed = new SynchronizedBoolean(false); 051 052 /** 053 * @param wireFormat 054 * @param server 055 */ 056 public ActiveIOTransportServerChannel(WireFormat wireFormat, AsyncChannelServer server) { 057 super(server.getBindURI()); 058 this.wireFormat = wireFormat; 059 this.server = server; 060 server.setAcceptListener(this); 061 } 062 063 public void start() throws JMSException { 064 try { 065 super.start(); 066 server.start(); 067 } catch (IOException e) { 068 throw JMSExceptionHelper.newJMSException(e.getMessage(), e); 069 } 070 } 071 072 public void stop() throws JMSException { 073 if (closed.commit(false, true)) { 074 super.stop(); 075 server.dispose(); 076 } 077 } 078 079 /** 080 * @return pretty print of this 081 */ 082 public String toString() { 083 return "ActiveIOTransportServerChannel@" + getUrl(); 084 } 085 086 public void onAccept(Channel c) { 087 if (closed.get()) { 088 c.dispose(); 089 return; 090 } 091 092 AsyncChannel channel = SynchToAsynchChannelAdapter.adapt(c); 093 // If the channel is not allready buffered.. lets buffer it. 094 //if (channel.narrow(WriteBufferedAsynchChannel.class) == null 095 // && channel.narrow(WriteBufferedSynchChannel.class) == null) { 096 // channel = new WriteBufferedAsynchChannel(channel); 097 //} 098 addClient(new ActiveIOTransportChannel(wireFormat.copy(), channel)); 099 } 100 101 public void onAcceptError(IOException error) { 102 if (!closed.get()) { 103 log.warn("Caught exception accepting connection: " + error, error); 104 try { 105 stop(); 106 } catch (JMSException e) { 107 } 108 } 109 110 } 111 112 }