1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.io.nio;
16
17 import java.io.IOException;
18 import java.nio.channels.CancelledKeyException;
19 import java.nio.channels.SelectableChannel;
20 import java.nio.channels.SelectionKey;
21 import java.nio.channels.Selector;
22 import java.nio.channels.ServerSocketChannel;
23 import java.nio.channels.SocketChannel;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27
28 import org.mortbay.component.AbstractLifeCycle;
29 import org.mortbay.io.Connection;
30 import org.mortbay.io.EndPoint;
31 import org.mortbay.log.Log;
32 import org.mortbay.thread.Timeout;
33
34
35
36
37
38
39
40
41
42
43 public abstract class SelectorManager extends AbstractLifeCycle
44 {
45
46 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
47 private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
48 private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
49 private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
50 private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
51
52 private boolean _delaySelectKeyUpdate=true;
53 private long _maxIdleTime;
54 private long _lowResourcesConnections;
55 private long _lowResourcesMaxIdleTime;
56 private transient SelectSet[] _selectSet;
57 private int _selectSets=1;
58 private volatile int _set;
59
60
61
62
63
64
65 public void setMaxIdleTime(long maxIdleTime)
66 {
67 _maxIdleTime=maxIdleTime;
68 }
69
70
71
72
73
74 public void setSelectSets(int selectSets)
75 {
76 long lrc = _lowResourcesConnections * _selectSets;
77 _selectSets=selectSets;
78 _lowResourcesConnections=lrc/_selectSets;
79 }
80
81
82
83
84
85 public long getMaxIdleTime()
86 {
87 return _maxIdleTime;
88 }
89
90
91
92
93
94 public int getSelectSets()
95 {
96 return _selectSets;
97 }
98
99
100
101
102
103 public boolean isDelaySelectKeyUpdate()
104 {
105 return _delaySelectKeyUpdate;
106 }
107
108
109
110
111
112
113
114 public void register(SocketChannel channel, Object att) throws IOException
115 {
116 int s=_set++;
117 s=s%_selectSets;
118 SelectSet[] sets=_selectSet;
119 if (sets!=null)
120 {
121 SelectSet set=sets[s];
122 set.addChange(channel,att);
123 set.wakeup();
124 }
125 }
126
127
128
129
130
131
132
133 public void register(ServerSocketChannel acceptChannel) throws IOException
134 {
135 int s=_set++;
136 s=s%_selectSets;
137 SelectSet set=_selectSet[s];
138 set.addChange(acceptChannel);
139 set.wakeup();
140 }
141
142
143
144
145
146 public long getLowResourcesConnections()
147 {
148 return _lowResourcesConnections*_selectSets;
149 }
150
151
152
153
154
155
156
157
158 public void setLowResourcesConnections(long lowResourcesConnections)
159 {
160 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
161 }
162
163
164
165
166
167 public long getLowResourcesMaxIdleTime()
168 {
169 return _lowResourcesMaxIdleTime;
170 }
171
172
173
174
175
176
177 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
178 {
179 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
180 }
181
182
183
184
185
186
187 public void doSelect(int acceptorID) throws IOException
188 {
189 SelectSet[] sets= _selectSet;
190 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
191 sets[acceptorID].doSelect();
192 }
193
194
195
196
197
198
199 public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
200 {
201 _delaySelectKeyUpdate=delaySelectKeyUpdate;
202 }
203
204
205
206
207
208
209
210 protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
211
212
213 public abstract boolean dispatch(Runnable task) throws IOException;
214
215
216
217
218
219 protected void doStart() throws Exception
220 {
221 _selectSet = new SelectSet[_selectSets];
222 for (int i=0;i<_selectSet.length;i++)
223 _selectSet[i]= new SelectSet(i);
224
225 super.doStart();
226 }
227
228
229
230 protected void doStop() throws Exception
231 {
232 SelectSet[] sets= _selectSet;
233 _selectSet=null;
234 if (sets!=null)
235 for (int i=0;i<sets.length;i++)
236 {
237 SelectSet set = sets[i];
238 if (set!=null)
239 set.stop();
240 }
241 super.doStop();
242 }
243
244
245
246
247
248 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
249
250
251
252
253
254 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
255
256
257 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
258
259
260
261
262
263
264
265
266
267 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
268
269
270 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
271 {
272 Log.warn(ex);
273 }
274
275
276
277
278 public class SelectSet
279 {
280 private transient int _change;
281 private transient List[] _changes;
282 private transient Timeout _idleTimeout;
283 private transient int _nextSet;
284 private transient Timeout _retryTimeout;
285 private transient Selector _selector;
286 private transient int _setID;
287 private volatile boolean _selecting;
288 private transient int _jvmBug;
289 private int _selects;
290 private long _monitorStart;
291 private long _monitorNext;
292 private boolean _pausing;
293 private SelectionKey _busyKey;
294 private int _busyKeyCount;
295 private long _log;
296 private int _paused;
297 private int _jvmFix0;
298 private int _jvmFix1;
299 private int _jvmFix2;
300
301
302 SelectSet(int acceptorID) throws Exception
303 {
304 _setID=acceptorID;
305
306 _idleTimeout = new Timeout(this);
307 _idleTimeout.setDuration(getMaxIdleTime());
308 _retryTimeout = new Timeout(this);
309 _retryTimeout.setDuration(0L);
310
311
312 _selector = Selector.open();
313 _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
314 _change=0;
315 _monitorStart=System.currentTimeMillis();
316 _monitorNext=_monitorStart+__MONITOR_PERIOD;
317 _log=_monitorStart+60000;
318 }
319
320
321 public void addChange(Object point)
322 {
323 synchronized (_changes)
324 {
325 _changes[_change].add(point);
326 }
327 }
328
329
330 public void addChange(SelectableChannel channel, Object att)
331 {
332 if (att==null)
333 addChange(channel);
334 else if (att instanceof EndPoint)
335 addChange(att);
336 else
337 addChange(new ChangeSelectableChannel(channel,att));
338 }
339
340
341 public void cancelIdle(Timeout.Task task)
342 {
343 synchronized (this)
344 {
345 task.cancel();
346 }
347 }
348
349
350
351
352
353
354
355 public void doSelect() throws IOException
356 {
357 SelectionKey key=null;
358
359 try
360 {
361 List changes;
362 final Selector selector;
363 synchronized (_changes)
364 {
365 changes=_changes[_change];
366 _change=_change==0?1:0;
367 _selecting=true;
368 selector=_selector;
369 }
370
371
372 for (int i = 0; i < changes.size(); i++)
373 {
374 try
375 {
376 Object o = changes.get(i);
377
378 if (o instanceof EndPoint)
379 {
380
381 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
382 endpoint.doUpdateKey();
383 }
384 else if (o instanceof Runnable)
385 {
386 dispatch((Runnable)o);
387 }
388 else if (o instanceof ChangeSelectableChannel)
389 {
390
391 final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
392 final SelectableChannel channel=asc._channel;
393 final Object att = asc._attachment;
394
395 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
396 {
397 key = channel.register(selector,SelectionKey.OP_READ,att);
398 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
399 key.attach(endpoint);
400 endpoint.dispatch();
401 }
402 else if (channel.isOpen())
403 {
404 channel.register(selector,SelectionKey.OP_CONNECT,att);
405 }
406 }
407 else if (o instanceof SocketChannel)
408 {
409 final SocketChannel channel=(SocketChannel)o;
410
411 if (channel.isConnected())
412 {
413 key = channel.register(selector,SelectionKey.OP_READ,null);
414 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
415 key.attach(endpoint);
416 endpoint.dispatch();
417 }
418 else if (channel.isOpen())
419 {
420 channel.register(selector,SelectionKey.OP_CONNECT,null);
421 }
422 }
423 else if (o instanceof ServerSocketChannel)
424 {
425 ServerSocketChannel channel = (ServerSocketChannel)o;
426 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
427 }
428 else if (o instanceof ChangeTask)
429 {
430 ((ChangeTask)o).run();
431 }
432 else
433 throw new IllegalArgumentException(o.toString());
434 }
435 catch (Exception e)
436 {
437 if (isRunning())
438 Log.warn(e);
439 else
440 Log.debug(e);
441 }
442 }
443 changes.clear();
444
445 long idle_next = 0;
446 long retry_next = 0;
447 long now=System.currentTimeMillis();
448 synchronized (this)
449 {
450 _idleTimeout.setNow(now);
451 _retryTimeout.setNow(now);
452 if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
453 _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
454 else
455 _idleTimeout.setDuration(_maxIdleTime);
456 idle_next=_idleTimeout.getTimeToNext();
457 retry_next=_retryTimeout.getTimeToNext();
458 }
459
460
461 long wait = 1000L;
462 if (idle_next >= 0 && wait > idle_next)
463 wait = idle_next;
464 if (wait > 0 && retry_next >= 0 && wait > retry_next)
465 wait = retry_next;
466
467
468 if (wait > 2)
469 {
470
471 if (_pausing)
472 {
473 try
474 {
475 Thread.sleep(__BUSY_PAUSE);
476 }
477 catch(InterruptedException e)
478 {
479 Log.ignore(e);
480 }
481 }
482
483 long before=now;
484 int selected=selector.select(wait);
485 now = System.currentTimeMillis();
486 _idleTimeout.setNow(now);
487 _retryTimeout.setNow(now);
488 _selects++;
489
490
491
492
493 if (now>_monitorNext)
494 {
495 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
496 _pausing=_selects>__MAX_SELECTS;
497 if (_pausing)
498 _paused++;
499
500 _selects=0;
501 _jvmBug=0;
502 _monitorStart=now;
503 _monitorNext=now+__MONITOR_PERIOD;
504 }
505
506 if (now>_log)
507 {
508 if (_paused>0)
509 Log.info(this+" Busy selector - injecting delay "+_paused+" times");
510
511 if (_jvmFix2>0)
512 Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
513
514 if (_jvmFix1>0)
515 Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
516
517 else if(Log.isDebugEnabled() && _jvmFix0>0)
518 Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
519 _paused=0;
520 _jvmFix2=0;
521 _jvmFix1=0;
522 _jvmFix0=0;
523 _log=now+60000;
524 }
525
526
527 if (selected==0 && wait>10 && (now-before)<(wait/2))
528 {
529
530 _jvmBug++;
531 if (_jvmBug>(__JVMBUG_THRESHHOLD))
532 {
533 try
534 {
535 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
536 _jvmFix2++;
537
538 Thread.sleep(__BUSY_PAUSE);
539 }
540 catch(InterruptedException e)
541 {
542 Log.ignore(e);
543 }
544 }
545 else if (_jvmBug==__JVMBUG_THRESHHOLD)
546 {
547 synchronized (this)
548 {
549
550 _jvmFix1++;
551
552 final Selector new_selector = Selector.open();
553 Iterator iterator = _selector.keys().iterator();
554 while (iterator.hasNext())
555 {
556 SelectionKey k = (SelectionKey)iterator.next();
557 if (!k.isValid() || k.interestOps()==0)
558 continue;
559
560 final SelectableChannel channel = k.channel();
561 final Object attachment = k.attachment();
562
563 if (attachment==null)
564 addChange(channel);
565 else
566 addChange(channel,attachment);
567 }
568 _selector.close();
569 _selector=new_selector;
570 return;
571 }
572 }
573 else if (_jvmBug%32==31)
574 {
575
576 int cancelled=0;
577 Iterator iter = selector.keys().iterator();
578 while(iter.hasNext())
579 {
580 SelectionKey k = (SelectionKey) iter.next();
581 if (k.isValid()&&k.interestOps()==0)
582 {
583 k.cancel();
584 cancelled++;
585 }
586 }
587 if (cancelled>0)
588 _jvmFix0++;
589
590 return;
591 }
592 }
593 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
594 {
595
596 SelectionKey busy = (SelectionKey)selector.selectedKeys().iterator().next();
597 if (busy==_busyKey)
598 {
599 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
600 {
601 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
602 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
603 busy.cancel();
604 if (endpoint!=null)
605 {
606 dispatch(new Runnable()
607 {
608 public void run()
609 {
610 try
611 {
612 endpoint.close();
613 }
614 catch (IOException e)
615 {
616 Log.ignore(e);
617 }
618 }
619 });
620 }
621 }
622 }
623 else
624 _busyKeyCount=0;
625 _busyKey=busy;
626 }
627 }
628 else
629 {
630 selector.selectNow();
631 _selects++;
632 }
633
634
635 if (_selector==null || !selector.isOpen())
636 return;
637
638
639 Iterator iter = selector.selectedKeys().iterator();
640 while (iter.hasNext())
641 {
642 key = (SelectionKey) iter.next();
643
644 try
645 {
646 if (!key.isValid())
647 {
648 key.cancel();
649 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
650 if (endpoint != null)
651 endpoint.doUpdateKey();
652 continue;
653 }
654
655 Object att = key.attachment();
656
657 if (att instanceof SelectChannelEndPoint)
658 {
659 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
660 endpoint.dispatch();
661 }
662 else if (key.isAcceptable())
663 {
664 SocketChannel channel = acceptChannel(key);
665 if (channel==null)
666 continue;
667
668 channel.configureBlocking(false);
669
670
671 _nextSet=++_nextSet%_selectSet.length;
672
673
674 if (_nextSet==_setID)
675 {
676
677 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
678 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
679 cKey.attach(endpoint);
680 if (endpoint != null)
681 endpoint.dispatch();
682 }
683 else
684 {
685
686 _selectSet[_nextSet].addChange(channel);
687 _selectSet[_nextSet].wakeup();
688 }
689 }
690 else if (key.isConnectable())
691 {
692
693 SocketChannel channel = (SocketChannel)key.channel();
694 boolean connected=false;
695 try
696 {
697 connected=channel.finishConnect();
698 }
699 catch(Exception e)
700 {
701 connectionFailed(channel,e,att);
702 }
703 finally
704 {
705 if (connected)
706 {
707 key.interestOps(SelectionKey.OP_READ);
708 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
709 key.attach(endpoint);
710 endpoint.dispatch();
711 }
712 else
713 {
714 key.cancel();
715 }
716 }
717 }
718 else
719 {
720
721 SocketChannel channel = (SocketChannel)key.channel();
722 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
723 key.attach(endpoint);
724 if (key.isReadable())
725 endpoint.dispatch();
726 }
727 key = null;
728 }
729 catch (CancelledKeyException e)
730 {
731 Log.ignore(e);
732 }
733 catch (Exception e)
734 {
735 if (isRunning())
736 Log.warn(e);
737 else
738 Log.ignore(e);
739
740 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
741 {
742 key.interestOps(0);
743
744 key.cancel();
745 }
746 }
747 }
748
749
750 selector.selectedKeys().clear();
751
752
753 _idleTimeout.tick(now);
754 _retryTimeout.tick(now);
755
756 }
757 catch (CancelledKeyException e)
758 {
759 Log.ignore(e);
760 }
761 finally
762 {
763 _selecting=false;
764 }
765 }
766
767
768 public SelectorManager getManager()
769 {
770 return SelectorManager.this;
771 }
772
773
774 public long getNow()
775 {
776 return _idleTimeout.getNow();
777 }
778
779
780 public void scheduleIdle(Timeout.Task task)
781 {
782 synchronized (this)
783 {
784 if (_idleTimeout.getDuration() <= 0)
785 return;
786
787 task.schedule(_idleTimeout);
788 }
789 }
790
791
792 public void scheduleTimeout(Timeout.Task task, long timeout)
793 {
794 synchronized (this)
795 {
796 _retryTimeout.schedule(task, timeout);
797 }
798 }
799
800
801 public void wakeup()
802 {
803 Selector selector = _selector;
804 if (selector!=null)
805 selector.wakeup();
806 }
807
808
809 Selector getSelector()
810 {
811 return _selector;
812 }
813
814
815 void stop() throws Exception
816 {
817 boolean selecting=true;
818 while(selecting)
819 {
820 wakeup();
821 selecting=_selecting;
822 }
823
824 ArrayList keys=new ArrayList(_selector.keys());
825 Iterator iter =keys.iterator();
826
827 while (iter.hasNext())
828 {
829 SelectionKey key = (SelectionKey)iter.next();
830 if (key==null)
831 continue;
832 Object att=key.attachment();
833 if (att instanceof EndPoint)
834 {
835 EndPoint endpoint = (EndPoint)att;
836 try
837 {
838 endpoint.close();
839 }
840 catch(IOException e)
841 {
842 Log.ignore(e);
843 }
844 }
845 }
846
847 synchronized (this)
848 {
849 selecting=_selecting;
850 while(selecting)
851 {
852 wakeup();
853 selecting=_selecting;
854 }
855
856 _idleTimeout.cancelAll();
857 _retryTimeout.cancelAll();
858 try
859 {
860 if (_selector != null)
861 _selector.close();
862 }
863 catch (IOException e)
864 {
865 Log.ignore(e);
866 }
867 _selector=null;
868 }
869 }
870 }
871
872
873 private static class ChangeSelectableChannel
874 {
875 final SelectableChannel _channel;
876 final Object _attachment;
877
878 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
879 {
880 super();
881 _channel = channel;
882 _attachment = attachment;
883 }
884 }
885
886
887 private interface ChangeTask
888 {
889 public void run();
890 }
891 }