001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.tools.makeldif; 028 029 030 031 import java.io.ByteArrayOutputStream; 032 import java.io.InputStream; 033 import java.io.IOException; 034 import java.nio.ByteBuffer; 035 import java.util.concurrent.LinkedBlockingQueue; 036 import java.util.concurrent.TimeUnit; 037 038 import org.opends.server.types.Entry; 039 import org.opends.server.types.LDIFExportConfig; 040 import org.opends.server.util.LDIFException; 041 import org.opends.server.util.LDIFWriter; 042 043 044 045 /** 046 * This class creates an input stream that can be used to read entries generated 047 * by MakeLDIF as if they were being read from another source like a file. It 048 * has a fixed-size queue that dictates how many entries may be held in memory 049 * at any given time. 050 */ 051 public class MakeLDIFInputStream 052 extends InputStream 053 implements EntryWriter 054 { 055 // Indicates whether all of the entries have been generated. 056 private boolean allGenerated; 057 058 // Indicates whether this input stream has been closed. 059 private boolean closed; 060 061 // The byte array output stream that will be used to convert entries to byte 062 // arrays with their LDIF representations. 063 private ByteArrayOutputStream entryOutputStream; 064 065 // The byte array that will hold the LDIF representation of the next entry to 066 // be read. 067 private ByteBuffer entryBytes; 068 069 // The IOException that should be thrown the next time a read is requested. 070 private IOException ioException; 071 072 // The LDIF writer that will be used to write the entries to LDIF. 073 private LDIFWriter ldifWriter; 074 075 // The queue used to hold generated entries until they can be read. 076 private LinkedBlockingQueue<Entry> entryQueue; 077 078 // The background thread being used to actually generate the entries. 079 private MakeLDIFInputStreamThread generatorThread; 080 081 // The template file to use to generate the entries. 082 private TemplateFile templateFile; 083 084 085 086 /** 087 * Creates a new MakeLDIF input stream that will generate entries based on the 088 * provided template file. 089 * 090 * @param templateFile The template file to use to generate the entries. 091 */ 092 public MakeLDIFInputStream(TemplateFile templateFile) 093 { 094 this.templateFile = templateFile; 095 096 allGenerated = false; 097 closed = false; 098 entryQueue = new LinkedBlockingQueue<Entry>(10); 099 ioException = null; 100 entryBytes = null; 101 102 entryOutputStream = new ByteArrayOutputStream(8192); 103 LDIFExportConfig exportConfig = new LDIFExportConfig(entryOutputStream); 104 105 try 106 { 107 ldifWriter = new LDIFWriter(exportConfig); 108 } 109 catch (IOException ioe) 110 { 111 // This should never happen. 112 ioException = ioe; 113 } 114 115 generatorThread = new MakeLDIFInputStreamThread(this, templateFile); 116 generatorThread.start(); 117 } 118 119 120 121 /** 122 * Closes this input stream so that no more data may be read from it. 123 */ 124 public void close() 125 { 126 closed = true; 127 ioException = null; 128 } 129 130 131 132 /** 133 * Reads a single byte of data from this input stream. 134 * 135 * @return The byte read from the input stream, or -1 if the end of the 136 * stream has been reached. 137 * 138 * @throws IOException If a problem has occurred while generating data for 139 * use by this input stream. 140 */ 141 public int read() 142 throws IOException 143 { 144 if (closed) 145 { 146 return -1; 147 } 148 else if (ioException != null) 149 { 150 throw ioException; 151 } 152 153 if ((entryBytes == null) || (! entryBytes.hasRemaining())) 154 { 155 if (! getNextEntry()) 156 { 157 closed = true; 158 return -1; 159 } 160 } 161 162 return (0xFF & entryBytes.get()); 163 } 164 165 166 167 /** 168 * Reads data from this input stream. 169 * 170 * @param b The array into which the data should be read. 171 * @param off The position in the array at which point the data read may be 172 * placed. 173 * @param len The maximum number of bytes that may be read into the 174 * provided array. 175 * 176 * @return The number of bytes read from the input stream into the provided 177 * array, or -1 if the end of the stream has been reached. 178 * 179 * @throws IOException If a problem has occurred while generating data for 180 * use by this input stream. 181 */ 182 public int read(byte[] b, int off, int len) 183 throws IOException 184 { 185 if (closed) 186 { 187 return -1; 188 } 189 else if (ioException != null) 190 { 191 throw ioException; 192 } 193 194 if ((entryBytes == null) || (! entryBytes.hasRemaining())) 195 { 196 if (! getNextEntry()) 197 { 198 closed = true; 199 return -1; 200 } 201 } 202 203 int bytesRead = Math.min(len, entryBytes.remaining()); 204 entryBytes.get(b, off, bytesRead); 205 return bytesRead; 206 } 207 208 209 210 /** 211 * {@inheritDoc} 212 */ 213 public boolean writeEntry(Entry entry) 214 throws IOException, MakeLDIFException 215 { 216 while (! closed) 217 { 218 try 219 { 220 if (entryQueue.offer(entry, 500, TimeUnit.MILLISECONDS)) 221 { 222 return true; 223 } 224 } catch (InterruptedException ie) {} 225 } 226 227 return false; 228 } 229 230 231 232 /** 233 * {@inheritDoc} 234 */ 235 public void closeEntryWriter() 236 { 237 allGenerated = true; 238 } 239 240 241 242 /** 243 * Sets the I/O exception that should be thrown on any subsequent calls to 244 * <CODE>available</CODE> or <CODE>read</CODE>. 245 * 246 * @param ioException The I/O exception that should be thrown. 247 */ 248 void setIOException(IOException ioException) 249 { 250 this.ioException = ioException; 251 } 252 253 254 255 /** 256 * Retrieves the next entry and puts it in the entry byte buffer. 257 * 258 * @return <CODE>true</CODE> if the next entry is available, or 259 * <CODE>false</CODE> if there are no more entries or if the input 260 * stream has been closed. 261 */ 262 private boolean getNextEntry() 263 { 264 Entry entry = entryQueue.poll(); 265 while (entry == null) 266 { 267 if (closed) 268 { 269 return false; 270 } 271 else if (allGenerated) 272 { 273 entry = entryQueue.poll(); 274 if (entry == null) 275 { 276 return false; 277 } 278 } 279 else 280 { 281 try 282 { 283 entry = entryQueue.poll(500, TimeUnit.MILLISECONDS); 284 } catch (InterruptedException ie) {} 285 } 286 } 287 288 try 289 { 290 entryOutputStream.reset(); 291 ldifWriter.writeEntry(entry); 292 ldifWriter.flush(); 293 entryBytes = ByteBuffer.wrap(entryOutputStream.toByteArray()); 294 } 295 catch (LDIFException le) 296 { 297 // This should never happen. 298 ioException = new IOException(le.getMessage()); 299 return false; 300 } 301 catch (IOException ioe) 302 { 303 // Neither should this. 304 ioException = ioe; 305 return false; 306 } 307 308 return true; 309 } 310 } 311