Package Martel :: Package test :: Module test_RecordReader2
[hide private]
[frames] | no frames]

Source Code for Module Martel.test.test_RecordReader2

  1  # Still more tests of the RecordReader.  This one stresses the ability 
  2  # to find newlines and pass in lookahead text 
  3   
  4  from cStringIO import StringIO 
  5  import string 
  6  from Martel import RecordReader 
  7  from mx import TextTools as TT 
  8   
def count_records(reader):
    """Return how many records `reader` yields before it is exhausted.

    `reader.next()` is called repeatedly; the first None it returns marks
    the end of input and is not counted.
    """
    total = 0
    record = reader.next()
    while record is not None:
        total += 1
        record = reader.next()
    return total

# Notice how this omits the final newline?
def normalize(s):
    """Split `s` with TT.splitlines and rejoin the pieces with "\\n".

    Nothing is appended after the last piece, so the result does not end
    with a newline.
    """
    lines = TT.splitlines(s)
    return "\n".join(lines)
# SWISS-PROT-like sample: three records, each opened by an "ID" line and
# closed by a "//" line.
#
# NOTE(review): leading whitespace and column spacing inside this literal
# were lost when the listing was flattened -- confirm the exact layout
# against the upstream file.  The "ID just checking" line is indented here
# because the StartsWith("ID") tests in this file expect exactly three
# records, so that decoy line must not begin at column 0.
data1 = """\
ID Q1234
DE Some protein
SQ blah
ABCDE FGHIJ
//
ID Q2345
DE ID
SQ lahb
  ID just checking
BCDEF GHIJA
//
ID Q3456
DE me proteinSo
SQ ahbl
CDEFG HIJID
//
"""
# Rejoin with "\n" and drop the final newline (see normalize).
data1 = normalize(data1)

### StartsWith

def test_startswith_generic():
    """Check StartsWith record counts on small hand-written inputs.

    Each case pairs an input string with the number of records a reader
    built with the marker "A" is expected to yield.
    """
    cases = (
        ("A\n", 1),
        ("AA\n", 1),
        ("A\nB\n", 1),
        ("A\nB\nB\n", 1),
        ("A\nB\nB\nA\n", 2),
        ("A\nA\nB\nA\n", 3),
        ("A\n A\nA\n", 2),
        ("A A A A A A A A A A A A A A\nB\nA\n", 2),
    )
    for text, expected in cases:
        found = count_records(RecordReader.StartsWith(StringIO(text), "A"))
        # skips to EOL
        assert found == expected, (text, expected, found)

62 63
def test_startswith_SP():
    """Check StartsWith against the SWISS-PROT-like sample in `data1`.

    The sample is tried with each newline convention, with and without a
    trailing line ending.  Three records are expected for every value of
    the third positional reader argument that is tried.
    """
    # (marker, first value of the third reader argument); "ID" runs over
    # 5..19 first, then "ID " over 6..19.
    plans = (("ID", 5), ("ID ", 6))
    for ending in ("\n", "\r", "\r\n"):
        converted = data1.replace("\n", ending)
        for final in ("", ending):
            d = converted + final
            for marker, first in plans:
                for i in range(first, 20):
                    reader = RecordReader.StartsWith(StringIO(d), marker, i)
                    count = count_records(reader)
                    assert count == 3, (ending, final, i, count, d)

82
def test_startswith_exhaustive(ending):
    """Exhaustively combine input size, lookahead size and read hint.

    Should catch most edge conditions.  The input is `repeat` copies of a
    base pattern and the lookahead is `look` copies of it, so the reader
    must yield `repeat + look` records.

    ending -- the line terminator used to build the patterns
    """
    patterns = ("A" + ending, "A" + ending + "BA" + ending)
    for base in patterns:
        for repeat in range(0, 15):
            infile = StringIO(base * repeat)
            # The marker "A\n" is deliberately not tried here: don't use;
            # bug when using "\n".
            for marker in ("A",):
                for look in range(5):
                    lookahead = base * look
                    expected = repeat + look

                    # Explicit read hints.
                    for readhint in range(4, 10):
                        infile.seek(0)
                        count = count_records(RecordReader.StartsWith(
                            infile, marker, readhint, lookahead))
                        assert count == expected, (
                            count, ending, base, repeat, marker,
                            look, readhint)

                    # Reader's own default read hint.
                    infile.seek(0)
                    count = count_records(RecordReader.StartsWith(
                        infile, marker, lookahead=lookahead))
                    assert count == expected, (
                        count, ending, repeat, marker, look)

108
def test_startswith_remainder():
    """Make sure the remainder method of a StartsWith reader works.

    Before each record is read, the records consumed so far plus the
    reader's pending lookahead plus the unread part of the file must
    reproduce the original input.
    """
    for repeat in range(20):
        data = "".join(["A\n%d\n" % n for n in range(repeat)])
        infile = StringIO(data)
        offsets = list(range(10)) + list(range(10, len(data), 5))
        for look in offsets:
            # Hand the first `look` characters over as lookahead and let
            # the file supply the rest.
            infile.seek(look)
            reader = RecordReader.StartsWith(infile, "A",
                                             lookahead=data[:look])
            consumed = ""
            while True:
                handle, pending = reader.remainder()
                pos = handle.tell()
                rest = handle.read()
                assert consumed + pending + rest == data, \
                    (consumed, pending, rest, data)
                # Put the file back where the reader left it.
                handle.seek(pos)
                assert data.startswith(consumed), (data, consumed)
                record = reader.next()
                if record is None:
                    break
                consumed = consumed + record
            assert consumed == data, (consumed, data)

133 134
def test_startswith_errors():
    """Check the failure cases.  Actually, there's only one.

    Every input here does not start with "A", so a StartsWith reader using
    the marker "A" must raise RecordReader.ReaderError.

    Raises AssertionError if an input is accepted.
    """
    # Doesn't start with A
    for s in ("B", "B\n", " A\n", " A", "B\nA\n"):
        try:
            # The current implementation will fail in the constructor, but
            # the interface spec allows the error to be unreported until
            # reading the record, so both steps are guarded.
            reader = RecordReader.StartsWith(StringIO(s), "A")
            reader.next()
        except RecordReader.ReaderError:
            continue
        # Only reached when no ReaderError was raised.
        raise AssertionError("should not allow %r" % s)
152
def test_startswith():
    """Run every StartsWith test, printing a progress line before each."""
    print("Testing StartsWith")

    print(" ... generic")
    test_startswith_generic()

    print(" ... newline variations")
    test_startswith_SP()

    line_endings = ("\n", "\r", "\r\n")
    for ending in line_endings:
        print(" ... exhaustive testing against %r" % ending)
        test_startswith_exhaustive(ending)

    print(" ... remainder")
    test_startswith_remainder()

    print(" ... format errors")
    test_startswith_errors()
171 172 ### EndsWith 173
def test_endswith_generic():
    """Check EndsWith record counts on small hand-written inputs.

    The same kind of table is run twice: once with the marker "A" and once
    with the marker "A\\n".  Entries flagged in the comments have a
    different expected count between the two markers.
    """
    marker_a_cases = (
        ("A\n", 1),
        ("AA\n", 1),
        ("B\nA\n", 1),
        ("B\nB\nA\n", 1),
        ("A\nA\n", 2),
        ("A A\nA\n", 2),      # this changes with an "A\n" reader
        ("A", 1),
        ("A\nA A\nA\n", 3),   # this changes with an "A\n" reader
        ("A\nA A\nA", 3),     # this changes with an "A\n" reader
    )
    for text, expected in marker_a_cases:
        found = count_records(RecordReader.EndsWith(StringIO(text), "A"))
        # skips to EOL
        assert found == expected, (text, expected, found)

    marker_a_newline_cases = (
        ("A\n", 1),
        # ("AA\n", 1) is left out: not legal
        ("B\nA\n", 1),
        ("B\nB\nA\n", 1),
        ("A\nA\n", 2),
        ("A A\nA\n", 1),      # this changed with an "A\n" reader
        ("A", 1),
        ("A\nA A\nA\n", 2),   # this changed with an "A\n" reader
        ("A\nA A\nA", 2),     # this changed with an "A\n" reader
    )
    for text, expected in marker_a_newline_cases:
        found = count_records(RecordReader.EndsWith(StringIO(text), "A\n"))
        # expects newline
        assert found == expected, (text, expected, found)

205 206
def test_endswith_SP():
    """Check EndsWith against the SWISS-PROT-like sample in `data1`.

    The sample is tried with each newline convention, with and without a
    trailing line ending.  For both the "//" and the "//\\n" marker, and
    for every value 5..19 of the third positional reader argument, exactly
    three records are expected.
    """
    for ending in ("\n", "\r", "\r\n"):
        s = data1.replace("\n", ending)
        for final in ("", ending):
            d = s + final
            # All values for "//" are run before any for "//\n".
            for marker in ("//", "//\n"):
                for i in range(5, 20):
                    reader = RecordReader.EndsWith(StringIO(d), marker, i)
                    count = count_records(reader)
                    assert count == 3, (ending, final, i, count, d)

226 227
def test_endswith_exhaustive(ending):
    """Exhaustively combine input size, marker, lookahead and read hint.

    Should catch most edge conditions.  The input is `repeat` copies of a
    base pattern and the lookahead is `look` copies of it, so the reader
    must yield `repeat + look` records.

    ending -- the line terminator used to build the patterns
    """
    patterns = ("A" + ending, "BA" + ending + "A" + ending)
    for base in patterns:
        for repeat in range(0, 15):
            infile = StringIO(base * repeat)
            for marker in ("A", "A\n"):
                for look in range(5):
                    lookahead = base * look
                    expected = repeat + look

                    # Explicit read hints.
                    for readhint in range(4, 10):
                        infile.seek(0)
                        count = count_records(RecordReader.EndsWith(
                            infile, marker, readhint, lookahead))
                        assert count == expected, (
                            count, ending, base, repeat, marker,
                            look, readhint)

                    # Reader's own default read hint.
                    infile.seek(0)
                    count = count_records(RecordReader.EndsWith(
                        infile, marker, lookahead=lookahead))
                    assert count == expected, (
                        count, ending, repeat, marker, look)

252
def test_endswith_remainder():
    """Make sure the remainder method of an EndsWith reader works.

    Before each record is read, the records consumed so far plus the
    reader's pending lookahead plus the unread part of the file must
    reproduce the original input.
    """
    for repeat in range(20):
        data = "".join(["%d\nA\n" % n for n in range(repeat)])
        infile = StringIO(data)
        offsets = list(range(10)) + list(range(10, len(data), 5))
        for look in offsets:
            # Hand the first `look` characters over as lookahead and let
            # the file supply the rest.
            infile.seek(look)
            reader = RecordReader.EndsWith(infile, "A",
                                           lookahead=data[:look])
            consumed = ""
            while True:
                handle, pending = reader.remainder()
                pos = handle.tell()
                rest = handle.read()
                assert consumed + pending + rest == data, \
                    (consumed, pending, rest, data)
                # Put the file back where the reader left it.
                handle.seek(pos)
                assert data.startswith(consumed), (data, consumed)
                record = reader.next()
                if record is None:
                    break
                consumed = consumed + record
            assert consumed == data, (consumed, data)

277
def _endswith_reports_error(text, marker):
    """Return True if an EndsWith reader raises ReaderError on `text`.

    The error may come from constructing the reader or from any later
    call to next().  Returns False if the reader reaches the end of input
    (next() returns None) without raising.
    """
    try:
        reader = RecordReader.EndsWith(StringIO(text), marker)
        while reader.next() is not None:
            pass
    except RecordReader.ReaderError:
        return True
    return False


def test_endswith_errors():
    """Check the failure cases of the EndsWith reader.

    Every input listed must cause RecordReader.ReaderError, either when
    the reader is created or while records are read.

    Raises AssertionError if an input is read to the end without error.
    """
    # Could be no record at all
    # Could be some records followed by an incomplete record
    # Could be a line which partially matches the data
    for s in ("B", "B\n", "A\nB\n", "A\nB\nA\nB\n", "A\nB\nA\n ", "AA",
              "AA\n", "A\nB\nA X\n"):
        if not _endswith_reports_error(s, "A\n"):
            raise AssertionError("should not get here with %r" % s)

    # Could be no record at all
    # Could be some records followed by an incomplete record
    # *Allowed* to read rest of line
    for s in ("B", "B\n", "A\nB\n", "A\nB\nA\nB\n", "A\nB\nA\n "):
        if not _endswith_reports_error(s, "A"):
            raise AssertionError("should not get here with %r" % s)

325 326 327
def test_endswith():
    """Run every EndsWith test, printing a progress line before each."""
    print("Testing EndsWith")

    print(" ... generic")
    test_endswith_generic()

    print(" ... newline variations")
    test_endswith_SP()

    line_endings = ("\n", "\r", "\r\n")
    for ending in line_endings:
        print(" ... exhaustive testing against %r" % ending)
        test_endswith_exhaustive(ending)

    print(" ... remainder")
    test_endswith_remainder()

    print(" ... format errors")
    test_endswith_errors()

### Until

# Don't need to do that much testing since the code is built on
# top of the StartsWith reader, which has already been tested.
def test_until():
    """Check that an Until reader stops at the first line starting with "A".

    Each input is a prefix (`pre`) followed by text that begins with "A".
    The reader can only read at most one record; that record must equal
    the prefix, and the reader's remainder plus the unread file contents
    must equal the text after it.
    """
    print("Testing Until")
    test_data = (
        "A",
        "A\n",
        "A\nB",
        "A\nB\n",
        "A\nBCDE\nQWE\nTRYU\nA\n",
        "A\nA\nA\nA\nA\nA\nA\n",
        "AB",
        "AB\n",
        "AB\nAC",
        "AB\nAC\n",
    )

    for ending in ("\n", "\r", "\r\n"):
        for pre in ("", "B\n", "BA\n", "CA\nBA\n"):
            pre_nl = pre.replace("\n", ending)
            for look in (0, 1, 3):
                for text in test_data:
                    text_nl = text.replace("\n", ending)
                    s = pre_nl + text_nl
                    reader = RecordReader.Until(StringIO(s[look:]),
                                                "A",
                                                lookahead=s[:look],
                                                sizehint=4)

                    # Drain the reader, remembering the record it gave.
                    found_record = None
                    rec = reader.next()
                    while rec is not None:
                        assert not found_record, \
                            "Already found %r but also found %r in %r" % \
                            (found_record, rec, s)
                        found_record = rec
                        rec = reader.next()

                    assert found_record == pre_nl, \
                        "Expecting record %r, found %r in %r" % \
                        (pre_nl, found_record, s)

                    infile, remainder = reader.remainder()
                    remainder = remainder + infile.read()
                    assert remainder == text_nl, \
                        "Expecting remainder %r, found %r in %r" % \
                        (text_nl, remainder, s)
395 396 ### CountLines 397
def test_count_lines():
    """Check the CountLines reader on inputs of 0 to 24 lines.

    Create a set of 'i' lines and read 'count' lines at a time.  Either
    count divides i or it doesn't.  If it does, the reader should go to
    completion.  If it does not, the reader should have a remainder whose
    size can be verified.
    """
    print("Testing CountLines")
    for ending in ("\n", "\r", "\r\n"):
        print(" ... exhaustive testing against %r" % ending)
        s = ""
        for i in range(25):
            # At this point `s` holds exactly i lines.
            for count in range(1, i + 1):
                for look in (0, 2, 5):
                    full_records, final = divmod(i, count)
                    reader = RecordReader.CountLines(StringIO(s[look:]),
                                                     count,
                                                     lookahead=s[:look],
                                                     sizehint=1)
                    rebuilt = ""
                    for _ in range(full_records):
                        rec = reader.next()
                        n_lines = len(rec.split(ending)) - 1
                        assert n_lines == count, \
                            "Expecting %d lines, got %d in %r using %r" % \
                            (count, n_lines, rec, ending)
                        rebuilt = rebuilt + rec

                    if final != 0:
                        # There should be a remainder of size i % count lines
                        infile, remainder = reader.remainder()
                        text = remainder + infile.read()
                        rebuilt = rebuilt + text
                        n_lines = len(text.split(ending)) - 1
                        assert n_lines == final, \
                            "Expecting %d final lines, got %d" % \
                            (final, n_lines)
                        try:
                            rec = reader.next()
                        except RecordReader.ReaderError:
                            pass
                        else:
                            raise AssertionError(
                                "Got unexpected final record, %r" % rec)
                    else:
                        # Reader should be at the end of input
                        rec = reader.next()
                        assert rec is None, \
                            "Should be at end of reader, got %r" % rec
                        rec = reader.next()
                        assert rec is None, \
                            "data after end of reader, got %r" % rec

                    assert rebuilt == s, \
                        "record data %r doesn't rebuild input %r" % \
                        (rebuilt, s)
            # Grow the input by one line for the next value of i.
            s = s + str(i) + ending
453 454 ### Nothing 455
def test_nothing():
    """Check that a Nothing reader yields no records and consumes no input.

    For each newline convention and lookahead size, next() must return
    None twice, and the reader's remainder plus the unread file contents
    must equal the whole input.
    """
    print("Testing Nothing")

    s = "This is a test.\nThis is only a test.\nHad this been an actual...\n"
    for ending in ("\n", "\r", "\r\n"):
        data = s.replace("\n", ending)
        for look in (0, 1, 2, 5):
            head, tail = data[:look], data[look:]
            reader = RecordReader.Nothing(StringIO(tail),
                                          sizehint=1,
                                          lookahead=head)
            first = reader.next()
            assert first is None, "should be empty, not %r" % first
            second = reader.next()
            assert second is None, \
                "2nd time should also be empty, not %r" % second

            infile, remainder = reader.remainder()
            remainder = remainder + infile.read()
            assert remainder == data, "Why %r when input was %r?" % \
                (remainder, data)
475 476 ### Everything 477
def test_everything():
    """Check that an Everything reader returns all input as one record.

    For each newline convention and lookahead size, the first next() must
    return the whole input, nothing may be left as a remainder, and every
    later next() must return None.
    """
    print("Testing Everything")

    s = "This is a test.\nThis is only a test.\nHad this been an actual...\n"
    for ending in ("\n", "\r", "\r\n"):
        data = s.replace("\n", ending)
        for look in (0, 1, 2, 5):
            head, tail = data[:look], data[look:]
            reader = RecordReader.Everything(StringIO(tail),
                                             sizehint=1,
                                             lookahead=head)
            rec = reader.next()
            assert rec == data, "Record %r is not same as input %r" % \
                (rec, data)

            infile, remainder = reader.remainder()
            remainder = remainder + infile.read()
            assert not remainder, "Why is there a remainder of %r?" % \
                remainder

            rec = reader.next()
            assert rec is None, \
                "Expecting None after final read, got %r" % rec
            rec = reader.next()
            assert rec is None, "Expecting None (again), got %r" % rec
501 502 503 504 ### test driver 505
def test():
    """Run every reader test group in this module, in a fixed order."""
    groups = (
        test_startswith,
        test_endswith,
        test_until,
        test_count_lines,
        test_nothing,
        test_everything,
    )
    for run_group in groups:
        run_group()

# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test()