001 package org.activemq.ra; 002 003 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 004 005 /** 006 */ 007 public class CircularQueue { 008 009 private final int size; 010 011 private final SynchronizedBoolean stopping; 012 013 014 // For pooling objects 015 private final Object[] contents; 016 final private Object mutex = new Object(); 017 //where the next worker to be supplied currently is. 018 private int start=0; 019 //where the next worker to be inserted will go 020 private int end=0; 021 022 public CircularQueue(int size, SynchronizedBoolean stopping) { 023 this.size = size; 024 contents = new Object[size]; 025 this.stopping = stopping; 026 } 027 028 public Object get() { 029 synchronized(mutex) { 030 while( true ) { 031 Object ew = contents[start]; 032 if (ew != null) { 033 start++; 034 if(start == contents.length) { 035 start=0; 036 } 037 return ew; 038 } else { 039 try { 040 mutex.wait(); 041 if(stopping.get()) { 042 return null; 043 } 044 } catch (InterruptedException e) { 045 return null; 046 } 047 } 048 } 049 } 050 } 051 052 public void returnObject(Object worker) { 053 synchronized(mutex) { 054 contents[end++] = worker; 055 if( end == contents.length) { 056 end=0; 057 } 058 mutex.notify(); 059 } 060 } 061 062 public int size() { 063 return contents.length; 064 } 065 066 public void drain() { 067 int i = 0; 068 while (i < size) { 069 if (get() != null) { 070 i++; 071 } 072 } 073 } 074 075 076 public void notifyWaiting() { 077 synchronized(mutex) { 078 mutex.notifyAll(); 079 } 080 } 081 082 }