001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.server;
028    import org.opends.messages.Message;
029    
030    import static org.opends.server.loggers.ErrorLogger.logError;
031    import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
032    import static org.opends.server.loggers.debug.DebugLogger.getTracer;
033    import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
034    import static org.opends.messages.ReplicationMessages.*;
035    
036    import java.io.IOException;
037    import java.net.SocketException;
038    import java.util.NoSuchElementException;
039    
040    import org.opends.server.api.DirectoryThread;
041    import org.opends.server.loggers.debug.DebugTracer;
042    import org.opends.server.replication.protocol.ProtocolSession;
043    import org.opends.server.replication.protocol.UpdateMessage;
044    
045    
046    /**
047     * This class defines a server writer, which is used to send changes to a
048     * directory server.
049     */
050    public class ServerWriter extends DirectoryThread
051    {
052      /**
053       * The tracer object for the debug logger.
054       */
055      private static final DebugTracer TRACER = getTracer();
056    
057      private ProtocolSession session;
058      private ServerHandler handler;
059      private ReplicationServerDomain replicationServerDomain;
060      private short serverId;
061    
062      /**
063       * Create a ServerWriter.
064       * Then ServerWriter then waits on the ServerHandler for new updates
065       * and forward them to the server
066       *
067       * @param session the ProtocolSession that will be used to send updates.
068       * @param serverId the Identifier of the server.
069       * @param handler handler for which the ServerWriter is created.
070       * @param replicationServerDomain The ReplicationServerDomain of this
071       *        ServerWriter.
072       */
073      public ServerWriter(ProtocolSession session, short serverId,
074                          ServerHandler handler,
075                          ReplicationServerDomain replicationServerDomain)
076      {
077        super(handler.toString() + " writer");
078    
079        this.serverId = serverId;
080        this.session = session;
081        this.handler = handler;
082        this.replicationServerDomain = replicationServerDomain;
083      }
084    
085      /**
086       * Run method for the ServerWriter.
087       * Loops waiting for changes from the ReplicationServerDomain and forward them
088       * to the other servers
089       */
090      public void run()
091      {
092        if (debugEnabled())
093        {
094          if (handler.isReplicationServer())
095          {
096            TRACER.debugInfo("Replication server writer starting " + serverId);
097          }
098          else
099          {
100            TRACER.debugInfo("LDAP server writer starting " + serverId);
101          }
102        }
103        try
104        {
105          while (true)
106          {
107            UpdateMessage update = replicationServerDomain.take(this.handler);
108            if (update == null)
109              return;       /* this connection is closing */
110    
111            // Ignore update to be sent to a replica with a bad generation ID
112            long referenceGenerationId = replicationServerDomain.getGenerationId();
113            if ((referenceGenerationId != handler.getGenerationId())
114                || (referenceGenerationId == -1)
115                || (handler.getGenerationId() == -1))
116            {
117              logError(ERR_IGNORING_UPDATE_TO.get(
118                  update.getDn(),
119                  this.handler.getMonitorInstanceName()));
120              continue;
121            }
122    
123            /*
124            if (debugEnabled())
125            {
126              TRACER.debugInfo(
127                "In " + replicationServerDomain.getReplicationServer().
128                  getMonitorInstanceName() +
129                ", writer to " + this.handler.getMonitorInstanceName() +
130                " publishes msg=[" + update.toString() + "]"+
131                " refgenId=" + referenceGenerationId +
132                " server=" + handler.getServerId() +
133                " generationId=" + handler.getGenerationId());
134            }
135            */
136            session.publish(update);
137          }
138        }
139        catch (NoSuchElementException e)
140        {
141          /*
142           * The remote host has disconnected and this particular Tree is going to
143           * be removed, just ignore the exception and let the thread die as well
144           */
145          Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
146          logError(message);
147        }
148        catch (SocketException e)
149        {
150          /*
151           * The remote host has disconnected and this particular Tree is going to
152           * be removed, just ignore the exception and let the thread die as well
153           */
154          Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
155          logError(message);
156        }
157        catch (Exception e)
158        {
159          /*
160           * An unexpected error happened.
161           * Log an error and close the connection.
162           */
163          Message message = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler.toString() +
164                            " " +  stackTraceToSingleLineString(e));
165          logError(message);
166        }
167        finally {
168          try
169          {
170            session.close();
171          } catch (IOException e)
172          {
173           // Can't do much more : ignore
174          }
175          replicationServerDomain.stopServer(handler);
176    
177          if (debugEnabled())
178          {
179            if (handler.isReplicationServer())
180            {
181              TRACER.debugInfo("Replication server writer stopping " + serverId);
182            }
183            else
184            {
185              TRACER.debugInfo("LDAP server writer stopping " + serverId);
186            }
187          }
188        }
189      }
190    }