001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.protocol;
028    
029    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
030    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
031    
032    import java.io.IOException;
033    import java.io.InputStream;
034    import java.io.OutputStream;
035    import java.net.Socket;
036    import java.net.SocketException;
037    import java.util.zip.DataFormatException;
038    
039    import org.opends.server.loggers.debug.DebugTracer;
040    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
041    
042    /**
043     * This class Implement a protocol session using a basic socket and relying on
044     * the innate encoding/decoding capabilities of the ReplicationMessage
045     * by using the getBytes() and generateMsg() methods of those classes.
046     */
047    public class SocketSession implements ProtocolSession
048    {
049      /**
050       * The tracer object for the debug logger.
051       */
052      private static final DebugTracer TRACER = getTracer();
053    
054      private Socket socket;
055      private InputStream input;
056      private OutputStream output;
057      byte[] rcvLengthBuf = new byte[8];
058    
059      /**
060       * The time the last message published to this session.
061       */
062      private long lastPublishTime = 0;
063    
064    
065      /**
066       * The time the last message was received on this session.
067       */
068      private long lastReceiveTime = 0;
069    
070    
071      /**
072       * Creates a new SocketSession based on the provided socket.
073       *
074       * @param socket The Socket on which the SocketSession will be based.
075       * @throws IOException When an IException happens on the socket.
076       */
077      public SocketSession(Socket socket) throws IOException
078      {
079        this.socket = socket;
080        /*
081         * Use a window instead of the TCP flow control.
082         * Therefore set a very large value for send and receive buffer sizes.
083         */
084        input = socket.getInputStream();
085        output = socket.getOutputStream();
086      }
087    
088      /**
089       * {@inheritDoc}
090       */
091      public void close() throws IOException
092      {
093        if (debugEnabled())
094        {
095          TRACER.debugInfo("Closing SocketSession."
096              + stackTraceToSingleLineString(new Exception()));
097        }
098        socket.close();
099      }
100    
101      /**
102       * {@inheritDoc}
103       */
104      public synchronized void publish(ReplicationMessage msg)
105             throws IOException
106      {
107        byte[] buffer = msg.getBytes();
108        String str = String.format("%08x", buffer.length);
109    
110        if (debugEnabled())
111        {
112          TRACER.debugInfo("SocketSession publish <" + str + ">");
113        }
114    
115        byte[] sendLengthBuf = str.getBytes();
116    
117        output.write(sendLengthBuf);
118        output.write(buffer);
119        output.flush();
120    
121        lastPublishTime = System.currentTimeMillis();
122      }
123    
124      /**
125       * {@inheritDoc}
126       */
127      public ReplicationMessage receive() throws IOException,
128          ClassNotFoundException, DataFormatException
129      {
130        /* Read the first 8 bytes containing the packet length */
131        int length = 0;
132    
133        /* Let's start the stop-watch before waiting on read */
134        /* for the heartbeat check to be operationnal        */
135        lastReceiveTime = System.currentTimeMillis();
136    
137        while (length<8)
138        {
139          int read = input.read(rcvLengthBuf, length, 8-length);
140          if (read == -1)
141          {
142            lastReceiveTime=0;
143            throw new IOException("no more data");
144          }
145          else
146          {
147            length += read;
148          }
149        }
150    
151        int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
152    
153        try
154        {
155          length = 0;
156          byte[] buffer = new byte[totalLength];
157          while (length < totalLength)
158          {
159            length += input.read(buffer, length, totalLength - length);
160          }
161          /* We do not want the heartbeat to close the session when */
162          /* we are processing a message even a time consuming one. */
163          lastReceiveTime=0;
164          return ReplicationMessage.generateMsg(buffer);
165        }
166        catch (OutOfMemoryError e)
167        {
168          throw new IOException("Packet too large, can't allocate "
169                                + totalLength + " bytes.");
170        }
171      }
172    
173      /**
174       * {@inheritDoc}
175       */
176      public void stopEncryption()
177      {
178        // There is no security layer.
179      }
180    
181      /**
182       * {@inheritDoc}
183       */
184      public boolean isEncrypted()
185      {
186        return false;
187      }
188    
189      /**
190       * {@inheritDoc}
191       */
192      public long getLastPublishTime()
193      {
194        return lastPublishTime;
195      }
196    
197      /**
198       * {@inheritDoc}
199       */
200      public long getLastReceiveTime()
201      {
202        if (lastReceiveTime==0)
203        {
204          return System.currentTimeMillis();
205        }
206        return lastReceiveTime;
207      }
208    
209      /**
210       * {@inheritDoc}
211       */
212      public String getRemoteAddress()
213      {
214        return socket.getInetAddress().getHostAddress();
215      }
216    
217      /**
218       * {@inheritDoc}
219       */
220      public void setSoTimeout(int timeout) throws SocketException
221      {
222        socket.setSoTimeout(timeout);
223      }
224    }