View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.net.telnet;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  
25  /***
26   *
27   * <p>
28   *
29   * <p>
30   * <p>
31   * @author Daniel F. Savarese
32   * @author Bruno D'Avanzo
33   ***/
34  
35  
36  final class TelnetInputStream extends BufferedInputStream implements Runnable
37  {
38      static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
39                       _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
40                       _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;
41  
42      private boolean __hasReachedEOF, __isClosed;
43      private boolean __readIsWaiting;
44      private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
45      private int[] __queue;
46      private TelnetClient __client;
47      private Thread __thread;
48      private IOException __ioException;
49  
50      /* TERMINAL-TYPE option (start)*/
51      private int __suboption[] = new int[256];
52      private int __suboption_count = 0;
53      /* TERMINAL-TYPE option (end)*/
54  
55      private boolean __threaded;
56  
57      TelnetInputStream(InputStream input, TelnetClient client,
58                        boolean readerThread)
59      {
60          super(input);
61          __client = client;
62          __receiveState = _STATE_DATA;
63          __isClosed = true;
64          __hasReachedEOF = false;
65          // Make it 2049, because when full, one slot will go unused, and we
66          // want a 2048 byte buffer just to have a round number (base 2 that is)
67          __queue = new int[2049];
68          __queueHead = 0;
69          __queueTail = 0;
70          __bytesAvailable = 0;
71          __ioException = null;
72          __readIsWaiting = false;
73          __threaded = false;
74          if(readerThread)
75              __thread = new Thread(this);
76          else
77              __thread = null;
78      }
79  
80      TelnetInputStream(InputStream input, TelnetClient client) {
81          this(input, client, true);
82      }
83  
84      void _start()
85      {
86          if(__thread == null)
87              return;
88  
89          int priority;
90          __isClosed = false;
91          // TODO remove this
92          // Need to set a higher priority in case JVM does not use pre-emptive
93          // threads.  This should prevent scheduler induced deadlock (rather than
94          // deadlock caused by a bug in this code).
95          priority = Thread.currentThread().getPriority() + 1;
96          if (priority > Thread.MAX_PRIORITY)
97              priority = Thread.MAX_PRIORITY;
98          __thread.setPriority(priority);
99          __thread.setDaemon(true);
100         __thread.start();
101         __threaded = true;
102     }
103 
104 
105     // synchronized(__client) critical sections are to protect against
106     // TelnetOutputStream writing through the telnet client at same time
107     // as a processDo/Will/etc. command invoked from TelnetInputStream
108     // tries to write.
109     private int __read(boolean mayBlock) throws IOException
110     {
111         int ch;
112 
113 _loop:
114         while (true)
115         {
116  
117             // If there is no more data AND we were told not to block, just return -2. (More efficient than exception.)
118             if(!mayBlock && super.available() == 0)
119                 return -2;
120             
121             // Otherwise, exit only when we reach end of stream.
122             if ((ch = super.read()) < 0)
123                 return -1;
124 
125             ch = (ch & 0xff);
126 
127             /* Code Section added for supporting AYT (start)*/
128             synchronized (__client)
129             {
130                 __client._processAYTResponse();
131             }
132             /* Code Section added for supporting AYT (end)*/
133 
134             /* Code Section added for supporting spystreams (start)*/
135             __client._spyRead(ch);
136             /* Code Section added for supporting spystreams (end)*/
137 
138 _mainSwitch:
139             switch (__receiveState)
140             {
141 
142             case _STATE_CR:
143                 if (ch == '\0')
144                 {
145                     // Strip null
146                     continue;
147                 }
148                 // How do we handle newline after cr?
149                 //  else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
150 
151                 // Handle as normal data by falling through to _STATE_DATA case
152 
153             case _STATE_DATA:
154                 if (ch == TelnetCommand.IAC)
155                 {
156                     __receiveState = _STATE_IAC;
157                     continue;
158                 }
159 
160 
161                 if (ch == '\r')
162                 {
163                     synchronized (__client)
164                     {
165                         if (__client._requestedDont(TelnetOption.BINARY))
166                             __receiveState = _STATE_CR;
167                         else
168                             __receiveState = _STATE_DATA;
169                     }
170                 }
171                 else
172                     __receiveState = _STATE_DATA;
173                 break;
174 
175             case _STATE_IAC:
176                 switch (ch)
177                 {
178                 case TelnetCommand.WILL:
179                     __receiveState = _STATE_WILL;
180                     continue;
181                 case TelnetCommand.WONT:
182                     __receiveState = _STATE_WONT;
183                     continue;
184                 case TelnetCommand.DO:
185                     __receiveState = _STATE_DO;
186                     continue;
187                 case TelnetCommand.DONT:
188                     __receiveState = _STATE_DONT;
189                     continue;
190                 /* TERMINAL-TYPE option (start)*/
191                 case TelnetCommand.SB:
192                     __suboption_count = 0;
193                     __receiveState = _STATE_SB;
194                     continue;
195                 /* TERMINAL-TYPE option (end)*/
196                 case TelnetCommand.IAC:
197                     __receiveState = _STATE_DATA;
198                     break;
199                 default:
200                     break;
201                 }
202                 __receiveState = _STATE_DATA;
203                 continue;
204             case _STATE_WILL:
205                 synchronized (__client)
206                 {
207                     __client._processWill(ch);
208                     __client._flushOutputStream();
209                 }
210                 __receiveState = _STATE_DATA;
211                 continue;
212             case _STATE_WONT:
213                 synchronized (__client)
214                 {
215                     __client._processWont(ch);
216                     __client._flushOutputStream();
217                 }
218                 __receiveState = _STATE_DATA;
219                 continue;
220             case _STATE_DO:
221                 synchronized (__client)
222                 {
223                     __client._processDo(ch);
224                     __client._flushOutputStream();
225                 }
226                 __receiveState = _STATE_DATA;
227                 continue;
228             case _STATE_DONT:
229                 synchronized (__client)
230                 {
231                     __client._processDont(ch);
232                     __client._flushOutputStream();
233                 }
234                 __receiveState = _STATE_DATA;
235                 continue;
236             /* TERMINAL-TYPE option (start)*/
237             case _STATE_SB:
238                 switch (ch)
239                 {
240                 case TelnetCommand.IAC:
241                     __receiveState = _STATE_IAC_SB;
242                     continue;
243                 default:
244                     // store suboption char
245                     __suboption[__suboption_count++] = ch;
246                     break;
247                 }
248                 __receiveState = _STATE_SB;
249                 continue;
250             case _STATE_IAC_SB:
251                 switch (ch)
252                 {
253                 case TelnetCommand.SE:
254                     synchronized (__client)
255                     {
256                         __client._processSuboption(__suboption, __suboption_count);
257                         __client._flushOutputStream();
258                     }
259                     __receiveState = _STATE_DATA;
260                     continue;
261                 default:
262                     __receiveState = _STATE_SB;
263                     break;
264                 }
265                 __receiveState = _STATE_DATA;
266                 continue;
267             /* TERMINAL-TYPE option (end)*/
268             }
269 
270             break;
271         }
272 
273         return ch;
274     }
275 
276     // synchronized(__client) critical sections are to protect against
277     // TelnetOutputStream writing through the telnet client at same time
278     // as a processDo/Will/etc. command invoked from TelnetInputStream
279     // tries to write.
280     private void __processChar(int ch) throws InterruptedException
281     {
282         // Critical section because we're altering __bytesAvailable,
283         // __queueTail, and the contents of _queue.
284         synchronized (__queue)
285         {
286             while (__bytesAvailable >= __queue.length - 1)
287             {
288                 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
289                 // will consume some data soon! 
290                 if(__threaded)
291                 {
292                     __queue.notify();
293                     try
294                     {
295                         __queue.wait();
296                     }
297                     catch (InterruptedException e)
298                     {
299                         throw e;
300                     }
301                 }
302                 else
303                 {
304                     // We've been asked to add another character to the queue, but it is already full and there's
305                     // no other thread to drain it. This should not have happened! 
306                     throw new IllegalStateException("Queue is full! Cannot process another character.");
307                 }
308             }
309 
310             // Need to do this in case we're not full, but block on a read
311             if (__readIsWaiting && __threaded)
312             {
313                 __queue.notify();
314             }
315 
316             __queue[__queueTail] = ch;
317             ++__bytesAvailable;
318 
319             if (++__queueTail >= __queue.length)
320                 __queueTail = 0;
321         }
322     }
323 
324     @Override
325     public int read() throws IOException
326     {
327         // Critical section because we're altering __bytesAvailable,
328         // __queueHead, and the contents of _queue in addition to
329         // testing value of __hasReachedEOF.
330         synchronized (__queue)
331         {
332 
333             while (true)
334             {
335                 if (__ioException != null)
336                 {
337                     IOException e;
338                     e = __ioException;
339                     __ioException = null;
340                     throw e;
341                 }
342 
343                 if (__bytesAvailable == 0)
344                 {
345                     // Return -1 if at end of file
346                     if (__hasReachedEOF)
347                         return -1;
348 
349                     // Otherwise, we have to wait for queue to get something
350                     if(__threaded)
351                     {
352                         __queue.notify();
353                         try
354                         {
355                             __readIsWaiting = true;
356                             __queue.wait();
357                             __readIsWaiting = false;
358                         }
359                         catch (InterruptedException e)
360                         {
361                             throw new InterruptedIOException("Fatal thread interruption during read.");
362                         }
363                     }
364                     else
365                     {
366                         //__alreadyread = false;
367                         __readIsWaiting = true;
368                         int ch;
369                         boolean mayBlock = true;    // block on the first read only
370                         
371                         do
372                         {
373                             try
374                             {
375                                 if ((ch = __read(mayBlock)) < 0)
376                                     if(ch != -2)
377                                         return (ch);
378                             }
379                             catch (InterruptedIOException e)
380                             {
381                                 synchronized (__queue)
382                                 {
383                                     __ioException = e;
384                                     __queue.notifyAll();
385                                     try
386                                     {
387                                         __queue.wait(100);
388                                     }
389                                     catch (InterruptedException interrupted)
390                                     {
391                                     }
392                                 }
393                                 return (-1);
394                             }
395 
396 
397                             try
398                             {
399                                 if(ch != -2)
400                                 {
401                                     __processChar(ch);
402                                 }
403                             }
404                             catch (InterruptedException e)
405                             {
406                                 if (__isClosed)
407                                     return (-1);
408                             }
409                             
410                             // Reads should not block on subsequent iterations. Potentially, this could happen if the 
411                             // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
412                             mayBlock = false;
413                             
414                         }
415                         // Continue reading as long as there is data available and the queue is not full.
416                         while (super.available() > 0 && __bytesAvailable < __queue.length - 1);
417                         
418                         __readIsWaiting = false;
419                     }
420                     continue;
421                 }
422                 else
423                 {
424                     int ch;
425 
426                     ch = __queue[__queueHead];
427 
428                     if (++__queueHead >= __queue.length)
429                         __queueHead = 0;
430 
431                     --__bytesAvailable;
432 
433             // Need to explicitly notify() so available() works properly
434             if(__bytesAvailable == 0 && __threaded) {
435                 __queue.notify();
436             }
437             
438                     return ch;
439                 }
440             }
441         }
442     }
443 
444 
445     /***
446      * Reads the next number of bytes from the stream into an array and
447      * returns the number of bytes read.  Returns -1 if the end of the
448      * stream has been reached.
449      * <p>
450      * @param buffer  The byte array in which to store the data.
451      * @return The number of bytes read. Returns -1 if the
452      *          end of the message has been reached.
453      * @exception IOException If an error occurs in reading the underlying
454      *            stream.
455      ***/
456     @Override
457     public int read(byte buffer[]) throws IOException
458     {
459         return read(buffer, 0, buffer.length);
460     }
461 
462 
463     /***
464      * Reads the next number of bytes from the stream into an array and returns
465      * the number of bytes read.  Returns -1 if the end of the
466      * message has been reached.  The characters are stored in the array
467      * starting from the given offset and up to the length specified.
468      * <p>
469      * @param buffer The byte array in which to store the data.
470      * @param offset  The offset into the array at which to start storing data.
471      * @param length   The number of bytes to read.
472      * @return The number of bytes read. Returns -1 if the
473      *          end of the stream has been reached.
474      * @exception IOException If an error occurs while reading the underlying
475      *            stream.
476      ***/
477     @Override
478     public int read(byte buffer[], int offset, int length) throws IOException
479     {
480         int ch, off;
481 
482         if (length < 1)
483             return 0;
484 
485         // Critical section because run() may change __bytesAvailable
486         synchronized (__queue)
487         {
488             if (length > __bytesAvailable)
489                 length = __bytesAvailable;
490         }
491 
492         if ((ch = read()) == -1)
493             return -1;
494 
495         off = offset;
496 
497         do
498         {
499             buffer[offset++] = (byte)ch;
500         }
501         while (--length > 0 && (ch = read()) != -1);
502 
503         //__client._spyRead(buffer, off, offset - off);
504         return (offset - off);
505     }
506 
507 
508     /*** Returns false.  Mark is not supported. ***/
509     @Override
510     public boolean markSupported()
511     {
512         return false;
513     }
514 
515     @Override
516     public int available() throws IOException
517     {
518         // Critical section because run() may change __bytesAvailable
519         synchronized (__queue)
520         {
521             return __bytesAvailable;
522         }
523     }
524 
525 
526     // Cannot be synchronized.  Will cause deadlock if run() is blocked
527     // in read because BufferedInputStream read() is synchronized.
528     @Override
529     public void close() throws IOException
530     {
531         // Completely disregard the fact thread may still be running.
532         // We can't afford to block on this close by waiting for
533         // thread to terminate because few if any JVM's will actually
534         // interrupt a system read() from the interrupt() method.
535         super.close();
536 
537         synchronized (__queue)
538         {
539             __hasReachedEOF = true;
540             __isClosed      = true;
541 
542             if (__thread != null && __thread.isAlive())
543             {
544                 __thread.interrupt();
545             }
546 
547             __queue.notifyAll();
548         }
549 
550         __threaded = false;
551     }
552 
553     public void run()
554     {
555         int ch;
556 
557         try
558         {
559 _outerLoop:
560             while (!__isClosed)
561             {
562                 try
563                 {
564                     if ((ch = __read(true)) < 0)
565                         break;
566                 }
567                 catch (InterruptedIOException e)
568                 {
569                     synchronized (__queue)
570                     {
571                         __ioException = e;
572                         __queue.notifyAll();
573                         try
574                         {
575                             __queue.wait(100);
576                         }
577                         catch (InterruptedException interrupted)
578                         {
579                             if (__isClosed)
580                                 break _outerLoop;
581                         }
582                         continue;
583                     }
584                 } catch(RuntimeException re) {
585                     // We treat any runtime exceptions as though the
586                     // stream has been closed.  We close the
587                     // underlying stream just to be sure.
588                     super.close();
589                     // Breaking the loop has the effect of setting
590                     // the state to closed at the end of the method.
591                     break _outerLoop;
592                 }
593 
594                 try
595                 {
596                     __processChar(ch);
597                 }
598                 catch (InterruptedException e)
599                 {
600                     if (__isClosed)
601                         break _outerLoop;
602                 }
603             }
604         }
605         catch (IOException ioe)
606         {
607             synchronized (__queue)
608             {
609                 __ioException = ioe;
610             }
611         }
612 
613         synchronized (__queue)
614         {
615             __isClosed      = true; // Possibly redundant
616             __hasReachedEOF = true;
617             __queue.notify();
618         }
619 
620         __threaded = false;
621     }
622 }
623 
624 /* Emacs configuration
625  * Local variables:        **
626  * mode:             java  **
627  * c-basic-offset:   4     **
628  * indent-tabs-mode: nil   **
629  * End:                    **
630  */