My Project 3.2.0
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dhtrunner.h
1/*
2 * Copyright (C) 2014-2023 Savoir-faire Linux Inc.
3 * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "def.h"
24#include "infohash.h"
25#include "value.h"
26#include "callbacks.h"
27#include "sockaddr.h"
28#include "logger.h"
29#include "network_utils.h"
30#include "node_export.h"
31
32#include <thread>
33#include <mutex>
34#include <atomic>
35#include <condition_variable>
36#include <future>
37#include <exception>
38#include <queue>
39#include <chrono>
40
41namespace dht {
42
43struct Node;
44class SecureDht;
45class PeerDiscovery;
46struct SecureDhtConfig;
47
54class OPENDHT_PUBLIC DhtRunner {
55
56public:
57 using StatusCallback = std::function<void(NodeStatus, NodeStatus)>;
58
// Settings bundle handed to DhtRunner::run().
// The convenience run(port, identity, threaded, network) overload fills in
// dht_config.node_config.network, dht_config.id and threaded before delegating.
59 struct Config {
// Configuration of the underlying secure DHT (value-initialised by default).
60 SecureDhtConfig dht_config {};
// Defaults to true. NOTE(review): presumably selects whether the runner uses its own thread — confirm.
61 bool threaded {true};
// Proxy / push-notification strings, all empty by default.
// NOTE(review): exact formats are not visible in this header — confirm against the implementation.
62 std::string proxy_server {};
63 std::string push_node_id {};
64 std::string push_token {};
65 std::string push_topic {};
66 std::string push_platform {};
// Peer discovery / publishing flags, both off by default.
67 bool peer_discovery {false};
68 bool peer_publish {false};
// Null / default-constructed unless the caller sets them.
69 std::shared_ptr<dht::crypto::Certificate> server_ca;
70 dht::crypto::Identity client_identity;
// NOTE(review): presumably the IPv4 and IPv6 bind addresses — confirm.
71 SockAddr bind4 {}, bind6 {};
72 };
73
// Runtime objects and callbacks handed to DhtRunner::run() by rvalue reference.
// The unique_ptr members make this struct non-copyable.
74 struct Context {
75 std::shared_ptr<Logger> logger {};
76 std::unique_ptr<net::DatagramSocket> sock;
77 std::shared_ptr<PeerDiscovery> peerDiscovery {};
78 StatusCallback statusChangedCallback {};
79 CertificateStoreQuery certificateStore {};
80 IdentityAnnouncedCb identityAnnouncedCb {};
81 PublicAddressChangedCb publicAddressChangedCb {};
// NOTE(review): std::mt19937_64 needs <random>, which is not among this
// header's direct includes; presumably it arrives transitively — confirm.
82 std::unique_ptr<std::mt19937_64> rng {};
83 Context() {}
84 };
85
86 DhtRunner();
87 virtual ~DhtRunner();
88
89 void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = {}, Where w = {}) {
90 get(id, bindGetCb(cb), donecb, f, w);
91 }
92
93 void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
94 get(id, bindGetCb(cb), donecb, f, w);
95 }
96
97 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
98
99 void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
100 get(id, cb, bindDoneCb(donecb), f, w);
101 }
102 void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = {}, Where w = {});
103
104 template <class T>
105 void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
106 {
107 get(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
108 return cb(unpackVector<T>(vals));
109 },
110 dcb,
111 getFilterSet<T>());
112 }
113 template <class T>
114 void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={})
115 {
116 get(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
117 for (const auto& v : vals) {
118 try {
119 if (not cb(Value::unpack<T>(*v)))
120 return false;
121 } catch (const std::exception&) {
122 continue;
123 }
124 }
125 return true;
126 },
127 dcb,
128 getFilterSet<T>());
129 }
130
131 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {}) {
132 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
133 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
134 get(key, [=](const std::vector<std::shared_ptr<dht::Value>>& vlist) {
135 values->insert(values->end(), vlist.begin(), vlist.end());
136 return true;
137 }, [=](bool) {
138 p->set_value(std::move(*values));
139 },
140 f, w);
141 return p->get_future();
142 }
143
144 template <class T>
145 std::future<std::vector<T>> get(InfoHash key) {
146 auto p = std::make_shared<std::promise<std::vector<T>>>();
147 auto values = std::make_shared<std::vector<T>>();
148 get<T>(key, [=](T&& v) {
149 values->emplace_back(std::move(v));
150 return true;
151 }, [=](bool) {
152 p->set_value(std::move(*values));
153 });
154 return p->get_future();
155 }
156
157 void query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
158 void query(const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
159 query(hash, cb, bindDoneCb(done_cb), q);
160 }
161
162 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
163
164 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
165 return listen(key, [cb=std::move(cb)](const std::vector<Sp<Value>>& vals, bool expired){
166 if (not expired)
167 return cb(vals);
168 return true;
169 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
170 }
171 std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
172 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) {
173 return listen(key, bindGetCb(cb), f, w);
174 }
175
176 template <class T>
177 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&)> cb)
178 {
179 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
180 return cb(unpackVector<T>(vals));
181 },
182 getFilterSet<T>());
183 }
184 template <class T>
185 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&, bool)> cb)
186 {
187 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
188 return cb(unpackVector<T>(vals), expired);
189 },
190 getFilterSet<T>());
191 }
192
193 template <typename T>
194 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&)> cb, Value::Filter f = {}, Where w = {})
195 {
196 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
197 for (const auto& v : vals) {
198 try {
199 if (not cb(Value::unpack<T>(*v)))
200 return false;
201 } catch (const std::exception&) {
202 continue;
203 }
204 }
205 return true;
206 },
207 getFilterSet<T>(f), w);
208 }
209 template <typename T>
210 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&, bool)> cb, Value::Filter f = {}, Where w = {})
211 {
212 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
213 for (const auto& v : vals) {
214 try {
215 if (not cb(Value::unpack<T>(*v), expired))
216 return false;
217 } catch (const std::exception&) {
218 continue;
219 }
220 }
221 return true;
222 },
223 getFilterSet<T>(f), w);
224 }
225
226 void cancelListen(InfoHash h, size_t token);
227 void cancelListen(InfoHash h, std::shared_future<size_t> token);
228
229 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
230 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
231 put(hash, value, bindDoneCb(cb), created, permanent);
232 }
233
234 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
235 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
236 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
237 }
238 void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(), bool permanent = false);
239
240 void cancelPut(const InfoHash& h, Value::Id id);
241 void cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value);
242
243 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
244 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
245 putSigned(hash, value, bindDoneCb(cb), permanent);
246 }
247
248 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={}, bool permanent = false);
249 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
250 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
251 }
252 void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb={}, bool permanent = false);
253
254 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
255 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
256 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
257 }
258
259 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={}, bool permanent = false);
260 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
261 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
262 }
263 void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={}, bool permanent = false);
264
265 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
266 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
267 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
268 }
269
270 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallback cb={}, bool permanent = false);
271 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
272 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
273 }
274
275
280 void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb={});
281 void bootstrap(SockAddr addr, DoneCallbackSimple cb={});
282
287 void bootstrap(std::vector<NodeExport> nodes);
288
295 void bootstrap(const std::string& host, const std::string& service);
296 void bootstrap(const std::string& hostService);
297
302 void bootstrap(const InfoHash& id, const SockAddr& address);
303
308
315
316 void dumpTables() const;
317
322 std::shared_ptr<crypto::PublicKey> getPublicKey() const;
323
328
333 SockAddr getBound(sa_family_t f = AF_INET) const;
334
339 in_port_t getBoundPort(sa_family_t f = AF_INET) const;
340
341 std::pair<size_t, size_t> getStoreSize() const;
342
// NOTE(review): this getter is declared to return void, so a caller cannot
// obtain the limit through it; the return type looks unintended — confirm
// against the implementation before relying on it.
343 void getStorageLimit() const;
// Sets the storage limit; `limit` defaults to DEFAULT_STORAGE_LIMIT.
344 void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT);
345
346 std::vector<NodeExport> exportNodes() const;
347
348 std::vector<ValuesExport> exportValues() const;
349
350 void setLogger(const Sp<Logger>& logger = {});
351 void setLogger(const Logger& logger) {
352 setLogger(std::make_shared<Logger>(logger));
353 }
354
358 void setLogFilter(const InfoHash& f = {});
359
360 void registerType(const ValueType& type);
361
362 void importValues(const std::vector<ValuesExport>& values);
363
364 bool isRunning() const {
365 return running != State::Idle;
366 }
367
368 NodeStats getNodesStats(sa_family_t af) const;
369 unsigned getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const;
370 NodeInfo getNodeInfo() const;
371 void getNodeInfo(std::function<void(std::shared_ptr<NodeInfo>)>);
372
373 std::vector<unsigned> getNodeMessageStats(bool in = false) const;
374 std::string getStorageLog() const;
375 std::string getStorageLog(const InfoHash&) const;
376 std::string getRoutingTablesLog(sa_family_t af) const;
377 std::string getSearchesLog(sa_family_t af = AF_UNSPEC) const;
378 std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
379 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC) const;
380 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC) const;
381 void getPublicAddress(std::function<void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
382
383 // securedht methods
384
385 void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>&)>);
386 void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
387 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
388
395 void run(in_port_t port = dht::net::DHT_DEFAULT_PORT, const crypto::Identity& identity = {}, bool threaded = true, NetId network = 0) {
396 Config config;
397 config.dht_config.node_config.network = network;
398 config.dht_config.id = identity;
399 config.threaded = threaded;
400 run(port, config);
401 }
402 void run(in_port_t port, Config& config, Context&& context = {});
403
407 void run(const char* ip4, const char* ip6, const char* service, Config& config, Context&& context = {});
408
409 void run(const Config& config, Context&& context);
410
411 void setOnStatusChanged(StatusCallback&& cb) {
412 if (cb)
413 statusCbs.emplace_back(std::move(cb));
414 }
415
421 time_point loop() {
422 std::lock_guard<std::mutex> lck(dht_mtx);
423 return loop_();
424 }
425
429 void shutdown(ShutdownCallback cb = {}, bool stop = false);
430
436 void join();
437
438 std::shared_ptr<PeerDiscovery> getPeerDiscovery() const { return peerDiscovery_; };
439
440 void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "");
441
446 void enableProxy(bool proxify);
447
448 /* Push notification methods */
449
453 void setPushNotificationToken(const std::string& token);
454
458 void setPushNotificationTopic(const std::string& topic);
459
463 void setPushNotificationPlatform(const std::string& platform);
464
468 void pushNotificationReceived(const std::map<std::string, std::string>& data);
469
470 /* Proxy server methods */
471 void forwardAllMessages(bool forward);
472
473private:
// Runner state stored in the atomic `running` member, which starts at
// State::Idle; isRunning() reports true for any value other than Idle.
474 enum class State {
475 Idle,
476 Running,
477 Stopping
478 };
479
480 time_point loop_();
481
482 NodeStatus getStatus() const {
483 return std::max(status4, status6);
484 }
485
486 bool checkShutdown();
487 void opEnded();
488 DoneCallback bindOpDoneCallback(DoneCallback&& cb);
489 DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
490
492 std::unique_ptr<SecureDht> dht_;
493
495 std::atomic_bool use_proxy {false};
496
498 Config config_;
499 IdentityAnnouncedCb identityAnnouncedCb_;
500
504 void resetDht();
505
506 mutable std::mutex dht_mtx {};
507 std::thread dht_thread {};
508 std::condition_variable cv {};
509 std::mutex sock_mtx {};
510 net::PacketList rcv {};
511 decltype(rcv) rcv_free {};
512
513 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
514 std::queue<std::function<void(SecureDht&)>> pending_ops {};
515 std::mutex storage_mtx {};
516
517 std::atomic<State> running {State::Idle};
518 std::atomic_size_t ongoing_ops {0};
519 std::vector<ShutdownCallback> shutdownCallbacks_;
520
521 NodeStatus status4 {NodeStatus::Disconnected},
522 status6 {NodeStatus::Disconnected};
523
524 std::vector<StatusCallback> statusCbs {};
525
527 std::shared_ptr<PeerDiscovery> peerDiscovery_;
528
533 std::shared_ptr<dht::Logger> logger_;
534};
535
536}
InfoHash getId() const
in_port_t getBoundPort(sa_family_t f=AF_INET) const
void clearBootstrap()
void shutdown(ShutdownCallback cb={}, bool stop=false)
void bootstrap(const std::string &host, const std::string &service)
void setPushNotificationToken(const std::string &token)
void connectivityChanged()
time_point loop()
Definition dhtrunner.h:421
InfoHash getNodeId() const
void pushNotificationReceived(const std::map< std::string, std::string > &data)
void run(in_port_t port=dht::net::DHT_DEFAULT_PORT, const crypto::Identity &identity={}, bool threaded=true, NetId network=0)
Definition dhtrunner.h:395
void bootstrap(std::vector< SockAddr > nodes, DoneCallbackSimple cb={})
void setLogFilter(const InfoHash &f={})
SockAddr getBound(sa_family_t f=AF_INET) const
void run(const char *ip4, const char *ip6, const char *service, Config &config, Context &&context={})
void enableProxy(bool proxify)
void setPushNotificationPlatform(const std::string &platform)
void bootstrap(const InfoHash &id, const SockAddr &address)
void setPushNotificationTopic(const std::string &topic)
void bootstrap(std::vector< NodeExport > nodes)
NodeStatus
Definition callbacks.h:42
NetId network
Definition callbacks.h:114