001 /** 002 * 003 * Copyright 2004 Hiram Chirino 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.activemq.transport.activeio; 018 019 import java.io.IOException; 020 021 import org.activeio.Packet; 022 import org.activeio.PacketData; 023 import org.activeio.packet.AppendedPacket; 024 import org.activeio.packet.EOSPacket; 025 026 /** 027 * @version $Revision: 1.1.1.1 $ 028 */ 029 abstract public class PacketAggregator { 030 031 private static final int HEADER_LENGTH = 5; 032 033 Packet incompleteUpPacket; 034 boolean headerLoaded; 035 private int upPacketLength; 036 037 public void addRawPacket(Packet packet) throws IOException { 038 039 // Passthrough the EOS packet. 040 if( packet == EOSPacket.EOS_PACKET ) { 041 packetAssembled(packet); 042 return; 043 } 044 045 if (incompleteUpPacket != null) { 046 packet = AppendedPacket.join(incompleteUpPacket, packet); 047 incompleteUpPacket = null; 048 } 049 050 while (true) { 051 052 if (!headerLoaded) { 053 headerLoaded = packet.remaining() >= HEADER_LENGTH; 054 if( headerLoaded ) { 055 PacketData data = new PacketData(packet.duplicate()); 056 data.readByte(); 057 upPacketLength = data.readInt(); 058 if( upPacketLength < 0 ) { 059 throw new IOException("Up packet lenth was invalid: "+upPacketLength); 060 } 061 upPacketLength+=HEADER_LENGTH; 062 } 063 if( !headerLoaded ) 064 break; 065 } 066 067 if (packet.remaining() < upPacketLength ) 068 break; 069 070 // Get ready to create a slice to send up. 071 int origLimit = packet.limit(); 072 packet.limit(upPacketLength); 073 packetAssembled(packet.slice()); 074 075 // Get a slice of the remaining since that will dump 076 // the first packets of an AppendedPacket 077 packet.position(upPacketLength); 078 packet.limit(origLimit); 079 packet = packet.slice(); 080 081 // Need to load a header again now. 082 headerLoaded = false; 083 } 084 if (packet.hasRemaining()) { 085 incompleteUpPacket = packet; 086 } 087 088 } 089 090 protected abstract void packetAssembled(Packet packet); 091 }