org.opends.server.replication.plugin
Class ReplicationDomain

java.lang.Object
  extended by java.lang.Thread
      extended by org.opends.server.api.DirectoryThread
          extended by org.opends.server.replication.plugin.ReplicationDomain
All Implemented Interfaces:
java.lang.Runnable, ConfigurationChangeListener<ReplicationDomainCfg>, AlertGenerator

public class ReplicationDomain
extends DirectoryThread
implements ConfigurationChangeListener<ReplicationDomainCfg>, AlertGenerator

This class implements the bulk of the Directory Server side of the replication code. It contains the root methods for publishing a change, processing a change received from the replicationServer service, handling conflict resolution, and handling protocol messages from the replicationServer.


Nested Class Summary
 
Nested classes/interfaces inherited from class java.lang.Thread
java.lang.Thread.State, java.lang.Thread.UncaughtExceptionHandler
 
Field Summary
static java.lang.String DS_SYNC_CONFLICT
          The attribute used to mark conflicting entries.
protected static java.lang.String REPLICATION_GENERATION_ID
          The attribute name used to store the state in the backend.
 
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
 
Constructor Summary
ReplicationDomain(ReplicationDomainCfg configuration, java.util.concurrent.LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
          Creates a new ReplicationDomain using the provided configuration.
 
Method Summary
protected  void abandonImportExport(ErrorMessage errorMsg)
          Processes an error message received while an import/export is ongoing.
 void ack(ChangeNumber changeNumber)
          Send an Ack message.
 ConfigChangeResult applyConfigurationChange(ReplicationDomainCfg configuration)
          Applies the configuration changes to this change listener.
 void backupEnd()
          Do whatever is needed when a backup is finished.
 void backupStart()
          Do whatever is needed when a backup is started.
static void clearJEBackend(boolean createBaseEntry, java.lang.String beID, java.lang.String dn)
          Clears all the entries from the JE backend determined by the be id passed into the method.
protected  void closeBackendImport(Backend backend)
          Performs post-import operations.
 long computeGenerationId()
          Compute the data generationId associated with the current data present in the backend for this domain.
 short decodeSource(java.lang.String sourceString)
          Verifies that the given string represents a valid source from which this server can be initialized.
 short decodeTarget(java.lang.String targetString)
          Verifies that the given string represents a valid target that this server can initialize.
 void disable()
          Disable the replication on this domain.
 void doPreOperation(PreOperationAddOperation addOperation)
          The preOperation phase for the add Operation.
 void enable()
          Re-enable the domain after a previous disable.
protected  void exportBackend()
          Export the entries from the backend.
 void exportLDIFEntry(java.lang.String lDIFEntry)
          Exports an entry in LDIF format.
 java.util.LinkedHashMap<java.lang.String,java.lang.String> getAlerts()
          Retrieves information about the set of alerts that this generator may produce.
 Backend getBackend()
          Returns the backend associated to this domain.
 DN getBaseDN()
          Returns the base DN of this ReplicationDomain.
 java.lang.String getClassName()
          Retrieves the fully-qualified name of the Java class for this alert generator implementation.
 DN getComponentEntryDN()
          Retrieves the DN of the configuration entry with which this alert generator is associated.
 int getCurrentRcvWindow()
          Get the current receive window size.
 int getCurrentSendWindow()
          Get the current send window size.
 int getDebugCount()
          Get the debugCount.
 long getGenerationId()
          Returns the generationId set for this domain.
 int getMaxRcvWindow()
          Get the maximum receive window size.
 int getMaxSendWindow()
          Get the maximum send window size.
 int getNumLostConnections()
          Get the number of times the replication connection was lost.
 int getNumProcessedUpdates()
          Get the number of updates replayed by the replication.
 int getNumRcvdUpdates()
          Get the number of updates received by the replication plugin.
 int getNumReplayedPostOpCalled()
          Get the number of updates replayed successfully by the replication.
 int getNumResolvedModifyConflicts()
          Get the number of modify conflicts successfully resolved.
 int getNumResolvedNamingConflicts()
          Get the number of naming conflicts successfully resolved.
 int getNumSentUpdates()
          Get the number of updates sent by the replication plugin.
 int getNumUnresolvedNamingConflicts()
          Get the number of unresolved conflicts.
 int getPendingUpdatesCount()
          Get the number of updates in the pending list.
 java.lang.String getReplicationServer()
          Get the name of the replicationServer to which this domain is currently connected.
 int getServerId()
          Get the server ID.
 ServerState getServerState()
          Get the ServerState.
 SynchronizationProviderResult handleConflictResolution(PreOperationAddOperation addOperation)
          Implement the handleConflictResolution phase of the addOperation.
 SynchronizationProviderResult handleConflictResolution(PreOperationDeleteOperation deleteOperation)
          Implement the handleConflictResolution phase of the deleteOperation.
 SynchronizationProviderResult handleConflictResolution(PreOperationModifyDNOperation modifyDNOperation)
          Implement the handleConflictResolution phase of the ModifyDNOperation.
 SynchronizationProviderResult handleConflictResolution(PreOperationModifyOperation modifyOperation)
          Handle the conflict resolution.
 boolean ieRunning()
          Returns a boolean indicating whether an import or export is currently in progress.
 void incProcessedUpdates()
          Increment the number of processed updates.
protected  void initialize(InitializeTargetMessage initializeMessage)
          Initializes the domain's backend with received entries.
 void initializeFromRemote(short source, Task initTask)
          Initializes this domain from another source server.
 void initializeRemote(short target, short requestorID, Task initTask)
          Process the initialization of some other server or servers in the topology specified by the target argument, when the initialization was requested by the server identified by requestorID.
 void initializeRemote(short target, Task initTask)
          Process the initialization of some other server or servers in the topology specified by the target argument.
static boolean isConfigurationAcceptable(ReplicationDomainCfg configuration, java.util.List<Message> unacceptableReasons)
          Check if the provided configuration is acceptable for add.
 boolean isConfigurationChangeAcceptable(ReplicationDomainCfg configuration, java.util.List<Message> unacceptableReasons)
          Indicates whether the proposed change to the configuration is acceptable to this change listener.
 boolean isConnected()
          Check if the domain is connected to a ReplicationServer.
 boolean isSessionEncrypted()
          Determine whether the connection to the replication server is encrypted.
protected  void loadDataState()
          Do what is necessary when the data has changed: load the state and the generation ID.
 long loadGenerationId()
          Load the GenerationId from the root entry of the domain from the REPLICATION_GENERATION_ID attribute in database to memory, or compute it if not found.
 UpdateMessage receive()
          Receives an update message from the replicationServer.
 void receiveAck(AckMessage ack)
          Do the necessary processing when an AckMessage is received.
 byte[] receiveEntryBytes()
          Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).
 void receiveUpdate(UpdateMessage update)
          Do the necessary processing when an UpdateMessage was received.
 void replay(UpdateMessage msg)
          Create and replay a synchronized Operation from an UpdateMessage.
 void resetGenerationId(java.lang.Long generationIdNewValue)
          Reset the generationId of this domain in the whole topology.
protected static Backend retrievesBackend(DN baseDN)
          Retrieves the backend related to the domain.
static ReplicationDomain retrievesReplicationDomain(DN baseDN)
          Retrieves a replication domain based on the baseDN.
 void run()
          
 ResultCode saveGenerationId(long generationId)
          Stores the value of the generationId.
 void shutdown()
          Shutdown this ReplicationDomain.
 boolean solveConflict()
          Check whether the domain solves conflicts.
 void synchronize(PostOperationOperation op)
          Check if an operation must be synchronized.
 void synchronizeModifications(java.util.List<Modification> modifications)
          Push the modifications contained in the given parameter as a modification that would happen on a local server.
 void updateError(ChangeNumber changeNumber)
          This method is called when an error happens while replaying an operation.
 
Methods inherited from class org.opends.server.api.DirectoryThread
getAssociatedTask, getCreationStackTrace, getDebugProperties, getParentThread, setAssociatedTask
 
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
 
Methods inherited from class java.lang.Object
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

DS_SYNC_CONFLICT

public static final java.lang.String DS_SYNC_CONFLICT
The attribute used to mark conflicting entries. The value of this attribute should be the dn that this entry was supposed to have when it was marked as conflicting.

See Also:
Constant Field Values

REPLICATION_GENERATION_ID

protected static final java.lang.String REPLICATION_GENERATION_ID
The attribute name used to store the state in the backend.

See Also:
Constant Field Values
Constructor Detail

ReplicationDomain

public ReplicationDomain(ReplicationDomainCfg configuration,
                         java.util.concurrent.LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
                  throws ConfigException
Creates a new ReplicationDomain using the provided configuration.

Parameters:
configuration - The configuration of this ReplicationDomain.
updateToReplayQueue - The queue for update messages to replay.
Throws:
ConfigException - In case of invalid configuration.
Method Detail

getBaseDN

public DN getBaseDN()
Returns the base DN of this ReplicationDomain.

Returns:
The base DN of this ReplicationDomain

handleConflictResolution

public SynchronizationProviderResult handleConflictResolution(PreOperationDeleteOperation deleteOperation)
Implement the handleConflictResolution phase of the deleteOperation.

Parameters:
deleteOperation - The deleteOperation.
Returns:
A SynchronizationProviderResult indicating if the operation can continue.

handleConflictResolution

public SynchronizationProviderResult handleConflictResolution(PreOperationAddOperation addOperation)
Implement the handleConflictResolution phase of the addOperation.

Parameters:
addOperation - The AddOperation.
Returns:
A SynchronizationProviderResult indicating if the operation can continue.

handleConflictResolution

public SynchronizationProviderResult handleConflictResolution(PreOperationModifyDNOperation modifyDNOperation)
Implement the handleConflictResolution phase of the ModifyDNOperation.

Parameters:
modifyDNOperation - The ModifyDNOperation.
Returns:
A SynchronizationProviderResult indicating if the operation can continue.

handleConflictResolution

public SynchronizationProviderResult handleConflictResolution(PreOperationModifyOperation modifyOperation)
Handle the conflict resolution. Called by the core server after locking the entry and before starting the actual modification.

Parameters:
modifyOperation - the operation
Returns:
A code indicating whether the operation must proceed.

doPreOperation

public void doPreOperation(PreOperationAddOperation addOperation)
The preOperation phase for the add Operation. Its job is to generate the replication context associated to the operation. It is necessary to do it in this phase because contrary to the other operations, the entry uid is not set when the handleConflict phase is called.

Parameters:
addOperation - The Add Operation.

receive

public UpdateMessage receive()
Receives an update message from the replicationServer. Also responsible for updating the list of pending changes.

Returns:
the received message - null if none

receiveUpdate

public void receiveUpdate(UpdateMessage update)
Do the necessary processing when an UpdateMessage was received.

Parameters:
update - The received UpdateMessage.

receiveAck

public void receiveAck(AckMessage ack)
Do the necessary processing when an AckMessage is received.

Parameters:
ack - The AckMessage that was received.

synchronize

public void synchronize(PostOperationOperation op)
Check if an operation must be synchronized. Also update the list of pending changes and the server RUV.

Parameters:
op - the operation

getNumRcvdUpdates

public int getNumRcvdUpdates()
Get the number of updates received by the replication plugin.

Returns:
the number of updates received

getNumSentUpdates

public int getNumSentUpdates()
Get the number of updates sent by the replication plugin.

Returns:
the number of updates sent

getPendingUpdatesCount

public int getPendingUpdatesCount()
Get the number of updates in the pending list.

Returns:
The number of updates in the pending list

incProcessedUpdates

public void incProcessedUpdates()
Increment the number of processed updates.


getNumProcessedUpdates

public int getNumProcessedUpdates()
Get the number of updates replayed by the replication.

Returns:
The number of updates replayed by the replication

getNumReplayedPostOpCalled

public int getNumReplayedPostOpCalled()
Get the number of updates replayed successfully by the replication.

Returns:
The number of updates replayed successfully

getServerState

public ServerState getServerState()
Get the ServerState.

Returns:
the ServerState

getDebugCount

public int getDebugCount()
Get the debugCount.

Returns:
The debugCount.

ack

public void ack(ChangeNumber changeNumber)
Send an Ack message.

Parameters:
changeNumber - The ChangeNumber for which the ack must be sent.

run

public void run()

Specified by:
run in interface java.lang.Runnable
Overrides:
run in class java.lang.Thread

shutdown

public void shutdown()
Shutdown this ReplicationDomain.


getReplicationServer

public java.lang.String getReplicationServer()
Get the name of the replicationServer to which this domain is currently connected.

Returns:
the name of the replicationServer to which this domain is currently connected.

replay

public void replay(UpdateMessage msg)
Create and replay a synchronized Operation from an UpdateMessage.

Parameters:
msg - The UpdateMessage to be replayed.

updateError

public void updateError(ChangeNumber changeNumber)
This method is called when an error happens while replaying an operation. It is necessary because the postOperation does not always get called when error or Exceptions happen during the operation replay.

Parameters:
changeNumber - the ChangeNumber of the operation with error.

getMaxRcvWindow

public int getMaxRcvWindow()
Get the maximum receive window size.

Returns:
The maximum receive window size.

getCurrentRcvWindow

public int getCurrentRcvWindow()
Get the current receive window size.

Returns:
The current receive window size.

getMaxSendWindow

public int getMaxSendWindow()
Get the maximum send window size.

Returns:
The maximum send window size.

getCurrentSendWindow

public int getCurrentSendWindow()
Get the current send window size.

Returns:
The current send window size.

getNumLostConnections

public int getNumLostConnections()
Get the number of times the replication connection was lost.

Returns:
The number of times the replication connection was lost.

getNumResolvedModifyConflicts

public int getNumResolvedModifyConflicts()
Get the number of modify conflicts successfully resolved.

Returns:
The number of modify conflicts successfully resolved.

getNumResolvedNamingConflicts

public int getNumResolvedNamingConflicts()
Get the number of naming conflicts successfully resolved.

Returns:
The number of naming conflicts successfully resolved.

getNumUnresolvedNamingConflicts

public int getNumUnresolvedNamingConflicts()
Get the number of unresolved conflicts.

Returns:
The number of unresolved conflicts.

getServerId

public int getServerId()
Get the server ID.

Returns:
The server ID.

solveConflict

public boolean solveConflict()
Check whether the domain solves conflicts.

Returns:
a boolean indicating whether the domain should solve conflicts.

disable

public void disable()
Disable the replication on this domain. The session to the replication server will be stopped. The domain will not be destroyed, but calls to the pre-operation methods will result in failure. The listener thread will be destroyed. The monitoring information will still be accessible.


loadDataState

protected void loadDataState()
                      throws DirectoryException
Do what is necessary when the data has changed: load the state and the generation ID.

Throws:
DirectoryException - Thrown when an error occurs.

enable

public void enable()
Re-enable the domain after a previous disable. The domain will reconnect to a replication server and will recreate the threads that listen for messages from the Synchronization server. The generationId will be retrieved or computed if necessary. The ServerState will also be read again from the local database.


computeGenerationId

public long computeGenerationId()
                         throws DirectoryException
Compute the data generationId associated with the current data present in the backend for this domain.

Returns:
The computed generationId.
Throws:
DirectoryException - When an error occurs.

getGenerationId

public long getGenerationId()
Returns the generationId set for this domain.

Returns:
The generationId.

saveGenerationId

public ResultCode saveGenerationId(long generationId)
Stores the value of the generationId.

Parameters:
generationId - The value of the generationId.
Returns:
a ResultCode indicating whether the method was successful.

loadGenerationId

public long loadGenerationId()
                      throws DirectoryException
Load the GenerationId from the root entry of the domain from the REPLICATION_GENERATION_ID attribute in database to memory, or compute it if not found.

Returns:
The retrieved value of the generationId.
Throws:
DirectoryException - When an error occurs.
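
The load-or-compute behaviour described for loadGenerationId can be pictured with a small sketch. This is not the OpenDS implementation: the map stands in for the attributes of the domain's root entry, `GENERATION_ID_ATTR` is a hypothetical key (the real attribute name is the value of REPLICATION_GENERATION_ID), and the `compute` supplier is a placeholder for whatever computeGenerationId actually does.

```java
import java.util.Map;
import java.util.function.LongSupplier;

class GenerationIdLoader {
    /** Hypothetical attribute key used only for this illustration. */
    static final String GENERATION_ID_ATTR = "example-generation-id";

    /**
     * Returns the stored generation ID when the (simulated) root entry
     * carries one, otherwise falls back to the supplied computation.
     */
    static long loadOrCompute(Map<String, String> rootEntryAttrs,
                              LongSupplier compute) {
        String stored = rootEntryAttrs.get(GENERATION_ID_ATTR);
        if (stored != null) {
            // A stored value takes precedence over recomputation.
            return Long.parseLong(stored);
        }
        // Nothing stored: derive the value from the current data.
        return compute.getAsLong();
    }
}
```

Whether the real method also persists a freshly computed value is not stated here, so the sketch deliberately leaves that step out.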

resetGenerationId

public void resetGenerationId(java.lang.Long generationIdNewValue)
Reset the generationId of this domain in the whole topology. A message is sent to the Replication Servers for them to reset their change dbs.

Parameters:
generationIdNewValue - The new value of the generation Id.

backupStart

public void backupStart()
Do whatever is needed when a backup is started. We need to make sure that the serverState is correctly saved.


backupEnd

public void backupEnd()
Do whatever is needed when a backup is finished.


receiveEntryBytes

public byte[] receiveEntryBytes()
Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream).

Returns:
The bytes. Null when the Done or Err message has been received
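
Because a null return marks the end of the transfer, a caller can consume the entry bytes with a plain read loop. The sketch below only illustrates that contract: `EntryByteSource` is a hypothetical stand-in for the object exposing receiveEntryBytes() (it is not an OpenDS type), and `fromChunks` is a demonstration helper backed by an in-memory queue.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the object exposing receiveEntryBytes(). */
interface EntryByteSource {
    /** Returns the next chunk of entry bytes, or null once the transfer has ended. */
    byte[] receiveEntryBytes();
}

class EntryByteDrainer {
    /** Reads chunks until the source returns null and returns them concatenated. */
    static byte[] drain(EntryByteSource source) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk;
        while ((chunk = source.receiveEntryBytes()) != null) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    /** Builds an in-memory source for demonstration; poll() yields null when empty. */
    static EntryByteSource fromChunks(String... chunks) {
        Deque<byte[]> queue = new ArrayDeque<>();
        for (String c : chunks) {
            queue.add(c.getBytes(StandardCharsets.UTF_8));
        }
        return queue::poll;
    }
}
```

The sketch does not distinguish a Done message from an Err message, since both surface to the caller as null; how the real reader reports the error case is outside what this page documents.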

abandonImportExport

protected void abandonImportExport(ErrorMessage errorMsg)
Processes an error message received while an import/export is ongoing.

Parameters:
errorMsg - The error message received.

clearJEBackend

public static void clearJEBackend(boolean createBaseEntry,
                                  java.lang.String beID,
                                  java.lang.String dn)
                           throws java.lang.Exception
Clears all the entries from the JE backend determined by the be id passed into the method.

Parameters:
createBaseEntry - Indicate whether to automatically create the base entry and add it to the backend.
beID - The be id to clear.
dn - The suffix of the backend to create if the createBaseEntry boolean is true.
Throws:
java.lang.Exception - If an unexpected problem occurs.

exportBackend

protected void exportBackend()
                      throws DirectoryException
Export the entries from the backend. The ieContext must have been set before calling.

Throws:
DirectoryException - when an error occurred

retrievesBackend

protected static Backend retrievesBackend(DN baseDN)
Retrieves the backend related to the domain.

Parameters:
baseDN - The baseDN to retrieve the backend
Returns:
The backend of that domain.

exportLDIFEntry

public void exportLDIFEntry(java.lang.String lDIFEntry)
                     throws java.io.IOException
Exports an entry in LDIF format.

Parameters:
lDIFEntry - The entry to be exported.
Throws:
java.io.IOException - when an error occurred.

initializeFromRemote

public void initializeFromRemote(short source,
                                 Task initTask)
                          throws DirectoryException
Initializes this domain from another source server.

Parameters:
source - The source from which to initialize
initTask - The task that launched the initialization and should be updated of its progress.
Throws:
DirectoryException - when an error occurs

decodeSource

public short decodeSource(java.lang.String sourceString)
                   throws DirectoryException
Verifies that the given string represents a valid source from which this server can be initialized.

Parameters:
sourceString - The string representing the source
Returns:
The source as a short value
Throws:
DirectoryException - if the string is not valid

decodeTarget

public short decodeTarget(java.lang.String targetString)
                   throws DirectoryException
Verifies that the given string represents a valid target that this server can initialize.

Parameters:
targetString - The string representing the target
Returns:
The target as a short value
Throws:
DirectoryException - if the string is not valid

initializeRemote

public void initializeRemote(short target,
                             Task initTask)
                      throws DirectoryException
Process the initialization of some other server or servers in the topology specified by the target argument.

Parameters:
target - The target that should be initialized
initTask - The task that triggers this initialization and that should be updated with its progress.
Throws:
DirectoryException - When an error occurs.

initializeRemote

public void initializeRemote(short target,
                             short requestorID,
                             Task initTask)
                      throws DirectoryException
Process the initialization of some other server or servers in the topology specified by the target argument, when the initialization was requested by the server identified by requestorID.

Parameters:
target - The target that should be initialized.
requestorID - The server that initiated the export.
initTask - The task that triggers this initialization and that should be updated with its progress.
Throws:
DirectoryException - When an error occurs.

initialize

protected void initialize(InitializeTargetMessage initializeMessage)
                   throws DirectoryException
Initializes the domain's backend with received entries.

Parameters:
initializeMessage - The message that initiated the import.
Throws:
DirectoryException - Thrown when an error occurs.

closeBackendImport

protected void closeBackendImport(Backend backend)
                           throws DirectoryException
Performs post-import operations.

Parameters:
backend - The backend implied in the import.
Throws:
DirectoryException - Thrown when an error occurs.

retrievesReplicationDomain

public static ReplicationDomain retrievesReplicationDomain(DN baseDN)
                                                    throws DirectoryException
Retrieves a replication domain based on the baseDN.

Parameters:
baseDN - The baseDN of the domain to retrieve
Returns:
The domain retrieved
Throws:
DirectoryException - When an error occurred or no domain matches the provided baseDN.

getBackend

public Backend getBackend()
Returns the backend associated to this domain.

Returns:
The associated backend.

ieRunning

public boolean ieRunning()
Returns a boolean indicating whether an import or export is currently in progress.

Returns:
The status

synchronizeModifications

public void synchronizeModifications(java.util.List<Modification> modifications)
Push the modifications contained in the given parameter as a modification that would happen on a local server. The modifications are not applied to the local database and historical information is not updated, but a ChangeNumber is generated and the ServerState associated to this domain is updated.

Parameters:
modifications - The modification to push

isConfigurationAcceptable

public static boolean isConfigurationAcceptable(ReplicationDomainCfg configuration,
                                                java.util.List<Message> unacceptableReasons)
Check if the provided configuration is acceptable for add.

Parameters:
configuration - The configuration to check.
unacceptableReasons - When the configuration is not acceptable, this list is used to return the reasons why the configuration is not acceptable.
Returns:
true if the configuration is acceptable, false otherwise.

applyConfigurationChange

public ConfigChangeResult applyConfigurationChange(ReplicationDomainCfg configuration)
Applies the configuration changes to this change listener.

Specified by:
applyConfigurationChange in interface ConfigurationChangeListener<ReplicationDomainCfg>
Parameters:
configuration - The new configuration containing the changes.
Returns:
Returns information about the result of changing the configuration.

isConfigurationChangeAcceptable

public boolean isConfigurationChangeAcceptable(ReplicationDomainCfg configuration,
                                               java.util.List<Message> unacceptableReasons)
Indicates whether the proposed change to the configuration is acceptable to this change listener.

Specified by:
isConfigurationChangeAcceptable in interface ConfigurationChangeListener<ReplicationDomainCfg>
Parameters:
configuration - The new configuration containing the changes.
unacceptableReasons - A list that can be used to hold messages about why the provided configuration is not acceptable.
Returns:
Returns true if the proposed change is acceptable, or false if it is not.
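
The boolean-plus-reasons contract used by the acceptability checks can be sketched as follows. Everything here is a simplified stand-in: `ExampleDomainSettings` is a hypothetical replacement for ReplicationDomainCfg, plain strings replace Message objects, and the two validation rules are invented for illustration and are not the checks OpenDS performs.

```java
import java.util.List;

/** Hypothetical, simplified stand-in for a domain configuration. */
class ExampleDomainSettings {
    final String baseDn;
    final int serverId;

    ExampleDomainSettings(String baseDn, int serverId) {
        this.baseDn = baseDn;
        this.serverId = serverId;
    }
}

class ExampleConfigValidator {
    /**
     * Returns true when the proposed settings pass every check; otherwise
     * appends one reason per failed check to unacceptableReasons and
     * returns false.
     */
    static boolean isChangeAcceptable(ExampleDomainSettings proposed,
                                      List<String> unacceptableReasons) {
        boolean acceptable = true;
        if (proposed.baseDn == null || proposed.baseDn.isEmpty()) {
            unacceptableReasons.add("base DN must not be empty");
            acceptable = false;
        }
        if (proposed.serverId <= 0) {
            unacceptableReasons.add("server ID must be positive");
            acceptable = false;
        }
        return acceptable;
    }
}
```

Collecting every failure before returning, rather than stopping at the first one, lets the caller report all problems with a proposed change at once.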

getAlerts

public java.util.LinkedHashMap<java.lang.String,java.lang.String> getAlerts()
Retrieves information about the set of alerts that this generator may produce. The map returned should be between the notification type for a particular notification and the human-readable description for that notification. This alert generator must not generate any alerts with types that are not contained in this list.

Specified by:
getAlerts in interface AlertGenerator
Returns:
Information about the set of alerts that this generator may produce.
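
A minimal sketch of the kind of map getAlerts describes, from notification type to human-readable description. The alert type strings and descriptions below are hypothetical placeholders, not the alerts that ReplicationDomain actually declares.

```java
import java.util.LinkedHashMap;

class ExampleAlertCatalog {
    /** Builds an insertion-ordered map of alert type to description. */
    static LinkedHashMap<String, String> getAlerts() {
        LinkedHashMap<String, String> alerts = new LinkedHashMap<>();
        // Hypothetical alert types used only for illustration.
        alerts.put("example.alert.unresolved-conflict",
                   "A replication conflict could not be resolved automatically.");
        alerts.put("example.alert.connection-lost",
                   "The connection to the replication server was lost.");
        return alerts;
    }

    /** An alert may be generated only if its type appears in the catalog. */
    static boolean mayGenerate(String alertType) {
        return getAlerts().containsKey(alertType);
    }
}
```

The `mayGenerate` helper mirrors the rule stated above that a generator must not emit alert types missing from the returned map.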

getClassName

public java.lang.String getClassName()
Retrieves the fully-qualified name of the Java class for this alert generator implementation.

Specified by:
getClassName in interface AlertGenerator
Returns:
The fully-qualified name of the Java class for this alert generator implementation.

getComponentEntryDN

public DN getComponentEntryDN()
Retrieves the DN of the configuration entry with which this alert generator is associated.

Specified by:
getComponentEntryDN in interface AlertGenerator
Returns:
The DN of the configuration entry with which this alert generator is associated.

isConnected

public boolean isConnected()
Check if the domain is connected to a ReplicationServer.

Returns:
true if the server is connected, false if not.

isSessionEncrypted

public boolean isSessionEncrypted()
Determine whether the connection to the replication server is encrypted.

Returns:
true if the connection is encrypted, false otherwise.