UniSet  2.8.0
UNetSender.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -----------------------------------------------------------------------------
17 #ifndef UNetSender_H_
18 #define UNetSender_H_
19 // -----------------------------------------------------------------------------
20 #include <ostream>
21 #include <string>
22 #include <vector>
23 #include <limits>
24 #include <unordered_map>
25 #include "UniSetObject.h"
26 #include "Trigger.h"
27 #include "Mutex.h"
28 #include "SMInterface.h"
29 #include "SharedMemory.h"
30 #include "ThreadCreator.h"
31 #include "UDPCore.h"
32 #include "UDPPacket.h"
33 // --------------------------------------------------------------------------
34 namespace uniset
35 {
36  // -----------------------------------------------------------------------------
37  /*
38  * Распределение датчиков по пакетам
39  * =========================================================================
40  * Все пересылаемые данные разбиваются на группы по частоте посылки("sendfactor").
41  * Частота посылки кратна sendpause, задаётся для каждого датчика, при помощи свойства prefix_sendfactor.
42  * Внутри каждой группы пакеты набираются по мере "заполнения".
43  *
44  * Добавление датчика в пакет и создание нового пакета при переполнении происходит в функции initItem().
45  * Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
46  * то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него,
47  * в свою очередь остальные продолжают "добивать" предыдущий пакет.
48  * В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
49  * существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
50  *
51  * ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
52  * Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
53  последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
54  На стороне UNetReceiver пакеты с повторными номерами (т.е. уже обработанные) - откидываются.
55  *
56  *
57  * Создание соединения
58  * ======================================
59  * Попытка создать соединение производиться сразу в конструкторе, если это не получается,
60  * то в потоке "посылки", с заданным периодом (checkConnectionTime) идёт попытка создать соединение..
61  * и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
62  * (в момент создания объекта UNetSender) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
63  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
64  * Если такая логика не требуется, то можно задать в конструкторе флаг nocheckconnection=true,
65  * тогда при создании объекта UNetSender, в конструкторе будет
66  * выкинуто исключение при неудачной попытке создания соединения.
67  * \warning setCheckConnectionPause(msec) должно быть кратно sendpause!
68  */
69  class UNetSender
70  {
71  public:
72  UNetSender( const std::string& host, const int port, const std::shared_ptr<SMInterface>& smi
73  , bool nocheckConnection = false
74  , const std::string& s_field = ""
75  , const std::string& s_fvalue = ""
76  , const std::string& prop_prefix = "unet"
77  , const std::string& prefix = "unet"
78  , size_t maxDCount = UniSetUDP::MaxDCount
79  , size_t maxACount = UniSetUDP::MaxACount );
80 
81  virtual ~UNetSender();
82 
83  typedef size_t sendfactor_t;
84 
85  static const long not_specified_value = { std::numeric_limits<long>::max() };
86 
87  struct UItem
88  {
89  UItem():
90  iotype(UniversalIO::UnknownIOType),
92  pack_num(0),
93  pack_ind(0),
94  pack_sendfactor(0) {}
95 
96  UniversalIO::IOType iotype;
98  IOController::IOStateList::iterator ioit;
99  size_t pack_num;
100  size_t pack_ind;
101  sendfactor_t pack_sendfactor = { 0 };
102  long undefined_value = { not_specified_value };
103  friend std::ostream& operator<<( std::ostream& os, UItem& p );
104  };
105 
106  typedef std::unordered_map<uniset::ObjectId, UItem> UItemMap;
107 
108  size_t getDataPackCount() const;
109 
110  void start();
111  void stop();
112 
113  void send() noexcept;
114 
115  struct PackMessage
116  {
117  PackMessage( UniSetUDP::UDPMessage&& m ) noexcept: msg(std::move(m)) {}
118  PackMessage( const UniSetUDP::UDPMessage& m ) = delete;
119 
120  PackMessage() noexcept {}
121 
124  };
125 
126  void real_send( PackMessage& mypack ) noexcept;
127 
129  void updateFromSM();
130 
132  void updateSensor( uniset::ObjectId id, long value );
133 
135  void updateItem( UItem& it, long value );
136 
137  inline void setSendPause( int msec )
138  {
139  sendpause = msec;
140  }
141  inline void setPackSendPause( int msec )
142  {
143  packsendpause = msec;
144  }
145 
146  void setCheckConnectionPause( int msec );
147 
149  void askSensors( UniversalIO::UIOCommand cmd );
150 
152  void initIterators();
153 
154  inline std::shared_ptr<DebugStream> getLog()
155  {
156  return unetlog;
157  }
158 
159  virtual const std::string getShortInfo() const;
160 
161  inline std::string getAddress() const
162  {
163  return addr;
164  }
165  inline int getPort() const
166  {
167  return port;
168  }
169 
170  inline size_t getADataSize() const
171  {
172  return maxAData;
173  }
174  inline size_t getDDataSize() const
175  {
176  return maxDData;
177  }
178 
179  protected:
180 
181  std::string s_field = { "" };
182  std::string s_fvalue = { "" };
183  std::string prop_prefix = { "" };
184 
185  const std::shared_ptr<SMInterface> shm;
186  std::shared_ptr<DebugStream> unetlog;
187 
188  bool initItem( UniXML::iterator& it );
189  bool readItem( const std::shared_ptr<UniXML>& xml, UniXML::iterator& it, xmlNode* sec );
190 
191  void readConfiguration();
192 
193  bool createConnection( bool throwEx );
194 
195  private:
196  UNetSender();
197 
198  std::unique_ptr<UDPSocketU> udp;
199  std::string addr;
200  int port = { 0 };
201  std::string s_host = { "" };
202  Poco::Net::SocketAddress saddr;
203 
204  std::string myname = { "" };
205  timeout_t sendpause = { 150 };
206  timeout_t packsendpause = { 5 };
207  timeout_t writeTimeout = { 1000 }; // msec
208  std::atomic_bool activated = { false };
209  PassiveTimer ptCheckConnection;
210 
211  typedef std::unordered_map<sendfactor_t, std::vector<PackMessage>> Packs;
212 
213  // mypacks заполняется в начале и дальше с ним происходит только чтение
214  // поэтому mutex-ом его не защищаем
215  Packs mypacks;
216  std::unordered_map<sendfactor_t, size_t> packs_anum;
217  std::unordered_map<sendfactor_t, size_t> packs_dnum;
218  UItemMap items;
219  size_t packetnum = { 1 };
220  uint16_t lastcrc = { 0 };
221  UniSetUDP::UDPPacket s_msg;
222 
223  size_t maxAData = { UniSetUDP::MaxACount };
224  size_t maxDData = { UniSetUDP::MaxDCount };
225 
226  std::unique_ptr< ThreadCreator<UNetSender> > s_thr; // send thread
227 
228  size_t ncycle = { 0 };
230  };
231  // --------------------------------------------------------------------------
232 } // end of namespace uniset
233 // -----------------------------------------------------------------------------
234 #endif // UNetSender_H_
235 // -----------------------------------------------------------------------------
Definition: CommonEventLoop.h:14
Definition: UNetSender.h:87
Definition: UNetSender.h:69
void askSensors(UniversalIO::UIOCommand cmd)
Definition: UNetSender.cc:540
void updateSensor(uniset::ObjectId id, long value)
Definition: UNetSender.cc:182
const ObjectId DefaultObjectId
Definition: UniSetTypes.h:69
Definition: UDPPacket.h:106
void updateFromSM()
Definition: UNetSender.cc:155
Definition: UNetSender.h:115
Definition: Mutex.h:31
void updateItem(UItem &it, long value)
Definition: UNetSender.cc:193
void initIterators()
Definition: UNetSender.cc:534
long ObjectId
Definition: UniSetTypes_i.idl:30