001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 019 package org.activemq.io.util; 020 import java.io.File; 021 import java.io.IOException; 022 import java.util.List; 023 import javax.jms.JMSException; 024 import org.apache.commons.logging.Log; 025 import org.apache.commons.logging.LogFactory; 026 import org.activemq.io.WireFormat; 027 import org.activemq.io.impl.DefaultWireFormat; 028 import org.activemq.message.ActiveMQMessage; 029 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 030 031 /** 032 * Implements a controlled thread safe queue, with ActiveMQMessages being spooled to disk for reading asynchronously. 033 */ 034 public class SpooledBoundedActiveMQMessageQueue { 035 private String name; 036 private DataContainer container; 037 private WireFormat wireFormat; 038 private long maxDataLength; 039 private boolean closed; 040 private boolean stopped; 041 private SynchronizedInt size = new SynchronizedInt(0); 042 private Object inLock = new Object(); 043 private Object outLock = new Object(); 044 private static int WAIT_TIMEOUT = 250; 045 private static final Log log = LogFactory.getLog(SpooledBoundedActiveMQMessageQueue.class); 046 047 /** 048 * Constructor for SpooledBoundedActiveMQMessageQueue 049 * 050 * @param dir 051 * @param name 052 * @param maxDataLength 053 * @param maxBlockSize 054 * @throws IOException 055 */ 056 public SpooledBoundedActiveMQMessageQueue(File dir, String name, long maxDataLength, int maxBlockSize) throws IOException { 057 //ensure name can be used as a file name 058 char[] chars = name.toCharArray(); 059 for (int i = 0;i < chars.length;i++) { 060 if (!Character.isLetterOrDigit(chars[i])) { 061 chars[i] = '_'; 062 } 063 } 064 this.name = new String(chars); 065 this.maxDataLength = maxDataLength; 066 this.wireFormat = new DefaultWireFormat(); 067 this.container = new DataContainer(dir, this.name, maxBlockSize); 068 //as the DataContainer is temporary, clean-up any old files 069 this.container.deleteAll(); 070 } 071 072 /** 073 * Constructor for SpooledBoundedActiveMQMessageQueue 074 * 075 * @param dir 076 * @param name 077 * @throws IOException 078 */ 079 public SpooledBoundedActiveMQMessageQueue(File dir, String name) throws IOException { 080 this(dir, name, 1024 * 1024 * 64, 8192); 081 } 082 083 /** 084 * Place a ActiveMQMessage at the head of the Queue 085 * 086 * @param packet 087 * @throws JMSException 088 */ 089 public void enqueue(ActiveMQMessage packet) throws JMSException { 090 if (!isFull()) { 091 enqueueNoBlock(packet); 092 } 093 else { 094 synchronized (inLock) { 095 try { 096 while (isFull()) { 097 inLock.wait(WAIT_TIMEOUT); 098 } 099 } 100 catch (InterruptedException ie) { 101 } 102 } 103 enqueueNoBlock(packet); 104 } 105 } 106 107 /** 108 * Enqueue a ActiveMQMessage without checking usage limits 109 * 110 * @param packet 111 * @throws JMSException 112 */ 113 public void enqueueNoBlock(ActiveMQMessage packet) throws JMSException { 114 byte[] data; 115 try { 116 data = wireFormat.toBytes(packet); 117 size.increment(); 118 container.write(data); 119 } 120 catch (IOException e) { 121 JMSException jmsEx = new JMSException("enqueNoBlock failed: " + e.getMessage()); 122 jmsEx.setLinkedException(e); 123 throw jmsEx; 124 } 125 synchronized (outLock) { 126 outLock.notify(); 127 } 128 } 129 130 /** 131 * @return the first dequeued ActiveMQMessage or blocks until one is available 132 * @throws JMSException 133 * @throws InterruptedException 134 */ 135 public ActiveMQMessage dequeue() throws JMSException, InterruptedException { 136 ActiveMQMessage result = null; 137 synchronized (outLock) { 138 while ((result = dequeueNoWait()) == null) { 139 outLock.wait(WAIT_TIMEOUT); 140 } 141 } 142 return result; 143 } 144 145 /** 146 * @return the ActiveMQMessage from the head of the Queue or null if the Queue is empty 147 * @param timeInMillis maximum time to wait to dequeue a ActiveMQMessage 148 * @throws JMSException 149 * @throws InterruptedException 150 */ 151 public ActiveMQMessage dequeue(long timeInMillis) throws JMSException, InterruptedException { 152 ActiveMQMessage result = dequeueNoWait(); 153 if (result == null) { 154 synchronized (outLock) { 155 outLock.wait(timeInMillis); 156 result = dequeueNoWait(); 157 } 158 } 159 return result; 160 } 161 162 /** 163 * @return the ActiveMQMessage from the head of the Queue or null if the Queue is empty 164 * @throws JMSException 165 * @throws InterruptedException 166 */ 167 public ActiveMQMessage dequeueNoWait() throws JMSException, InterruptedException { 168 ActiveMQMessage result = null; 169 if (stopped) { 170 synchronized (outLock) { 171 while (stopped && !closed) { 172 outLock.wait(WAIT_TIMEOUT); 173 } 174 } 175 } 176 byte[] data; 177 try { 178 data = container.read(); 179 if (data != null) { 180 result = (ActiveMQMessage)wireFormat.fromBytes(data); 181 size.decrement(); 182 } 183 } 184 catch (IOException e) { 185 JMSException jmsEx = new JMSException("fromBytes failed"); 186 jmsEx.setLinkedException(e); 187 jmsEx.initCause(e); 188 throw jmsEx; 189 } 190 if (result != null && !isFull()) { 191 synchronized (inLock) { 192 inLock.notify(); 193 } 194 } 195 return result; 196 } 197 198 /** 199 * @return true if this queue has reached it's data length limit 200 */ 201 public boolean isFull() { 202 return container.length() >= maxDataLength; 203 } 204 205 /** 206 * close this queue 207 */ 208 public void close() { 209 try { 210 closed = true; 211 container.close(); 212 } 213 catch (IOException ioe) { 214 log.warn("Couldn't close queue", ioe); 215 } 216 } 217 218 /** 219 * @return the name of this BoundedActiveMQMessageQueue 220 */ 221 public String getName() { 222 return name; 223 } 224 225 /** 226 * @return number of ActiveMQMessages held by this queue 227 */ 228 public int size() { 229 return size.get(); 230 } 231 232 /** 233 * @return true if the queue is enabled for dequeing (default = true) 234 */ 235 public boolean isStarted() { 236 return stopped == false; 237 } 238 239 /** 240 * disable dequeueing 241 */ 242 public void stop() { 243 synchronized (outLock) { 244 stopped = true; 245 } 246 } 247 248 /** 249 * enable dequeueing 250 */ 251 public void start() { 252 stopped = false; 253 synchronized (outLock) { 254 outLock.notifyAll(); 255 } 256 synchronized (inLock) { 257 inLock.notifyAll(); 258 } 259 } 260 261 /** 262 * @return true if this queue is empty 263 */ 264 public boolean isEmpty() { 265 return size.get() == 0; 266 } 267 268 /** 269 * clear the queue 270 */ 271 public void clear() { 272 } 273 274 /** 275 * @return a copy of the contents 276 */ 277 public List getContents() { 278 return null; 279 } 280 }