1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.net.telnet;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24
25
26
27
28
29
30
31
32
33
34
35
36 final class TelnetInputStream extends BufferedInputStream implements Runnable
37 {
38 static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
39 _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
40 _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;
41
42 private boolean __hasReachedEOF, __isClosed;
43 private boolean __readIsWaiting;
44 private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
45 private int[] __queue;
46 private TelnetClient __client;
47 private Thread __thread;
48 private IOException __ioException;
49
50
51 private int __suboption[] = new int[256];
52 private int __suboption_count = 0;
53
54
55 private boolean __threaded;
56
57 TelnetInputStream(InputStream input, TelnetClient client,
58 boolean readerThread)
59 {
60 super(input);
61 __client = client;
62 __receiveState = _STATE_DATA;
63 __isClosed = true;
64 __hasReachedEOF = false;
65
66
67 __queue = new int[2049];
68 __queueHead = 0;
69 __queueTail = 0;
70 __bytesAvailable = 0;
71 __ioException = null;
72 __readIsWaiting = false;
73 __threaded = false;
74 if(readerThread)
75 __thread = new Thread(this);
76 else
77 __thread = null;
78 }
79
80 TelnetInputStream(InputStream input, TelnetClient client) {
81 this(input, client, true);
82 }
83
84 void _start()
85 {
86 if(__thread == null)
87 return;
88
89 int priority;
90 __isClosed = false;
91
92
93
94
95 priority = Thread.currentThread().getPriority() + 1;
96 if (priority > Thread.MAX_PRIORITY)
97 priority = Thread.MAX_PRIORITY;
98 __thread.setPriority(priority);
99 __thread.setDaemon(true);
100 __thread.start();
101 __threaded = true;
102 }
103
104
105
106
107
108
109 private int __read(boolean mayBlock) throws IOException
110 {
111 int ch;
112
113 _loop:
114 while (true)
115 {
116
117
118 if(!mayBlock && super.available() == 0)
119 return -2;
120
121
122 if ((ch = super.read()) < 0)
123 return -1;
124
125 ch = (ch & 0xff);
126
127
128 synchronized (__client)
129 {
130 __client._processAYTResponse();
131 }
132
133
134
135 __client._spyRead(ch);
136
137
138 _mainSwitch:
139 switch (__receiveState)
140 {
141
142 case _STATE_CR:
143 if (ch == '\0')
144 {
145
146 continue;
147 }
148
149
150
151
152
153 case _STATE_DATA:
154 if (ch == TelnetCommand.IAC)
155 {
156 __receiveState = _STATE_IAC;
157 continue;
158 }
159
160
161 if (ch == '\r')
162 {
163 synchronized (__client)
164 {
165 if (__client._requestedDont(TelnetOption.BINARY))
166 __receiveState = _STATE_CR;
167 else
168 __receiveState = _STATE_DATA;
169 }
170 }
171 else
172 __receiveState = _STATE_DATA;
173 break;
174
175 case _STATE_IAC:
176 switch (ch)
177 {
178 case TelnetCommand.WILL:
179 __receiveState = _STATE_WILL;
180 continue;
181 case TelnetCommand.WONT:
182 __receiveState = _STATE_WONT;
183 continue;
184 case TelnetCommand.DO:
185 __receiveState = _STATE_DO;
186 continue;
187 case TelnetCommand.DONT:
188 __receiveState = _STATE_DONT;
189 continue;
190
191 case TelnetCommand.SB:
192 __suboption_count = 0;
193 __receiveState = _STATE_SB;
194 continue;
195
196 case TelnetCommand.IAC:
197 __receiveState = _STATE_DATA;
198 break;
199 default:
200 break;
201 }
202 __receiveState = _STATE_DATA;
203 continue;
204 case _STATE_WILL:
205 synchronized (__client)
206 {
207 __client._processWill(ch);
208 __client._flushOutputStream();
209 }
210 __receiveState = _STATE_DATA;
211 continue;
212 case _STATE_WONT:
213 synchronized (__client)
214 {
215 __client._processWont(ch);
216 __client._flushOutputStream();
217 }
218 __receiveState = _STATE_DATA;
219 continue;
220 case _STATE_DO:
221 synchronized (__client)
222 {
223 __client._processDo(ch);
224 __client._flushOutputStream();
225 }
226 __receiveState = _STATE_DATA;
227 continue;
228 case _STATE_DONT:
229 synchronized (__client)
230 {
231 __client._processDont(ch);
232 __client._flushOutputStream();
233 }
234 __receiveState = _STATE_DATA;
235 continue;
236
237 case _STATE_SB:
238 switch (ch)
239 {
240 case TelnetCommand.IAC:
241 __receiveState = _STATE_IAC_SB;
242 continue;
243 default:
244
245 __suboption[__suboption_count++] = ch;
246 break;
247 }
248 __receiveState = _STATE_SB;
249 continue;
250 case _STATE_IAC_SB:
251 switch (ch)
252 {
253 case TelnetCommand.SE:
254 synchronized (__client)
255 {
256 __client._processSuboption(__suboption, __suboption_count);
257 __client._flushOutputStream();
258 }
259 __receiveState = _STATE_DATA;
260 continue;
261 default:
262 __receiveState = _STATE_SB;
263 break;
264 }
265 __receiveState = _STATE_DATA;
266 continue;
267
268 }
269
270 break;
271 }
272
273 return ch;
274 }
275
276
277
278
279
280 private void __processChar(int ch) throws InterruptedException
281 {
282
283
284 synchronized (__queue)
285 {
286 while (__bytesAvailable >= __queue.length - 1)
287 {
288
289
290 if(__threaded)
291 {
292 __queue.notify();
293 try
294 {
295 __queue.wait();
296 }
297 catch (InterruptedException e)
298 {
299 throw e;
300 }
301 }
302 else
303 {
304
305
306 throw new IllegalStateException("Queue is full! Cannot process another character.");
307 }
308 }
309
310
311 if (__readIsWaiting && __threaded)
312 {
313 __queue.notify();
314 }
315
316 __queue[__queueTail] = ch;
317 ++__bytesAvailable;
318
319 if (++__queueTail >= __queue.length)
320 __queueTail = 0;
321 }
322 }
323
324 @Override
325 public int read() throws IOException
326 {
327
328
329
330 synchronized (__queue)
331 {
332
333 while (true)
334 {
335 if (__ioException != null)
336 {
337 IOException e;
338 e = __ioException;
339 __ioException = null;
340 throw e;
341 }
342
343 if (__bytesAvailable == 0)
344 {
345
346 if (__hasReachedEOF)
347 return -1;
348
349
350 if(__threaded)
351 {
352 __queue.notify();
353 try
354 {
355 __readIsWaiting = true;
356 __queue.wait();
357 __readIsWaiting = false;
358 }
359 catch (InterruptedException e)
360 {
361 throw new InterruptedIOException("Fatal thread interruption during read.");
362 }
363 }
364 else
365 {
366
367 __readIsWaiting = true;
368 int ch;
369 boolean mayBlock = true;
370
371 do
372 {
373 try
374 {
375 if ((ch = __read(mayBlock)) < 0)
376 if(ch != -2)
377 return (ch);
378 }
379 catch (InterruptedIOException e)
380 {
381 synchronized (__queue)
382 {
383 __ioException = e;
384 __queue.notifyAll();
385 try
386 {
387 __queue.wait(100);
388 }
389 catch (InterruptedException interrupted)
390 {
391 }
392 }
393 return (-1);
394 }
395
396
397 try
398 {
399 if(ch != -2)
400 {
401 __processChar(ch);
402 }
403 }
404 catch (InterruptedException e)
405 {
406 if (__isClosed)
407 return (-1);
408 }
409
410
411
412 mayBlock = false;
413
414 }
415
416 while (super.available() > 0 && __bytesAvailable < __queue.length - 1);
417
418 __readIsWaiting = false;
419 }
420 continue;
421 }
422 else
423 {
424 int ch;
425
426 ch = __queue[__queueHead];
427
428 if (++__queueHead >= __queue.length)
429 __queueHead = 0;
430
431 --__bytesAvailable;
432
433
434 if(__bytesAvailable == 0 && __threaded) {
435 __queue.notify();
436 }
437
438 return ch;
439 }
440 }
441 }
442 }
443
444
445
446
447
448
449
450
451
452
453
454
455
456 @Override
457 public int read(byte buffer[]) throws IOException
458 {
459 return read(buffer, 0, buffer.length);
460 }
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477 @Override
478 public int read(byte buffer[], int offset, int length) throws IOException
479 {
480 int ch, off;
481
482 if (length < 1)
483 return 0;
484
485
486 synchronized (__queue)
487 {
488 if (length > __bytesAvailable)
489 length = __bytesAvailable;
490 }
491
492 if ((ch = read()) == -1)
493 return -1;
494
495 off = offset;
496
497 do
498 {
499 buffer[offset++] = (byte)ch;
500 }
501 while (--length > 0 && (ch = read()) != -1);
502
503
504 return (offset - off);
505 }
506
507
508
509 @Override
510 public boolean markSupported()
511 {
512 return false;
513 }
514
515 @Override
516 public int available() throws IOException
517 {
518
519 synchronized (__queue)
520 {
521 return __bytesAvailable;
522 }
523 }
524
525
526
527
528 @Override
529 public void close() throws IOException
530 {
531
532
533
534
535 super.close();
536
537 synchronized (__queue)
538 {
539 __hasReachedEOF = true;
540 __isClosed = true;
541
542 if (__thread != null && __thread.isAlive())
543 {
544 __thread.interrupt();
545 }
546
547 __queue.notifyAll();
548 }
549
550 __threaded = false;
551 }
552
553 public void run()
554 {
555 int ch;
556
557 try
558 {
559 _outerLoop:
560 while (!__isClosed)
561 {
562 try
563 {
564 if ((ch = __read(true)) < 0)
565 break;
566 }
567 catch (InterruptedIOException e)
568 {
569 synchronized (__queue)
570 {
571 __ioException = e;
572 __queue.notifyAll();
573 try
574 {
575 __queue.wait(100);
576 }
577 catch (InterruptedException interrupted)
578 {
579 if (__isClosed)
580 break _outerLoop;
581 }
582 continue;
583 }
584 } catch(RuntimeException re) {
585
586
587
588 super.close();
589
590
591 break _outerLoop;
592 }
593
594 try
595 {
596 __processChar(ch);
597 }
598 catch (InterruptedException e)
599 {
600 if (__isClosed)
601 break _outerLoop;
602 }
603 }
604 }
605 catch (IOException ioe)
606 {
607 synchronized (__queue)
608 {
609 __ioException = ioe;
610 }
611 }
612
613 synchronized (__queue)
614 {
615 __isClosed = true;
616 __hasReachedEOF = true;
617 __queue.notify();
618 }
619
620 __threaded = false;
621 }
622 }
623
624
625
626
627
628
629
630