UniSet  2.8.0
UNetReceiver.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -----------------------------------------------------------------------------
17 #ifndef UNetReceiver_H_
18 #define UNetReceiver_H_
19 // -----------------------------------------------------------------------------
20 #include <ostream>
21 #include <memory>
22 #include <string>
23 #include <queue>
24 #include <unordered_map>
25 #include <sigc++/sigc++.h>
26 #include <ev++.h>
27 #include "UniSetObject.h"
28 #include "Trigger.h"
29 #include "Mutex.h"
30 #include "SMInterface.h"
31 #include "SharedMemory.h"
32 #include "UDPPacket.h"
33 #include "CommonEventLoop.h"
34 #include "UDPCore.h"
35 // --------------------------------------------------------------------------
36 namespace uniset
37 {
38  // -----------------------------------------------------------------------------
39  /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
40  * ===============
41  * Собственно реализация сделана так:
42  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
43  * что были посланы, сделана очередь с приоритетом. В качестве приоритета используется номер пакета
44  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
45  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
46  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
47  * Всё это реализовано в функции UNetReceiver::real_update()
48  *
49  * КЭШ
50  * ===
51  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
52  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
53  * Порядковый номер данных в пакете является индексом в кэше.
54  * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
55  * ID который пришёл в пакете - элемент кэша обновляется.
56  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
57  *
58  * КЭШ (ДОПОЛНЕНИЕ)
59  * ===
60  * Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
61  * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
62  * Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета.
63  * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
64  * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
65  *
66  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
67  * =========================================================================
68  * Для защиты от сбоя счётчика сделана следующая логика:
69  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
70  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
71  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
72  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
73  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
74  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
75  * затирают старые, если их не успели вынуть и обработать.
76  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
77  * =========================================================================
78  * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем..
79  *
80  * Создание соединения (открытие сокета)
81  * ======================================
82  * Попытка создать сокет производиться сразу в конструкторе, если это не получается,
83  * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
84  * открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
85  * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
86  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
87  * Если такая логика не требуется, то можно задать в конструкторе
88  * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
89  * выкинуто исключение при неудачной попытке создания соединения.
90  *
91  * Стратегия обновления данных в SM
92  * ==================================
93  * При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
94  * Поддерживается два варианта:
95  * 'thread' - отдельный поток обновления
96  * 'evloop' - использование общего с приёмом event loop (libev)
97  */
98  // -----------------------------------------------------------------------------
99  class UNetReceiver:
100  protected EvWatcher,
101  public std::enable_shared_from_this<UNetReceiver>
102  {
103  public:
104  UNetReceiver( const std::string& host, int port, const std::shared_ptr<SMInterface>& smi
105  , bool nocheckConnection = false
106  , const std::string& prefix = "unet" );
107  virtual ~UNetReceiver();
108 
109  void start();
110  void stop();
111 
112  inline const std::string getName() const
113  {
114  return myname;
115  }
116 
117  // блокировать сохранение данных в SM
118  void setLockUpdate( bool st ) noexcept;
119  bool isLockUpdate() const noexcept;
120 
121  void resetTimeout() noexcept;
122 
123  bool isInitOK() const noexcept;
124  bool isRecvOK() const noexcept;
125  size_t getLostPacketsNum() const noexcept;
126 
127  void setReceiveTimeout( timeout_t msec ) noexcept;
128  void setReceivePause( timeout_t msec ) noexcept;
129  void setUpdatePause( timeout_t msec ) noexcept;
130  void setLostTimeout( timeout_t msec ) noexcept;
131  void setPrepareTime( timeout_t msec ) noexcept;
132  void setCheckConnectionPause( timeout_t msec ) noexcept;
133  void setMaxDifferens( unsigned long set ) noexcept;
134  void setEvrunTimeout(timeout_t msec ) noexcept;
135  void setInitPause( timeout_t msec ) noexcept;
136 
137  void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
138  void setLostPacketsID( uniset::ObjectId id ) noexcept;
139 
140  void setMaxProcessingCount( int set ) noexcept;
141 
142  void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
143 
144  inline std::string getAddress() const noexcept
145  {
146  return addr;
147  }
148  inline int getPort() const noexcept
149  {
150  return port;
151  }
152 
154  enum Event
155  {
158  };
159 
160  typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
161  void connectEvent( EventSlot sl ) noexcept;
162 
163  // --------------------------------------------------------------------
166  {
167  useUpdateUnknown,
170  };
171 
172  static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept;
173  static std::string to_string( UpdateStrategy s) noexcept;
174 
176  void setUpdateStrategy( UpdateStrategy set );
177 
178  // специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
179  // (т.к. при evloop mutex захватывать не нужно)
181  {
182  public:
183  pack_guard( std::mutex& m, UpdateStrategy s );
184  ~pack_guard();
185 
186  protected:
187  std::mutex& m;
188  UpdateStrategy s;
189  };
190 
191  // --------------------------------------------------------------------
192 
193  inline std::shared_ptr<DebugStream> getLog()
194  {
195  return unetlog;
196  }
197 
198  virtual const std::string getShortInfo() const noexcept;
199 
200  protected:
201 
202  const std::shared_ptr<SMInterface> shm;
203  std::shared_ptr<DebugStream> unetlog;
204 
205  bool receive() noexcept;
206  void step() noexcept;
207  void update() noexcept;
208  void updateThread() noexcept;
209  void callback( ev::io& watcher, int revents ) noexcept;
210  void readEvent( ev::io& watcher ) noexcept;
211  void updateEvent( ev::periodic& watcher, int revents ) noexcept;
212  void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
213  void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
214  void initEvent( ev::timer& watcher, int revents ) noexcept;
215  virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
216  virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
217  virtual std::string wname() const noexcept override
218  {
219  return myname;
220  }
221 
222  void initIterators() noexcept;
223  bool createConnection( bool throwEx = false );
224  void checkConnection();
225 
226  public:
227 
228  // функция определения приоритетного сообщения для обработки
230  public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
231  {
232  inline bool operator()(const UniSetUDP::UDPMessage& lhs,
233  const UniSetUDP::UDPMessage& rhs) const
234  {
235  return lhs.num > rhs.num;
236  }
237  };
238 
239  typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
240 
241  private:
242  UNetReceiver();
243 
244  timeout_t recvpause = { 10 };
245  timeout_t updatepause = { 100 };
247  std::unique_ptr<UDPReceiveU> udp;
248  std::string addr;
249  int port = { 0 };
250  Poco::Net::SocketAddress saddr;
251  std::string myname;
252  ev::io evReceive;
253  ev::periodic evCheckConnection;
254  ev::periodic evStatistic;
255  ev::periodic evUpdate;
256  ev::timer evInitPause;
257 
258  UpdateStrategy upStrategy = { useUpdateEventLoop };
259 
260  // счётчики для подсчёта статистики
261  size_t recvCount = { 0 };
262  size_t upCount = { 0 };
263 
264  // текущая статистик
265  size_t statRecvPerSec = { 0 };
266  size_t statUpPerSec = { 0 };
268  std::unique_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread
269 
270  // делаем loop общим.. одним на всех!
271  static CommonEventLoop loop;
272 
273  double checkConnectionTime = { 10.0 }; // sec
274  std::mutex checkConnMutex;
275 
276  PassiveTimer ptRecvTimeout;
277  PassiveTimer ptPrepare;
278  timeout_t recvTimeout = { 5000 }; // msec
279  timeout_t prepareTime = { 2000 };
280  timeout_t evrunTimeout = { 15000 };
281  timeout_t lostTimeout = { 200 };
282 
283  double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
284  std::atomic_bool initOK = { false };
285 
286  PassiveTimer ptLostTimeout;
287  size_t lostPackets = { 0 };
289  uniset::ObjectId sidRespond = { uniset::DefaultObjectId };
290  IOController::IOStateList::iterator itRespond;
291  bool respondInvert = { false };
292  uniset::ObjectId sidLostPackets;
293  IOController::IOStateList::iterator itLostPackets;
294 
295  std::atomic_bool activated = { false };
296 
297  PacketQueue qpack;
298  UniSetUDP::UDPMessage pack;
299  UniSetUDP::UDPPacket r_buf;
300  std::mutex packMutex;
301  size_t pnum = { 0 };
306  size_t maxDifferens = { 20 };
307 
308  PacketQueue qtmp;
309  bool waitClean = { false };
310  size_t rnum = { 0 };
312  size_t maxProcessingCount = { 100 };
314  std::atomic_bool lockUpdate = { false };
316  EventSlot slEvent;
317  Trigger trTimeout;
318  std::mutex tmMutex;
319 
320  struct CacheItem
321  {
322  long id = { uniset::DefaultObjectId };
323  IOController::IOStateList::iterator ioit;
324 
325  CacheItem():
326  id(uniset::DefaultObjectId) {}
327  };
328 
329  typedef std::vector<CacheItem> CacheVec;
330  struct CacheInfo
331  {
332  CacheInfo():
333  cache_init_ok(false) {}
334 
335  bool cache_init_ok = { false };
336  CacheVec cache;
337  };
338 
339  // ключом является UDPMessage::getDataID()
340  typedef std::unordered_map<long, CacheInfo> CacheMap;
341  CacheMap d_icache_map;
342  CacheMap a_icache_map;
344  bool d_cache_init_ok = { false };
345  bool a_cache_init_ok = { false };
346 
347  void initDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
348  void initACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
349  };
350  // --------------------------------------------------------------------------
351 } // end of namespace uniset
352 // -----------------------------------------------------------------------------
353 #endif // UNetReceiver_H_
354 // -----------------------------------------------------------------------------
Definition: DebugStream.h:91
Definition: SMInterface.h:31
Definition: CommonEventLoop.h:14
Definition: UNetReceiver.h:168
Definition: CommonEventLoop.h:18
Definition: UNetReceiver.h:157
const ObjectId DefaultObjectId
Definition: UniSetTypes.h:69
Definition: UDPPacket.h:106
Definition: UNetReceiver.h:229
Event
Definition: UNetReceiver.h:154
void setUpdateStrategy(UpdateStrategy set)
функция должна вызываться до первого вызова start()
Definition: UNetReceiver.cc:968
Definition: UNetReceiver.h:180
Definition: UNetReceiver.h:156
Definition: UNetReceiver.h:99
UpdateStrategy
Definition: UNetReceiver.h:165
Definition: UNetReceiver.h:169
long ObjectId
Definition: UniSetTypes_i.idl:30