UniSet  1.4.0
include/UNetReceiver.h
00001 #ifndef UNetReceiver_H_
00002 #define UNetReceiver_H_
00003 // -----------------------------------------------------------------------------
00004 #include <ostream>
00005 #include <string>
00006 #include <queue>
00007 #include <cc++/socket.h>
00008 #include <sigc++/sigc++.h>
00009 #include "UniSetObject_LT.h"
00010 #include "Trigger.h"
00011 #include "Mutex.h"
00012 #include "SMInterface.h"
00013 #include "SharedMemory.h"
00014 #include "ThreadCreator.h"
00015 #include "UDPPacket.h"
00016 // -----------------------------------------------------------------------------
00017 /*  Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
00018  * ===============
00019  * Собственно реализация сделана так:
00020  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
00021  * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
00022  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
00023  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
00024  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
00025  * Всё это реализовано в функции UNetReceiver::real_update()
00026  *
00027  * КЭШ
00028  * ===
00029  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
00030  * Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
00031  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
00032  * Порядковый номер данных в пакете является индексом в кэше.
00033  * Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
00034  * ID который пришёл в пакете - элемент кэша обновляется.
00035  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
00036  *
00037  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
00038  * =========================================================================
00039  * Для защиты от сбоя счётика сделана следующая логика:
00040  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
00041  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
00042  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
00043  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
00044  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
00045  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
00046  * затирают старые, если их не успели вынуть и обработать.
00047  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
00048 */
00049 // -----------------------------------------------------------------------------
00050 class UNetReceiver
00051 {
00052     public:
00053         UNetReceiver( const std::string host, const ost::tpport_t port, SMInterface* smi );
00054         ~UNetReceiver();
00055 
00056          void start();
00057          void stop();
00058 
00059          void receive();
00060          void update();
00061 
00062          inline std::string getName(){ return myname; }
00063 
00064          // блокировать сохранение данный в SM
00065          void setLockUpdate( bool st );
00066 
00067          void resetTimeout();
00068 
00069          inline bool isRecvOK(){ return !ptRecvTimeout.checkTime(); }
00070          inline unsigned long getLostPacketsNum(){ return lostPackets; }
00071 
00072          void setReceiveTimeout( timeout_t msec );
00073          void setReceivePause( timeout_t msec );
00074          void setUpdatePause( timeout_t msec );
00075          void setLostTimeout( timeout_t msec );
00076          void setMaxDifferens( unsigned long set );
00077 
00078          void setRespondID( UniSetTypes::ObjectId id, bool invert=false );
00079          void setLostPacketsID( UniSetTypes::ObjectId id );
00080 
00081          void setMaxProcessingCount( int set );
00082 
00083          inline ost::IPV4Address getAddress(){ return addr; }
00084          inline ost::tpport_t getPort(){ return port; }
00085 
00087          enum Event
00088          {
00089              evOK,      
00090              evTimeout  
00091          };
00092 
00093          typedef sigc::slot<void,UNetReceiver*,Event> EventSlot;
00094          void connectEvent( EventSlot sl );
00095 
00096     protected:
00097 
00098         SMInterface* shm;
00099 
00100         bool recv();
00101         void step();
00102         void real_update();
00103 
00104         void initIterators();
00105 
00106     private:
00107         UNetReceiver();
00108 
00109         int recvpause;      
00110         int updatepause;    
00112         ost::UDPReceive* udp;
00113         ost::IPV4Address addr;
00114         ost::tpport_t port;
00115         std::string myname;
00116 
00117         UniSetTypes::uniset_mutex pollMutex;
00118         PassiveTimer ptRecvTimeout;
00119         timeout_t recvTimeout;
00120         timeout_t lostTimeout;
00121         PassiveTimer ptLostTimeout;
00122         unsigned long lostPackets; 
00124         UniSetTypes::ObjectId sidRespond;
00125         IOController::DIOStateList::iterator ditRespond;
00126         bool respondInvert;
00127         UniSetTypes::ObjectId sidLostPackets;
00128         IOController::AIOStateList::iterator aitLostPackets;
00129 
00130         bool activated;
00131         
00132         ThreadCreator<UNetReceiver>* r_thr;     // receive thread
00133         ThreadCreator<UNetReceiver>* u_thr;     // update thread
00134 
00135         // функция определения приоритетного сообщения для обработки
00136         struct PacketCompare:
00137         public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
00138         {
00139             inline bool operator()(const UniSetUDP::UDPMessage& lhs,
00140                             const UniSetUDP::UDPMessage& rhs) const
00141                     { return lhs.num > rhs.num; }
00142         };
00143         typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
00144         PacketQueue qpack;  
00145         UniSetUDP::UDPMessage pack;     
00146         UniSetUDP::UDPPacket r_buf;
00147         UniSetTypes::uniset_mutex packMutex; 
00148         unsigned long pnum; 
00153         unsigned long maxDifferens;
00154 
00155         PacketQueue qtmp;   
00156         bool waitClean;     
00157         unsigned long rnum; 
00159         int maxProcessingCount; 
00161         bool lockUpdate; 
00162         UniSetTypes::uniset_mutex lockMutex;
00163         
00164         EventSlot slEvent;
00165         Trigger trTimeout;
00166         UniSetTypes::uniset_mutex tmMutex;
00167 
00168         struct ItemInfo
00169         {
00170             long id;
00171             IOController::AIOStateList::iterator ait;
00172             IOController::DIOStateList::iterator dit;
00173             UniversalIO::IOTypes iotype;
00174 
00175             ItemInfo():
00176                 id(UniSetTypes::DefaultObjectId),
00177                 iotype(UniversalIO::UnknownIOType){}
00178         };
00179 
00180         typedef std::vector<ItemInfo> ItemVec;
00181         ItemVec d_icache;   
00182         ItemVec a_icache;   
00184         bool d_cache_init_ok;
00185         bool a_cache_init_ok;
00186 
00187         void initDCache( UniSetUDP::UDPMessage& pack, bool force=false );
00188         void initACache( UniSetUDP::UDPMessage& pack, bool force=false );
00189 };
00190 // -----------------------------------------------------------------------------
00191 #endif // UNetReceiver_H_
00192 // -----------------------------------------------------------------------------