001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.server;
028    import org.opends.messages.Message;
029    
030    import static org.opends.server.loggers.ErrorLogger.logError;
031    import static org.opends.messages.ReplicationMessages.*;
032    import static org.opends.server.loggers.debug.DebugLogger.*;
033    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
034    
035    
036    import java.io.IOException;
037    
038    import org.opends.server.api.DirectoryThread;
039    import org.opends.server.replication.protocol.AckMessage;
040    import org.opends.server.replication.protocol.DoneMessage;
041    import org.opends.server.replication.protocol.EntryMessage;
042    import org.opends.server.replication.protocol.ErrorMessage;
043    import org.opends.server.replication.protocol.ResetGenerationId;
044    import org.opends.server.replication.protocol.InitializeRequestMessage;
045    import org.opends.server.replication.protocol.InitializeTargetMessage;
046    import org.opends.server.replication.protocol.ProtocolSession;
047    import org.opends.server.replication.protocol.ReplicationMessage;
048    import org.opends.server.replication.protocol.UpdateMessage;
049    import org.opends.server.replication.protocol.WindowMessage;
050    import org.opends.server.replication.protocol.WindowProbe;
051    import org.opends.server.replication.protocol.ReplServerInfoMessage;
052    import org.opends.server.replication.protocol.MonitorMessage;
053    import org.opends.server.replication.protocol.MonitorRequestMessage;
054    import org.opends.server.loggers.debug.DebugTracer;
055    
056    
057    /**
058     * This class implement the part of the replicationServer that is reading
059     * the connection from the LDAP servers to get all the updates that
060     * were done on this replica and forward them to other servers.
061     *
062     * A single thread is dedicated to this work.
063     * It waits in a blocking mode on the connection from the LDAP server
064     * and upon receiving an update puts in into the replicationServer cache
065     * from where the other servers will grab it.
066     */
067    public class ServerReader extends DirectoryThread
068    {
069      /**
070       * The tracer object for the debug logger.
071       */
072      private static final DebugTracer TRACER = getTracer();
073    
074      private short serverId;
075      private ProtocolSession session;
076      private ServerHandler handler;
077      private ReplicationServerDomain replicationServerDomain;
078    
079      /**
080       * Constructor for the LDAP server reader part of the replicationServer.
081       *
082       * @param session The ProtocolSession from which to read the data.
083       * @param serverId The server ID of the server from which we read messages.
084       * @param handler The server handler for this server reader.
085       * @param replicationServerDomain The ReplicationServerDomain for this server
086       *        reader.
087       */
088      public ServerReader(ProtocolSession session, short serverId,
089                          ServerHandler handler,
090                          ReplicationServerDomain replicationServerDomain)
091      {
092        super(handler.toString() + " reader");
093        this.session = session;
094        this.serverId = serverId;
095        this.handler = handler;
096        this.replicationServerDomain = replicationServerDomain;
097      }
098    
099      /**
100       * Create a loop that reads changes and hands them off to be processed.
101       */
102      public void run()
103      {
104        if (debugEnabled())
105        {
106          TRACER.debugInfo(
107              "In RS " + replicationServerDomain.getReplicationServer().
108              getMonitorInstanceName() +
109              (handler.isReplicationServer()?" RS ":" LS")+
110              " reader starting for serverId=" + serverId);
111        }
112        /*
113         * wait on input stream
114         * grab all incoming messages and publish them to the
115         * replicationServerDomain
116         */
117        try
118        {
119          while (true)
120          {
121            ReplicationMessage msg = session.receive();
122    
123            /*
124            if (debugEnabled())
125            {
126              TRACER.debugInfo(
127                  "In RS " + replicationServerDomain.getReplicationServer().
128                  getMonitorInstanceName() +
129                  (handler.isReplicationServer()?" From RS ":" From LS")+
130                  " with serverId=" + serverId + " receives " + msg);
131            }
132            */
133            if (msg instanceof AckMessage)
134            {
135              AckMessage ack = (AckMessage) msg;
136              handler.checkWindow();
137              replicationServerDomain.ack(ack, serverId);
138            }
139            else if (msg instanceof UpdateMessage)
140            {
141              // Ignore update received from a replica with
142              // a bad generation ID
143              long referenceGenerationId =
144                      replicationServerDomain.getGenerationId();
145              if ((referenceGenerationId>0) &&
146                  (referenceGenerationId != handler.getGenerationId()))
147              {
148                logError(ERR_IGNORING_UPDATE_FROM.get(
149                    msg.toString(),
150                    handler.getMonitorInstanceName()));
151              }
152              else
153              {
154                UpdateMessage update = (UpdateMessage) msg;
155                handler.decAndCheckWindow();
156                replicationServerDomain.put(update, handler);
157              }
158            }
159            else if (msg instanceof WindowMessage)
160            {
161              WindowMessage windowMsg = (WindowMessage) msg;
162              handler.updateWindow(windowMsg);
163            }
164            else if (msg instanceof InitializeRequestMessage)
165            {
166              InitializeRequestMessage initializeMsg =
167                (InitializeRequestMessage) msg;
168              handler.process(initializeMsg);
169            }
170            else if (msg instanceof InitializeTargetMessage)
171            {
172              InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
173              handler.process(initializeMsg);
174            }
175            else if (msg instanceof EntryMessage)
176            {
177              EntryMessage entryMsg = (EntryMessage) msg;
178              handler.process(entryMsg);
179            }
180            else if (msg instanceof DoneMessage)
181            {
182              DoneMessage doneMsg = (DoneMessage) msg;
183              handler.process(doneMsg);
184            }
185            else if (msg instanceof ErrorMessage)
186            {
187              ErrorMessage errorMsg = (ErrorMessage) msg;
188              handler.process(errorMsg);
189            }
190            else if (msg instanceof ResetGenerationId)
191            {
192              ResetGenerationId genIdMsg = (ResetGenerationId) msg;
193              replicationServerDomain.resetGenerationId(this.handler, genIdMsg);
194            }
195            else if (msg instanceof WindowProbe)
196            {
197              WindowProbe windowProbeMsg = (WindowProbe) msg;
198              handler.process(windowProbeMsg);
199            }
200            else if (msg instanceof ReplServerInfoMessage)
201            {
202              ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
203              handler.receiveReplServerInfo(infoMsg);
204              replicationServerDomain.receiveReplServerInfo(infoMsg, handler);
205            }
206            else if (msg instanceof MonitorRequestMessage)
207            {
208              MonitorRequestMessage replServerMonitorRequestMsg =
209                (MonitorRequestMessage) msg;
210              handler.process(replServerMonitorRequestMsg);
211            }
212            else if (msg instanceof MonitorMessage)
213            {
214              MonitorMessage replServerMonitorMsg = (MonitorMessage) msg;
215              handler.process(replServerMonitorMsg);
216            }
217            else if (msg == null)
218            {
219              /*
220               * The remote server has sent an unknown message,
221               * close the conenction.
222               */
223              Message message = NOTE_READER_NULL_MSG.get(handler.toString());
224              logError(message);
225              return;
226            }
227          }
228        } catch (IOException e)
229        {
230          /*
231           * The connection has been broken
232           * Log a message and exit from this loop
233           * So that this handler is stopped.
234           */
235          if (debugEnabled())
236            TRACER.debugInfo(
237              "In RS " + replicationServerDomain.getReplicationServer().
238              getMonitorInstanceName() +
239              " reader IO EXCEPTION for serverID=" + serverId
240              + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
241          Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
242          logError(message);
243        } catch (ClassNotFoundException e)
244        {
245          if (debugEnabled())
246            TRACER.debugInfo(
247              "In RS <" + replicationServerDomain.getReplicationServer().
248              getMonitorInstanceName() +
249              " reader CNF EXCEPTION serverID=" + serverId
250              + stackTraceToSingleLineString(e));
251          /*
252           * The remote server has sent an unknown message,
253           * close the connection.
254           */
255          Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
256          logError(message);
257        } catch (Exception e)
258        {
259          if (debugEnabled())
260            TRACER.debugInfo(
261              "In RS <" + replicationServerDomain.getReplicationServer().
262              getMonitorInstanceName() +
263              " server reader EXCEPTION serverID=" + serverId
264              + stackTraceToSingleLineString(e));
265          /*
266           * The remote server has sent an unknown message,
267           * close the connection.
268           */
269          Message message = NOTE_READER_EXCEPTION.get(handler.toString());
270          logError(message);
271        }
272        finally
273        {
274          /*
275           * The thread only exit the loop above is some error condition
276           * happen.
277           * Attempt to close the socket and stop the server handler.
278           */
279          if (debugEnabled())
280            TRACER.debugInfo(
281              "In RS " + replicationServerDomain.getReplicationServer().
282              getMonitorInstanceName() +
283              " server reader for serverID=" + serverId +
284              " is closing the session");
285          try
286          {
287            session.close();
288          } catch (IOException e)
289          {
290           // ignore
291          }
292          replicationServerDomain.stopServer(handler);
293        }
294        if (debugEnabled())
295          TRACER.debugInfo(
296              "In RS " + replicationServerDomain.getReplicationServer().
297              getMonitorInstanceName() +
298              (handler.isReplicationServer()?" RS":" LDAP") +
299              " server reader stopped for serverID=" + serverId);
300      }
301    }