001    /**
002     *
003     * Copyright 2004 Hiram Chirino
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    package org.activemq.ra;
019    
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.LinkedList;
023    import java.util.List;
024    
025    import javax.jms.JMSException;
026    import javax.jms.ServerSession;
027    import javax.jms.ServerSessionPool;
028    import javax.jms.Session;
029    import javax.resource.spi.UnavailableException;
030    import javax.resource.spi.endpoint.MessageEndpoint;
031    
032    import org.activemq.ActiveMQQueueSession;
033    import org.activemq.ActiveMQSession;
034    import org.activemq.ActiveMQTopicSession;
035    import org.activemq.message.ActiveMQMessage;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:11 $
041     */
042    public class ServerSessionPoolImpl implements ServerSessionPool {
043        
044        private static final Log log = LogFactory.getLog(ServerSessionPoolImpl.class);
045    
046        private final ActiveMQAsfEndpointWorker activeMQAsfEndpointWorker;
047        private final int maxSessions;
048    
049        private ArrayList idleSessions = new ArrayList();
050        private LinkedList activeSessions = new LinkedList();
051        private boolean closing = false;
052    
053        public ServerSessionPoolImpl(ActiveMQAsfEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
054            this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
055            this.maxSessions=maxSessions;
056        }
057    
058        private ServerSessionImpl createServerSessionImpl() throws JMSException {
059            ActiveMQActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
060            int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
061            final ActiveMQSession session = (ActiveMQSession) activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted,acknowledge);            
062            MessageEndpoint endpoint;
063            try {                
064                    int batchSize = 0;
065                    if (activationSpec.getEnableBatchBooleanValue()) {
066                            batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
067                    }
068                if( activationSpec.isUseRAManagedTransactionEnabled() ) {
069                    // The RA will manage the transaction commit.
070                    endpoint = createEndpoint(null);   
071                    return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
072                } else {
073                    // Give the container an object to manage to transaction with.
074                    endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));                
075                    return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
076                }
077            } catch (UnavailableException e) {
078                // The container could be limiting us on the number of endpoints
079                // that are being created.
080                    log.debug("Could not create an endpoint.", e);
081                session.close();
082                return null;
083            }
084        }
085    
086        private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException {
087            MessageEndpoint endpoint;
088            endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
089            MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
090            return endpointProxy;
091        }
092    
093        /**
094         */
095        synchronized public ServerSession getServerSession() throws JMSException {
096            log.debug("ServerSession requested.");
097            if (closing) {
098                throw new JMSException("Session Pool Shutting Down.");
099            }
100    
101            if (idleSessions.size() > 0) {
102                ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1);
103                activeSessions.addLast(ss);
104                log.debug("Using idle session: " + ss);
105                return ss;
106            } else {
107                // Are we at the upper limit?
108                if (activeSessions.size() >= maxSessions) {
109                    // then reuse the allready created sessions..
110                    // This is going to queue up messages into a session for
111                    // processing.
112                    return getExistingServerSession();
113                }
114                ServerSessionImpl ss = createServerSessionImpl();
115                // We may not be able to create a session due to the container
116                // restricting us.
117                if (ss == null) {
118                    if( idleSessions.size() == 0 ) {
119                            throw new JMSException("Endpoint factory did not allows to any endpoints.");
120                    }
121                                     
122                    return getExistingServerSession();
123                }
124                activeSessions.addLast(ss);
125                log.debug("Created a new session: " + ss);
126                return ss;
127            }
128        }
129    
130        /**
131         * @param message
132         * @throws JMSException
133         */
134        private void dispatchToSession(ActiveMQMessage message) throws JMSException {
135    
136            ServerSession serverSession = getServerSession();
137            Session nestedSession = serverSession.getSession();
138            ActiveMQSession session = null;
139            if (nestedSession instanceof ActiveMQSession) {
140                session = (ActiveMQSession) nestedSession;
141            } else if (nestedSession instanceof ActiveMQTopicSession) {
142                ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession;
143                session = (ActiveMQSession) topicSession.getNext();
144            } else if (nestedSession instanceof ActiveMQQueueSession) {
145                ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession;
146                session = (ActiveMQSession) queueSession.getNext();
147            } else {
148                throw new JMSException("Invalid instance of session obtained from server session." +
149                "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " +
150                "Found instance of " + nestedSession.getClass().getName());
151            }
152            session.dispatch(message);
153            serverSession.start();
154        }
155    
156        
157        /**
158         * @return
159         */
160        private ServerSession getExistingServerSession() {
161            ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();
162            activeSessions.addLast(ss);
163            log.debug("Reusing an active session: " + ss);
164            return ss;
165        }
166    
167        synchronized public void returnToPool(ServerSessionImpl ss) {
168            log.debug("Session returned to pool: " + ss);
169            activeSessions.remove(ss);
170            idleSessions.add(ss);
171            notify();
172        }
173    
174        synchronized public void removeFromPool(ServerSessionImpl ss) {
175            activeSessions.remove(ss);
176            try {
177                    ActiveMQSession session = (ActiveMQSession) ss.getSession();
178                    List l = session.getUnconsumedMessages();
179                    for (Iterator i = l.iterator(); i.hasNext();) {
180                                    dispatchToSession((ActiveMQMessage) i.next());                          
181                    }
182            } catch (Throwable t) {
183                        log.error("Error redispatching unconsumed messages from stale session", t);         
184            }
185            ss.close();
186            notify();
187        }
188    
189        public void close() {
190            synchronized (this) {
191                closing = true;
192                closeIdleSessions();
193                while( activeSessions.size() > 0 ) {
194                    try {
195                        wait();
196                    } catch (InterruptedException e) {
197                        Thread.currentThread().interrupt();
198                        return;
199                    }
200                    closeIdleSessions();
201                }
202            }
203        }
204    
205        private void closeIdleSessions() {
206            for (Iterator iter = idleSessions.iterator(); iter.hasNext();) {
207                ServerSessionImpl ss = (ServerSessionImpl) iter.next();
208                ss.close();
209            }
210        }
211    
212    }