001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.protocol;
028    
029    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
030    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
031    
032    import java.io.IOException;
033    import java.io.InputStream;
034    import java.io.OutputStream;
035    import java.net.Socket;
036    import java.net.SocketException;
037    import java.util.zip.DataFormatException;
038    
039    import org.opends.server.loggers.debug.DebugTracer;
040    
041    import javax.net.ssl.SSLSocket;
042    
043    /**
044     * This class implements a protocol session using TLS.
045     */
046    public class TLSSocketSession implements ProtocolSession
047    {
048      /**
049       * The tracer object for the debug logger.
050       */
051      private static final DebugTracer TRACER = getTracer();
052    
053      private Socket plainSocket;
054      private SSLSocket secureSocket;
055      private InputStream input;
056      private OutputStream output;
057      private InputStream plainInput;
058      private OutputStream plainOutput;
059      byte[] rcvLengthBuf = new byte[8];
060    
061      /**
062       * The time the last message published to this session.
063       */
064      private long lastPublishTime = 0;
065    
066    
067      /**
068       * The time the last message was received on this session.
069       */
070      private long lastReceiveTime = 0;
071    
072    
073      /**
074       * Creates a new TLSSocketSession.
075       *
076       * @param socket       The regular Socket on which the SocketSession will be
077       *                     based.
078       * @param secureSocket The secure Socket on which the SocketSession will be
079       *                     based.
080       * @throws IOException When an IException happens on the socket.
081       */
082      public TLSSocketSession(Socket socket, SSLSocket secureSocket)
083           throws IOException
084      {
085        plainSocket = socket;
086        this.secureSocket = secureSocket;
087        plainInput = plainSocket.getInputStream();
088        plainOutput = plainSocket.getOutputStream();
089        input = secureSocket.getInputStream();
090        output = secureSocket.getOutputStream();
091      }
092    
093    
094      /**
095       * {@inheritDoc}
096       */
097      public void close() throws IOException
098      {
099        if (debugEnabled())
100        {
101          TRACER.debugInfo("Closing SocketSession." +
102              Thread.currentThread().getStackTrace());
103        }
104        if (plainSocket != null && !plainSocket.isClosed())
105        {
106          plainSocket.close();
107        }
108        if (secureSocket != null && !secureSocket.isClosed())
109        {
110          secureSocket.close();
111        }
112      }
113    
114      /**
115       * {@inheritDoc}
116       */
117      public synchronized void publish(ReplicationMessage msg)
118             throws IOException
119      {
120        byte[] buffer = msg.getBytes();
121        String str = String.format("%08x", buffer.length);
122        byte[] sendLengthBuf = str.getBytes();
123    
124        output.write(sendLengthBuf);
125        output.write(buffer);
126        output.flush();
127    
128        lastPublishTime = System.currentTimeMillis();
129      }
130    
131      /**
132       * {@inheritDoc}
133       */
134      public ReplicationMessage receive() throws IOException,
135          ClassNotFoundException, DataFormatException
136      {
137        /* Read the first 8 bytes containing the packet length */
138        int length = 0;
139    
140        /* Let's start the stop-watch before waiting on read */
141        /* for the heartbeat check to be operationnal        */
142        lastReceiveTime = System.currentTimeMillis();
143    
144        while (length<8)
145        {
146          int read = input.read(rcvLengthBuf, length, 8-length);
147          if (read == -1)
148          {
149            lastReceiveTime=0;
150            throw new IOException("no more data");
151          }
152          else
153          {
154            length += read;
155          }
156        }
157    
158        int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
159    
160        try
161        {
162          length = 0;
163          byte[] buffer = new byte[totalLength];
164          while (length < totalLength)
165          {
166            length += input.read(buffer, length, totalLength - length);
167          }
168          /* We do not want the heartbeat to close the session when */
169          /* we are processing a message even a time consuming one. */
170          lastReceiveTime=0;
171          return ReplicationMessage.generateMsg(buffer);
172        }
173        catch (OutOfMemoryError e)
174        {
175          throw new IOException("Packet too large, can't allocate "
176                                + totalLength + " bytes.");
177        }
178      }
179    
180      /**
181       * {@inheritDoc}
182       */
183      public void stopEncryption()
184      {
185        input = plainInput;
186        output = plainOutput;
187      }
188    
189      /**
190       * {@inheritDoc}
191       */
192      public boolean isEncrypted()
193      {
194        return !(input == plainInput);
195      }
196    
197      /**
198       * {@inheritDoc}
199       */
200      public long getLastPublishTime()
201      {
202        return lastPublishTime;
203      }
204    
205      /**
206       * {@inheritDoc}
207       */
208      public long getLastReceiveTime()
209      {
210        if (lastReceiveTime==0)
211        {
212          return System.currentTimeMillis();
213        }
214        return lastReceiveTime;
215      }
216    
217      /**
218       * {@inheritDoc}
219       */
220      public String getRemoteAddress()
221      {
222        return plainSocket.getInetAddress().getHostAddress();
223      }
224    
225      /**
226       * {@inheritDoc}
227       */
228      public void setSoTimeout(int timeout) throws SocketException
229      {
230        plainSocket.setSoTimeout(timeout);
231      }
232    }