My Project 2.4.4
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dhtrunner.h
1/*
2 * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3 * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "def.h"
24#include "infohash.h"
25#include "value.h"
26#include "callbacks.h"
27#include "sockaddr.h"
28#include "log_enable.h"
29#include "network_utils.h"
30
31#include <thread>
32#include <mutex>
33#include <atomic>
34#include <condition_variable>
35#include <future>
36#include <exception>
37#include <queue>
38#include <chrono>
39
40namespace dht {
41
42struct Node;
43class SecureDht;
44class PeerDiscovery;
45struct SecureDhtConfig;
46
53class OPENDHT_PUBLIC DhtRunner {
54
55public:
56 using StatusCallback = std::function<void(NodeStatus, NodeStatus)>;
57
58 struct Config {
59 SecureDhtConfig dht_config {};
60 bool threaded {true};
61 std::string proxy_server {};
62 std::string push_node_id {};
63 std::string push_token {};
64 bool peer_discovery {false};
65 bool peer_publish {false};
66 std::shared_ptr<dht::crypto::Certificate> server_ca;
67 dht::crypto::Identity client_identity;
68 SockAddr bind4 {}, bind6 {};
69 };
70
71 struct Context {
72 std::shared_ptr<Logger> logger {};
73 std::unique_ptr<net::DatagramSocket> sock;
74 std::shared_ptr<PeerDiscovery> peerDiscovery {};
75 StatusCallback statusChangedCallback {};
76 CertificateStoreQuery certificateStore {};
77 IdentityAnnouncedCb identityAnnouncedCb {};
78 Context() {}
79 };
80
81 DhtRunner();
82 virtual ~DhtRunner();
83
84 void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = {}, Where w = {}) {
85 get(id, bindGetCb(cb), donecb, f, w);
86 }
87
88 void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
89 get(id, bindGetCb(cb), donecb, f, w);
90 }
91
92 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
93
94 void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
95 get(id, cb, bindDoneCb(donecb), f, w);
96 }
97 void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = {}, Where w = {});
98
99 template <class T>
100 void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
101 {
102 get(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
103 return cb(unpackVector<T>(vals));
104 },
105 dcb,
106 getFilterSet<T>());
107 }
108 template <class T>
109 void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={})
110 {
111 get(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
112 for (const auto& v : vals) {
113 try {
114 if (not cb(Value::unpack<T>(*v)))
115 return false;
116 } catch (const std::exception&) {
117 continue;
118 }
119 }
120 return true;
121 },
122 dcb,
123 getFilterSet<T>());
124 }
125
126 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {}) {
127 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
128 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
129 get(key, [=](const std::vector<std::shared_ptr<dht::Value>>& vlist) {
130 values->insert(values->end(), vlist.begin(), vlist.end());
131 return true;
132 }, [=](bool) {
133 p->set_value(std::move(*values));
134 },
135 f, w);
136 return p->get_future();
137 }
138
139 template <class T>
140 std::future<std::vector<T>> get(InfoHash key) {
141 auto p = std::make_shared<std::promise<std::vector<T>>>();
142 auto values = std::make_shared<std::vector<T>>();
143 get<T>(key, [=](T&& v) {
144 values->emplace_back(std::move(v));
145 return true;
146 }, [=](bool) {
147 p->set_value(std::move(*values));
148 });
149 return p->get_future();
150 }
151
152 void query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
153 void query(const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
154 query(hash, cb, bindDoneCb(done_cb), q);
155 }
156
157 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
158
159 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
160 return listen(key, [cb=std::move(cb)](const std::vector<Sp<Value>>& vals, bool expired){
161 if (not expired)
162 return cb(vals);
163 return true;
164 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
165 }
166 std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
167 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) {
168 return listen(key, bindGetCb(cb), f, w);
169 }
170
171 template <class T>
172 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&)> cb)
173 {
174 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
175 return cb(unpackVector<T>(vals));
176 },
177 getFilterSet<T>());
178 }
179 template <class T>
180 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&, bool)> cb)
181 {
182 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
183 return cb(unpackVector<T>(vals), expired);
184 },
185 getFilterSet<T>());
186 }
187
188 template <typename T>
189 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&)> cb, Value::Filter f = {}, Where w = {})
190 {
191 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals) {
192 for (const auto& v : vals) {
193 try {
194 if (not cb(Value::unpack<T>(*v)))
195 return false;
196 } catch (const std::exception&) {
197 continue;
198 }
199 }
200 return true;
201 },
202 getFilterSet<T>(f), w);
203 }
204 template <typename T>
205 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&, bool)> cb, Value::Filter f = {}, Where w = {})
206 {
207 return listen(hash, [cb=std::move(cb)](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
208 for (const auto& v : vals) {
209 try {
210 if (not cb(Value::unpack<T>(*v), expired))
211 return false;
212 } catch (const std::exception&) {
213 continue;
214 }
215 }
216 return true;
217 },
218 getFilterSet<T>(f), w);
219 }
220
221 void cancelListen(InfoHash h, size_t token);
222 void cancelListen(InfoHash h, std::shared_future<size_t> token);
223
224 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
225 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
226 put(hash, value, bindDoneCb(cb), created, permanent);
227 }
228
229 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
230 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
231 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
232 }
233 void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(), bool permanent = false);
234
235 void cancelPut(const InfoHash& h, Value::Id id);
236 void cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value);
237
238 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
239 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
240 putSigned(hash, value, bindDoneCb(cb), permanent);
241 }
242
243 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={}, bool permanent = false);
244 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
245 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
246 }
247 void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb={}, bool permanent = false);
248
249 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
250 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
251 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
252 }
253
254 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={}, bool permanent = false);
255 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
256 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
257 }
258 void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={}, bool permanent = false);
259
260 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false);
261 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) {
262 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
263 }
264
265 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallback cb={}, bool permanent = false);
266 void putEncrypted(InfoHash hash, const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallbackSimple cb, bool permanent = false) {
267 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
268 }
269
270
275 void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb={});
276 void bootstrap(SockAddr addr, DoneCallbackSimple cb={});
277
282 void bootstrap(std::vector<NodeExport> nodes);
283
290 void bootstrap(const std::string& host, const std::string& service);
291 void bootstrap(const std::string& hostService);
292
297 void bootstrap(const InfoHash& id, const SockAddr& address);
298
303
310
311 void dumpTables() const;
312
317 std::shared_ptr<crypto::PublicKey> getPublicKey() const;
318
323
328 SockAddr getBound(sa_family_t f = AF_INET) const;
329
334 in_port_t getBoundPort(sa_family_t f = AF_INET) const;
335
336 std::pair<size_t, size_t> getStoreSize() const;
337
338 void getStorageLimit() const;
339 void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT);
340
341 std::vector<NodeExport> exportNodes() const;
342
343 std::vector<ValuesExport> exportValues() const;
344
345 void setLogger(const Sp<Logger>& logger = {});
346 void setLogger(const Logger& logger) {
347 setLogger(std::make_shared<Logger>(logger));
348 }
349 void setLoggers(LogMethod err = {}, LogMethod warn = {}, LogMethod debug = {});
350
354 void setLogFilter(const InfoHash& f = {});
355
356 void registerType(const ValueType& type);
357
358 void importValues(const std::vector<ValuesExport>& values);
359
360 bool isRunning() const {
361 return running != State::Idle;
362 }
363
364 NodeStats getNodesStats(sa_family_t af) const;
365 unsigned getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const;
366 NodeInfo getNodeInfo() const;
367 void getNodeInfo(std::function<void(std::shared_ptr<NodeInfo>)>);
368
369 std::vector<unsigned> getNodeMessageStats(bool in = false) const;
370 std::string getStorageLog() const;
371 std::string getStorageLog(const InfoHash&) const;
372 std::string getRoutingTablesLog(sa_family_t af) const;
373 std::string getSearchesLog(sa_family_t af = AF_UNSPEC) const;
374 std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
375 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC);
376 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC);
377 void getPublicAddress(std::function<void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
378
379 // securedht methods
380
381 void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>&)>);
382 void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
383 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
384
391 void run(in_port_t port = dht::net::DHT_DEFAULT_PORT, const crypto::Identity& identity = {}, bool threaded = true, NetId network = 0) {
392 Config config;
393 config.dht_config.node_config.network = network;
394 config.dht_config.id = identity;
395 config.threaded = threaded;
396 run(port, config);
397 }
398 void run(in_port_t port, Config& config, Context&& context = {});
399
403 void run(const char* ip4, const char* ip6, const char* service, Config& config, Context&& context = {});
404
405 void run(const Config& config, Context&& context);
406
407 void setOnStatusChanged(StatusCallback&& cb) {
408 statusCb = std::move(cb);
409 }
410
416 time_point loop() {
417 std::lock_guard<std::mutex> lck(dht_mtx);
418 return loop_();
419 }
420
424 void shutdown(ShutdownCallback cb = {}, bool stop = false);
425
431 void join();
432
433 std::shared_ptr<PeerDiscovery> getPeerDiscovery() const { return peerDiscovery_; };
434
435 void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "");
436
442 void enableProxy(bool proxify);
443
444 /* Push notification methods */
445
449 void setPushNotificationToken(const std::string& token);
450
454 void pushNotificationReceived(const std::map<std::string, std::string>& data);
455
456 /* Proxy server mothods */
457 void forwardAllMessages(bool forward);
458
459private:
460 enum class State {
461 Idle,
462 Running,
463 Stopping
464 };
465
466 time_point loop_();
467
468 NodeStatus getStatus() const {
469 return std::max(status4, status6);
470 }
471
472 bool checkShutdown();
473 void opEnded();
474 DoneCallback bindOpDoneCallback(DoneCallback&& cb);
475 DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
476
478 std::unique_ptr<SecureDht> dht_;
479
481 std::unique_ptr<SecureDht> dht_via_proxy_;
482
484 std::atomic_bool use_proxy {false};
485
487 Config config_;
488 IdentityAnnouncedCb identityAnnouncedCb_;
489
493 void resetDht();
497 SecureDht* activeDht() const;
498
502 struct Listener;
503 std::map<size_t, Listener> listeners_;
504 size_t listener_token_ {1};
505
506 mutable std::mutex dht_mtx {};
507 std::thread dht_thread {};
508 std::condition_variable cv {};
509 std::mutex sock_mtx {};
510 net::PacketList rcv {};
511 decltype(rcv) rcv_free {};
512
513 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
514 std::queue<std::function<void(SecureDht&)>> pending_ops {};
515 std::mutex storage_mtx {};
516
517 std::atomic<State> running {State::Idle};
518 std::atomic_size_t ongoing_ops {0};
519 std::vector<ShutdownCallback> shutdownCallbacks_;
520
521 NodeStatus status4 {NodeStatus::Disconnected},
522 status6 {NodeStatus::Disconnected};
523 StatusCallback statusCb {nullptr};
524
526 std::shared_ptr<PeerDiscovery> peerDiscovery_;
527
532 std::shared_ptr<dht::Logger> logger_;
533};
534
535}
InfoHash getId() const
in_port_t getBoundPort(sa_family_t f=AF_INET) const
void clearBootstrap()
void shutdown(ShutdownCallback cb={}, bool stop=false)
void bootstrap(const std::string &host, const std::string &service)
void setPushNotificationToken(const std::string &token)
void connectivityChanged()
time_point loop()
Definition: dhtrunner.h:416
InfoHash getNodeId() const
void pushNotificationReceived(const std::map< std::string, std::string > &data)
void run(in_port_t port=dht::net::DHT_DEFAULT_PORT, const crypto::Identity &identity={}, bool threaded=true, NetId network=0)
Definition: dhtrunner.h:391
void bootstrap(std::vector< SockAddr > nodes, DoneCallbackSimple cb={})
void setLogFilter(const InfoHash &f={})
SockAddr getBound(sa_family_t f=AF_INET) const
void run(const char *ip4, const char *ip6, const char *service, Config &config, Context &&context={})
void enableProxy(bool proxify)
void bootstrap(const InfoHash &id, const SockAddr &address)
void bootstrap(std::vector< NodeExport > nodes)
Definition: callbacks.h:35
NodeStatus
Definition: callbacks.h:42
NetId network
Definition: callbacks.h:114