001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.plugin; 028 import java.util.concurrent.LinkedBlockingQueue; 029 import java.util.concurrent.TimeUnit; 030 import org.opends.messages.Message; 031 032 import static org.opends.server.loggers.ErrorLogger.logError; 033 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; 034 import static org.opends.server.loggers.debug.DebugLogger.getTracer; 035 import static org.opends.messages.ReplicationMessages.*; 036 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 037 038 import org.opends.server.api.DirectoryThread; 039 import org.opends.server.loggers.debug.DebugTracer; 040 import org.opends.server.replication.protocol.UpdateMessage; 041 042 /** 043 * Thread that is used to get messages from the Replication servers 044 * and replay them in the current server. 045 */ 046 public class ListenerThread extends DirectoryThread 047 { 048 /** 049 * The tracer object for the debug logger. 050 */ 051 private static final DebugTracer TRACER = getTracer(); 052 053 private ReplicationDomain repDomain; 054 private boolean shutdown = false; 055 private boolean done = false; 056 private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue; 057 058 059 /** 060 * Constructor for the ListenerThread. 061 * 062 * @param repDomain the replication domain that created this thread 063 * @param updateToReplayQueue The update messages queue we must 064 * store messages in 065 */ 066 public ListenerThread(ReplicationDomain repDomain, 067 LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue) 068 { 069 super("Replication Listener thread " + 070 "serverID=" + repDomain.serverId + 071 " domain=" + repDomain.getName()); 072 this.repDomain = repDomain; 073 this.updateToReplayQueue = updateToReplayQueue; 074 } 075 076 /** 077 * Shutdown this listener thread. 078 */ 079 public void shutdown() 080 { 081 shutdown = true; 082 } 083 084 /** 085 * Run method for this class. 086 */ 087 @Override 088 public void run() 089 { 090 UpdateMessage updateMsg = null; 091 092 if (debugEnabled()) 093 { 094 TRACER.debugInfo("Replication Listener thread starting."); 095 } 096 097 while (!shutdown) 098 { 099 try 100 { 101 // Loop receiving update messages and puting them in the update message 102 // queue 103 while ((!shutdown) && ((updateMsg = repDomain.receive()) != null)) 104 { 105 // Put update message into the queue (block until some place in the 106 // queue is available) 107 UpdateToReplay updateToReplay = 108 new UpdateToReplay(updateMsg, repDomain); 109 boolean queued = false; 110 while (!queued && !shutdown) 111 { 112 // Use timedout method (offer) instead of put for being able to 113 // shutdown the thread 114 queued = updateToReplayQueue.offer(updateToReplay, 115 1L, TimeUnit.SECONDS); 116 } 117 if (!queued) 118 { 119 // Shutdown requested but could not push message: ensure this one is 120 // not lost and put it in the queue before dying 121 updateToReplayQueue.offer(updateToReplay); 122 } 123 } 124 if (updateMsg == null) 125 shutdown = true; 126 } catch (Exception e) 127 { 128 /* 129 * catch all exceptions happening in repDomain.receive so that the 130 * thread never dies even in case of problems. 131 */ 132 Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get( 133 stackTraceToSingleLineString(e)); 134 logError(message); 135 } 136 } 137 138 // Stop the HeartBeat thread 139 repDomain.getBroker().stopHeartBeat(); 140 141 done = true; 142 143 if (debugEnabled()) 144 { 145 TRACER.debugInfo("Replication Listener thread stopping."); 146 } 147 } 148 149 /** 150 * Wait for the completion of this thread. 151 */ 152 public void waitForShutdown() 153 { 154 try 155 { 156 while (done == false) 157 { 158 Thread.sleep(50); 159 } 160 } catch (InterruptedException e) 161 { 162 // exit the loop if this thread is interrupted. 163 } 164 } 165 }