001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.service.impl; 020 021 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 022 import org.activemq.broker.BrokerClient; 023 import org.activemq.service.Dispatcher; 024 import org.activemq.service.MessageContainerManager; 025 import org.activemq.service.Subscription; 026 027 /** 028 * A dispatcher of messages to some JMS connection. 029 * <p/> 030 * Typically this uses either IO or NIO to shovel the messages down 031 * a socket as fast as possible - in either a push or pull way. 032 * 033 * @version $Revision: 1.1.1.1 $ 034 */ 035 public class DispatcherImpl implements Dispatcher { 036 037 private SynchronizedBoolean started = new SynchronizedBoolean(false); 038 private DispatchWorker worker = new DispatchWorker(); //this should be pooled 039 private Thread runner; 040 041 042 /** 043 * Register the MessageContainerManager for the Dispatcher 044 * 045 * @param mcm 046 */ 047 public void register(MessageContainerManager mcm) { 048 worker.register(mcm); 049 } 050 051 /** 052 * Called to indicate that there is work to do on a Subscription this will wake up a Dispatch Worker if it is 053 * waiting for messages to dispatch 054 * 055 * @param sub the Subscription that now has messages to dispatch 056 */ 057 public void wakeup(Subscription sub) { 058 worker.wakeup(); 059 } 060 061 /** 062 * Called to indicate that there is work to do this will wake up a Dispatch Worker if it is 063 * waiting for messages to dispatch 064 */ 065 public void wakeup() { 066 worker.wakeup(); 067 } 068 069 /** 070 * Add an active subscription 071 * 072 * @param client 073 * @param sub 074 */ 075 public void addActiveSubscription(BrokerClient client, Subscription sub) { 076 worker.addActiveSubscription(client, sub); 077 } 078 079 /** 080 * remove an active subscription 081 * 082 * @param client 083 * @param sub 084 */ 085 public void removeActiveSubscription(BrokerClient client, Subscription sub) { 086 worker.removeActiveSubscription(client, sub); 087 } 088 089 /** 090 * start the DispatchWorker 091 * 092 * @see org.activemq.service.Service#start() 093 */ 094 public void start() { 095 if (started.commit(false, true)) { 096 worker.start(); 097 runner = new Thread(worker, "Dispatch Worker"); 098 runner.setDaemon(true); 099 runner.setPriority(Thread.NORM_PRIORITY + 1); 100 runner.start(); 101 } 102 } 103 104 /** 105 * stop the DispatchWorker 106 * 107 * @see org.activemq.service.Service#stop() 108 */ 109 public void stop() { 110 worker.stop(); 111 started.set(false); 112 } 113 }