UniSet
1.4.0
|
00001 #ifndef UNetReceiver_H_ 00002 #define UNetReceiver_H_ 00003 // ----------------------------------------------------------------------------- 00004 #include <ostream> 00005 #include <string> 00006 #include <queue> 00007 #include <cc++/socket.h> 00008 #include <sigc++/sigc++.h> 00009 #include "UniSetObject_LT.h" 00010 #include "Trigger.h" 00011 #include "Mutex.h" 00012 #include "SMInterface.h" 00013 #include "SharedMemory.h" 00014 #include "ThreadCreator.h" 00015 #include "UDPPacket.h" 00016 // ----------------------------------------------------------------------------- 00017 /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP. 00018 * =============== 00019 * Собственно реализация сделана так: 00020 * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности 00021 * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета 00022 * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд", 00023 * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout) 00024 * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше.. 00025 * Всё это реализовано в функции UNetReceiver::real_update() 00026 * 00027 * КЭШ 00028 * === 00029 * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов. 00030 * Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность. 00031 * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо). 00032 * Порядковый номер данных в пакете является индексом в кэше. 00033 * Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем, 00034 * ID который пришёл в пакете - элемент кэша обновляется. 00035 * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется. 00036 * 00037 * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум) 00038 * ========================================================================= 00039 * Для защиты от сбоя счётика сделана следующая логика: 00040 * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается, 00041 * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью. 00042 * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет, 00043 * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка. 00044 * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься 00045 * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные 00046 * затирают старые, если их не успели вынуть и обработать. 00047 * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди. 00048 */ 00049 // ----------------------------------------------------------------------------- 00050 class UNetReceiver 00051 { 00052 public: 00053 UNetReceiver( const std::string host, const ost::tpport_t port, SMInterface* smi ); 00054 ~UNetReceiver(); 00055 00056 void start(); 00057 void stop(); 00058 00059 void receive(); 00060 void update(); 00061 00062 inline std::string getName(){ return myname; } 00063 00064 // блокировать сохранение данный в SM 00065 void setLockUpdate( bool st ); 00066 00067 void resetTimeout(); 00068 00069 inline bool isRecvOK(){ return !ptRecvTimeout.checkTime(); } 00070 inline unsigned long getLostPacketsNum(){ return lostPackets; } 00071 00072 void setReceiveTimeout( timeout_t msec ); 00073 void setReceivePause( timeout_t msec ); 00074 void setUpdatePause( timeout_t msec ); 00075 void setLostTimeout( timeout_t msec ); 00076 void setMaxDifferens( unsigned long set ); 00077 00078 void setRespondID( UniSetTypes::ObjectId id, bool invert=false ); 00079 void setLostPacketsID( UniSetTypes::ObjectId id ); 00080 00081 void setMaxProcessingCount( int set ); 00082 00083 inline ost::IPV4Address getAddress(){ return addr; } 00084 inline ost::tpport_t getPort(){ return port; } 00085 00087 enum Event 00088 { 00089 evOK, 00090 evTimeout 00091 }; 00092 00093 typedef sigc::slot<void,UNetReceiver*,Event> EventSlot; 00094 void connectEvent( EventSlot sl ); 00095 00096 protected: 00097 00098 SMInterface* shm; 00099 00100 bool recv(); 00101 void step(); 00102 void real_update(); 00103 00104 void initIterators(); 00105 00106 private: 00107 UNetReceiver(); 00108 00109 int recvpause; 00110 int updatepause; 00112 ost::UDPReceive* udp; 00113 ost::IPV4Address addr; 00114 ost::tpport_t port; 00115 std::string myname; 00116 00117 UniSetTypes::uniset_mutex pollMutex; 00118 PassiveTimer ptRecvTimeout; 00119 timeout_t recvTimeout; 00120 timeout_t lostTimeout; 00121 PassiveTimer ptLostTimeout; 00122 unsigned long lostPackets; 00124 UniSetTypes::ObjectId sidRespond; 00125 IOController::DIOStateList::iterator ditRespond; 00126 bool respondInvert; 00127 UniSetTypes::ObjectId sidLostPackets; 00128 IOController::AIOStateList::iterator aitLostPackets; 00129 00130 bool activated; 00131 00132 ThreadCreator<UNetReceiver>* r_thr; // receive thread 00133 ThreadCreator<UNetReceiver>* u_thr; // update thread 00134 00135 // функция определения приоритетного сообщения для обработки 00136 struct PacketCompare: 00137 public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool> 00138 { 00139 inline bool operator()(const UniSetUDP::UDPMessage& lhs, 00140 const UniSetUDP::UDPMessage& rhs) const 00141 { return lhs.num > rhs.num; } 00142 }; 00143 typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue; 00144 PacketQueue qpack; 00145 UniSetUDP::UDPMessage pack; 00146 UniSetUDP::UDPPacket r_buf; 00147 UniSetTypes::uniset_mutex packMutex; 00148 unsigned long pnum; 00153 unsigned long maxDifferens; 00154 00155 PacketQueue qtmp; 00156 bool waitClean; 00157 unsigned long rnum; 00159 int maxProcessingCount; 00161 bool lockUpdate; 00162 UniSetTypes::uniset_mutex lockMutex; 00163 00164 EventSlot slEvent; 00165 Trigger trTimeout; 00166 UniSetTypes::uniset_mutex tmMutex; 00167 00168 struct ItemInfo 00169 { 00170 long id; 00171 IOController::AIOStateList::iterator ait; 00172 IOController::DIOStateList::iterator dit; 00173 UniversalIO::IOTypes iotype; 00174 00175 ItemInfo(): 00176 id(UniSetTypes::DefaultObjectId), 00177 iotype(UniversalIO::UnknownIOType){} 00178 }; 00179 00180 typedef std::vector<ItemInfo> ItemVec; 00181 ItemVec d_icache; 00182 ItemVec a_icache; 00184 bool d_cache_init_ok; 00185 bool a_cache_init_ok; 00186 00187 void initDCache( UniSetUDP::UDPMessage& pack, bool force=false ); 00188 void initACache( UniSetUDP::UDPMessage& pack, bool force=false ); 00189 }; 00190 // ----------------------------------------------------------------------------- 00191 #endif // UNetReceiver_H_ 00192 // -----------------------------------------------------------------------------