001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.server; 028 import org.opends.messages.Message; 029 030 import static org.opends.server.loggers.ErrorLogger.logError; 031 import static org.opends.messages.ReplicationMessages.*; 032 import static org.opends.server.loggers.debug.DebugLogger.*; 033 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 034 035 036 import java.io.IOException; 037 038 import org.opends.server.api.DirectoryThread; 039 import org.opends.server.replication.protocol.AckMessage; 040 import org.opends.server.replication.protocol.DoneMessage; 041 import org.opends.server.replication.protocol.EntryMessage; 042 import org.opends.server.replication.protocol.ErrorMessage; 043 import org.opends.server.replication.protocol.ResetGenerationId; 044 import org.opends.server.replication.protocol.InitializeRequestMessage; 045 import org.opends.server.replication.protocol.InitializeTargetMessage; 046 import org.opends.server.replication.protocol.ProtocolSession; 047 import org.opends.server.replication.protocol.ReplicationMessage; 048 import org.opends.server.replication.protocol.UpdateMessage; 049 import org.opends.server.replication.protocol.WindowMessage; 050 import org.opends.server.replication.protocol.WindowProbe; 051 import org.opends.server.replication.protocol.ReplServerInfoMessage; 052 import org.opends.server.replication.protocol.MonitorMessage; 053 import org.opends.server.replication.protocol.MonitorRequestMessage; 054 import org.opends.server.loggers.debug.DebugTracer; 055 056 057 /** 058 * This class implement the part of the replicationServer that is reading 059 * the connection from the LDAP servers to get all the updates that 060 * were done on this replica and forward them to other servers. 061 * 062 * A single thread is dedicated to this work. 063 * It waits in a blocking mode on the connection from the LDAP server 064 * and upon receiving an update puts in into the replicationServer cache 065 * from where the other servers will grab it. 066 */ 067 public class ServerReader extends DirectoryThread 068 { 069 /** 070 * The tracer object for the debug logger. 071 */ 072 private static final DebugTracer TRACER = getTracer(); 073 074 private short serverId; 075 private ProtocolSession session; 076 private ServerHandler handler; 077 private ReplicationServerDomain replicationServerDomain; 078 079 /** 080 * Constructor for the LDAP server reader part of the replicationServer. 081 * 082 * @param session The ProtocolSession from which to read the data. 083 * @param serverId The server ID of the server from which we read messages. 084 * @param handler The server handler for this server reader. 085 * @param replicationServerDomain The ReplicationServerDomain for this server 086 * reader. 087 */ 088 public ServerReader(ProtocolSession session, short serverId, 089 ServerHandler handler, 090 ReplicationServerDomain replicationServerDomain) 091 { 092 super(handler.toString() + " reader"); 093 this.session = session; 094 this.serverId = serverId; 095 this.handler = handler; 096 this.replicationServerDomain = replicationServerDomain; 097 } 098 099 /** 100 * Create a loop that reads changes and hands them off to be processed. 101 */ 102 public void run() 103 { 104 if (debugEnabled()) 105 { 106 TRACER.debugInfo( 107 "In RS " + replicationServerDomain.getReplicationServer(). 108 getMonitorInstanceName() + 109 (handler.isReplicationServer()?" RS ":" LS")+ 110 " reader starting for serverId=" + serverId); 111 } 112 /* 113 * wait on input stream 114 * grab all incoming messages and publish them to the 115 * replicationServerDomain 116 */ 117 try 118 { 119 while (true) 120 { 121 ReplicationMessage msg = session.receive(); 122 123 /* 124 if (debugEnabled()) 125 { 126 TRACER.debugInfo( 127 "In RS " + replicationServerDomain.getReplicationServer(). 128 getMonitorInstanceName() + 129 (handler.isReplicationServer()?" From RS ":" From LS")+ 130 " with serverId=" + serverId + " receives " + msg); 131 } 132 */ 133 if (msg instanceof AckMessage) 134 { 135 AckMessage ack = (AckMessage) msg; 136 handler.checkWindow(); 137 replicationServerDomain.ack(ack, serverId); 138 } 139 else if (msg instanceof UpdateMessage) 140 { 141 // Ignore update received from a replica with 142 // a bad generation ID 143 long referenceGenerationId = 144 replicationServerDomain.getGenerationId(); 145 if ((referenceGenerationId>0) && 146 (referenceGenerationId != handler.getGenerationId())) 147 { 148 logError(ERR_IGNORING_UPDATE_FROM.get( 149 msg.toString(), 150 handler.getMonitorInstanceName())); 151 } 152 else 153 { 154 UpdateMessage update = (UpdateMessage) msg; 155 handler.decAndCheckWindow(); 156 replicationServerDomain.put(update, handler); 157 } 158 } 159 else if (msg instanceof WindowMessage) 160 { 161 WindowMessage windowMsg = (WindowMessage) msg; 162 handler.updateWindow(windowMsg); 163 } 164 else if (msg instanceof InitializeRequestMessage) 165 { 166 InitializeRequestMessage initializeMsg = 167 (InitializeRequestMessage) msg; 168 handler.process(initializeMsg); 169 } 170 else if (msg instanceof InitializeTargetMessage) 171 { 172 InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg; 173 handler.process(initializeMsg); 174 } 175 else if (msg instanceof EntryMessage) 176 { 177 EntryMessage entryMsg = (EntryMessage) msg; 178 handler.process(entryMsg); 179 } 180 else if (msg instanceof DoneMessage) 181 { 182 DoneMessage doneMsg = (DoneMessage) msg; 183 handler.process(doneMsg); 184 } 185 else if (msg instanceof ErrorMessage) 186 { 187 ErrorMessage errorMsg = (ErrorMessage) msg; 188 handler.process(errorMsg); 189 } 190 else if (msg instanceof ResetGenerationId) 191 { 192 ResetGenerationId genIdMsg = (ResetGenerationId) msg; 193 replicationServerDomain.resetGenerationId(this.handler, genIdMsg); 194 } 195 else if (msg instanceof WindowProbe) 196 { 197 WindowProbe windowProbeMsg = (WindowProbe) msg; 198 handler.process(windowProbeMsg); 199 } 200 else if (msg instanceof ReplServerInfoMessage) 201 { 202 ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg; 203 handler.receiveReplServerInfo(infoMsg); 204 replicationServerDomain.receiveReplServerInfo(infoMsg, handler); 205 } 206 else if (msg instanceof MonitorRequestMessage) 207 { 208 MonitorRequestMessage replServerMonitorRequestMsg = 209 (MonitorRequestMessage) msg; 210 handler.process(replServerMonitorRequestMsg); 211 } 212 else if (msg instanceof MonitorMessage) 213 { 214 MonitorMessage replServerMonitorMsg = (MonitorMessage) msg; 215 handler.process(replServerMonitorMsg); 216 } 217 else if (msg == null) 218 { 219 /* 220 * The remote server has sent an unknown message, 221 * close the conenction. 222 */ 223 Message message = NOTE_READER_NULL_MSG.get(handler.toString()); 224 logError(message); 225 return; 226 } 227 } 228 } catch (IOException e) 229 { 230 /* 231 * The connection has been broken 232 * Log a message and exit from this loop 233 * So that this handler is stopped. 234 */ 235 if (debugEnabled()) 236 TRACER.debugInfo( 237 "In RS " + replicationServerDomain.getReplicationServer(). 238 getMonitorInstanceName() + 239 " reader IO EXCEPTION for serverID=" + serverId 240 + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage()); 241 Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); 242 logError(message); 243 } catch (ClassNotFoundException e) 244 { 245 if (debugEnabled()) 246 TRACER.debugInfo( 247 "In RS <" + replicationServerDomain.getReplicationServer(). 248 getMonitorInstanceName() + 249 " reader CNF EXCEPTION serverID=" + serverId 250 + stackTraceToSingleLineString(e)); 251 /* 252 * The remote server has sent an unknown message, 253 * close the connection. 254 */ 255 Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString()); 256 logError(message); 257 } catch (Exception e) 258 { 259 if (debugEnabled()) 260 TRACER.debugInfo( 261 "In RS <" + replicationServerDomain.getReplicationServer(). 262 getMonitorInstanceName() + 263 " server reader EXCEPTION serverID=" + serverId 264 + stackTraceToSingleLineString(e)); 265 /* 266 * The remote server has sent an unknown message, 267 * close the connection. 268 */ 269 Message message = NOTE_READER_EXCEPTION.get(handler.toString()); 270 logError(message); 271 } 272 finally 273 { 274 /* 275 * The thread only exit the loop above is some error condition 276 * happen. 277 * Attempt to close the socket and stop the server handler. 278 */ 279 if (debugEnabled()) 280 TRACER.debugInfo( 281 "In RS " + replicationServerDomain.getReplicationServer(). 282 getMonitorInstanceName() + 283 " server reader for serverID=" + serverId + 284 " is closing the session"); 285 try 286 { 287 session.close(); 288 } catch (IOException e) 289 { 290 // ignore 291 } 292 replicationServerDomain.stopServer(handler); 293 } 294 if (debugEnabled()) 295 TRACER.debugInfo( 296 "In RS " + replicationServerDomain.getReplicationServer(). 297 getMonitorInstanceName() + 298 (handler.isReplicationServer()?" RS":" LDAP") + 299 " server reader stopped for serverID=" + serverId); 300 } 301 }