001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.protocol; 028 029 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; 030 import static org.opends.server.loggers.debug.DebugLogger.getTracer; 031 032 import java.io.IOException; 033 import java.io.InputStream; 034 import java.io.OutputStream; 035 import java.net.Socket; 036 import java.net.SocketException; 037 import java.util.zip.DataFormatException; 038 039 import org.opends.server.loggers.debug.DebugTracer; 040 041 import javax.net.ssl.SSLSocket; 042 043 /** 044 * This class implements a protocol session using TLS. 045 */ 046 public class TLSSocketSession implements ProtocolSession 047 { 048 /** 049 * The tracer object for the debug logger. 050 */ 051 private static final DebugTracer TRACER = getTracer(); 052 053 private Socket plainSocket; 054 private SSLSocket secureSocket; 055 private InputStream input; 056 private OutputStream output; 057 private InputStream plainInput; 058 private OutputStream plainOutput; 059 byte[] rcvLengthBuf = new byte[8]; 060 061 /** 062 * The time the last message published to this session. 063 */ 064 private long lastPublishTime = 0; 065 066 067 /** 068 * The time the last message was received on this session. 069 */ 070 private long lastReceiveTime = 0; 071 072 073 /** 074 * Creates a new TLSSocketSession. 075 * 076 * @param socket The regular Socket on which the SocketSession will be 077 * based. 078 * @param secureSocket The secure Socket on which the SocketSession will be 079 * based. 080 * @throws IOException When an IException happens on the socket. 081 */ 082 public TLSSocketSession(Socket socket, SSLSocket secureSocket) 083 throws IOException 084 { 085 plainSocket = socket; 086 this.secureSocket = secureSocket; 087 plainInput = plainSocket.getInputStream(); 088 plainOutput = plainSocket.getOutputStream(); 089 input = secureSocket.getInputStream(); 090 output = secureSocket.getOutputStream(); 091 } 092 093 094 /** 095 * {@inheritDoc} 096 */ 097 public void close() throws IOException 098 { 099 if (debugEnabled()) 100 { 101 TRACER.debugInfo("Closing SocketSession." + 102 Thread.currentThread().getStackTrace()); 103 } 104 if (plainSocket != null && !plainSocket.isClosed()) 105 { 106 plainSocket.close(); 107 } 108 if (secureSocket != null && !secureSocket.isClosed()) 109 { 110 secureSocket.close(); 111 } 112 } 113 114 /** 115 * {@inheritDoc} 116 */ 117 public synchronized void publish(ReplicationMessage msg) 118 throws IOException 119 { 120 byte[] buffer = msg.getBytes(); 121 String str = String.format("%08x", buffer.length); 122 byte[] sendLengthBuf = str.getBytes(); 123 124 output.write(sendLengthBuf); 125 output.write(buffer); 126 output.flush(); 127 128 lastPublishTime = System.currentTimeMillis(); 129 } 130 131 /** 132 * {@inheritDoc} 133 */ 134 public ReplicationMessage receive() throws IOException, 135 ClassNotFoundException, DataFormatException 136 { 137 /* Read the first 8 bytes containing the packet length */ 138 int length = 0; 139 140 /* Let's start the stop-watch before waiting on read */ 141 /* for the heartbeat check to be operationnal */ 142 lastReceiveTime = System.currentTimeMillis(); 143 144 while (length<8) 145 { 146 int read = input.read(rcvLengthBuf, length, 8-length); 147 if (read == -1) 148 { 149 lastReceiveTime=0; 150 throw new IOException("no more data"); 151 } 152 else 153 { 154 length += read; 155 } 156 } 157 158 int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); 159 160 try 161 { 162 length = 0; 163 byte[] buffer = new byte[totalLength]; 164 while (length < totalLength) 165 { 166 length += input.read(buffer, length, totalLength - length); 167 } 168 /* We do not want the heartbeat to close the session when */ 169 /* we are processing a message even a time consuming one. */ 170 lastReceiveTime=0; 171 return ReplicationMessage.generateMsg(buffer); 172 } 173 catch (OutOfMemoryError e) 174 { 175 throw new IOException("Packet too large, can't allocate " 176 + totalLength + " bytes."); 177 } 178 } 179 180 /** 181 * {@inheritDoc} 182 */ 183 public void stopEncryption() 184 { 185 input = plainInput; 186 output = plainOutput; 187 } 188 189 /** 190 * {@inheritDoc} 191 */ 192 public boolean isEncrypted() 193 { 194 return !(input == plainInput); 195 } 196 197 /** 198 * {@inheritDoc} 199 */ 200 public long getLastPublishTime() 201 { 202 return lastPublishTime; 203 } 204 205 /** 206 * {@inheritDoc} 207 */ 208 public long getLastReceiveTime() 209 { 210 if (lastReceiveTime==0) 211 { 212 return System.currentTimeMillis(); 213 } 214 return lastReceiveTime; 215 } 216 217 /** 218 * {@inheritDoc} 219 */ 220 public String getRemoteAddress() 221 { 222 return plainSocket.getInetAddress().getHostAddress(); 223 } 224 225 /** 226 * {@inheritDoc} 227 */ 228 public void setSoTimeout(int timeout) throws SocketException 229 { 230 plainSocket.setSoTimeout(timeout); 231 } 232 }