My Project 3.2.0
C++ Distributed Hash Table
Loading...
Searching...
No Matches
network_utils.h
1/*
2 * Copyright (C) 2014-2023 Savoir-faire Linux Inc.
3 * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18#pragma once
19
20#include "def.h"
21
22#include "sockaddr.h"
23#include "utils.h"
24#include "logger.h"
25
26#ifdef _WIN32
27#include <ws2tcpip.h>
28#include <winsock2.h>
29#else
30#include <sys/socket.h>
31#include <netinet/in.h>
32#include <unistd.h>
33#endif
34
35#include <functional>
36#include <thread>
37#include <atomic>
38#include <mutex>
39#include <list>
40
41namespace dht {
42namespace net {
43
/** Port number 4222, exposed as the DHT default port. */
static constexpr in_port_t DHT_DEFAULT_PORT = 4222;

/**
 * Upper bound (65536) compared against the size of the packet recycle list
 * in DatagramSocket::onReceived before returned packets are kept for reuse.
 */
static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 64;

/**
 * 650 ms delay bound for the receive queue.
 * NOTE(review): not referenced in this header; presumably the maximum age of
 * a queued packet before it is dropped -- confirm against the implementation.
 */
static constexpr std::chrono::milliseconds RX_QUEUE_MAX_DELAY {650};
47
48int bindSocket(const SockAddr& addr, SockAddr& bound);
49
50bool setNonblocking(int fd, bool nonblocking = true);
51
52#ifdef _WIN32
53void udpPipe(int fds[2]);
54#endif
56 Blob data;
57 SockAddr from;
58 time_point received;
59};
60using PacketList = std::list<ReceivedPacket>;
61
62class OPENDHT_PUBLIC DatagramSocket {
63public:
67 using OnReceive = std::function<PacketList(PacketList&& packets)>;
68 virtual ~DatagramSocket() {};
69
70 virtual int sendTo(const SockAddr& dest, const uint8_t* data, size_t size, bool replied) = 0;
71
72 inline void setOnReceive(OnReceive&& cb) {
73 std::lock_guard<std::mutex> lk(lock);
74 rx_callback = std::move(cb);
75 }
76
77 virtual bool hasIPv4() const = 0;
78 virtual bool hasIPv6() const = 0;
79
80 SockAddr getBound(sa_family_t family = AF_UNSPEC) const {
81 std::lock_guard<std::mutex> lk(lock);
82 return getBoundRef(family);
83 }
84 in_port_t getPort(sa_family_t family = AF_UNSPEC) const {
85 std::lock_guard<std::mutex> lk(lock);
86 return getBoundRef(family).getPort();
87 }
88
89 virtual const SockAddr& getBoundRef(sa_family_t family = AF_UNSPEC) const = 0;
90
92 virtual std::vector<SockAddr> resolve(const std::string& host, const std::string& service = {}) {
93 return SockAddr::resolve(host, service);
94 }
95
96 virtual void stop() = 0;
97protected:
98
99 PacketList getNewPacket() {
100 PacketList pkts;
101 if (toRecycle_.empty()) {
102 pkts.emplace_back();
103 } else {
104 auto begIt = toRecycle_.begin();
105 auto begItNext = std::next(begIt);
106 pkts.splice(pkts.end(), toRecycle_, begIt, begItNext);
107 }
108 return pkts;
109 }
110
111 inline void onReceived(PacketList&& packets) {
112 std::lock_guard<std::mutex> lk(lock);
113 if (rx_callback) {
114 auto r = rx_callback(std::move(packets));
115 if (not r.empty() and toRecycle_.size() < RX_QUEUE_MAX_SIZE)
116 toRecycle_.splice(toRecycle_.end(), std::move(r));
117 }
118 }
119protected:
120 mutable std::mutex lock;
121private:
122 OnReceive rx_callback;
123 PacketList toRecycle_;
124};
125
126class OPENDHT_PUBLIC UdpSocket : public DatagramSocket {
127public:
128 UdpSocket(in_port_t port, const std::shared_ptr<Logger>& l = {});
129 UdpSocket(const SockAddr& bind4, const SockAddr& bind6, const std::shared_ptr<Logger>& l = {});
130 ~UdpSocket();
131
132 int sendTo(const SockAddr& dest, const uint8_t* data, size_t size, bool replied) override;
133
134 const SockAddr& getBoundRef(sa_family_t family = AF_UNSPEC) const override {
135 return (family == AF_INET6) ? bound6 : bound4;
136 }
137
138 bool hasIPv4() const override {
139 std::lock_guard<std::mutex> lk(lock);
140 return s4 != -1;
141 }
142 bool hasIPv6() const override {
143 std::lock_guard<std::mutex> lk(lock);
144 return s6 != -1;
145 }
146
147 void stop() override;
148private:
149 std::shared_ptr<Logger> logger;
150 int s4 {-1};
151 int s6 {-1};
152 int stopfd {-1};
153 SockAddr bound4, bound6;
154 std::thread rcv_thread {};
155 std::atomic_bool running {false};
156
157 void openSockets(const SockAddr& bind4, const SockAddr& bind6);
158};
159
160}
161}
virtual std::vector< SockAddr > resolve(const std::string &host, const std::string &service={})
std::function< PacketList(PacketList &&packets)> OnReceive
std::vector< uint8_t > Blob
Definition utils.h:151