My Project 2.4.4
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht.h
1/*
2 * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3 * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "infohash.h"
24#include "value.h"
25#include "utils.h"
26#include "network_engine.h"
27#include "scheduler.h"
28#include "routing_table.h"
29#include "callbacks.h"
30#include "dht_interface.h"
31
32#include <string>
33#include <array>
34#include <vector>
35#include <map>
36#include <functional>
37#include <memory>
38
39#ifdef _WIN32
40#include <iso646.h>
41#endif
42
43namespace dht {
44
45namespace net {
46struct Request;
47} /* namespace net */
48
49struct Storage;
50struct ValueStorage;
51class StorageBucket;
52struct Listener;
53struct LocalListener;
54
62class OPENDHT_PUBLIC Dht final : public DhtInterface {
63public:
68 Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Sp<Logger>& l = {});
69
70 Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Logger& l = {})
71 : Dht(std::move(sock), config, std::make_shared<Logger>(l)) {}
72
73 virtual ~Dht();
74
78 inline const InfoHash& getNodeId() const override { return myid; }
79
80 NodeStatus updateStatus(sa_family_t af) override;
81
85 NodeStatus getStatus(sa_family_t af) const override {
86 return dht(af).status;
87 }
88
89 NodeStatus getStatus() const override {
90 return std::max(getStatus(AF_INET), getStatus(AF_INET6));
91 }
92
93 net::DatagramSocket* getSocket() const override { return network_engine.getSocket(); };
94
98 void shutdown(ShutdownCallback cb, bool stop = false) override;
99
106 bool isRunning(sa_family_t af = 0) const override;
107
108 virtual void registerType(const ValueType& type) override {
109 types.registerType(type);
110 }
111 const ValueType& getType(ValueType::Id type_id) const override {
112 return types.getType(type_id);
113 }
114
115 void addBootstrap(const std::string& host, const std::string& service) override {
116 bootstrap_nodes.emplace_back(host, service);
117 startBootstrap();
118 }
119
120 void clearBootstrap() override {
121 bootstrap_nodes.clear();
122 }
123
129 void insertNode(const InfoHash& id, const SockAddr&) override;
130 void insertNode(const NodeExport& n) override {
131 insertNode(n.id, SockAddr(n.ss, n.sslen));
132 }
133
134 void pingNode(SockAddr, DoneCallbackSimple&& cb={}) override;
135
136 time_point periodic(const uint8_t *buf, size_t buflen, SockAddr, const time_point& now) override;
137 time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override {
138 return periodic(buf, buflen, SockAddr(from, fromlen), now);
139 }
140
151 virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override;
152 virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
153 get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
154 }
155 virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
156 get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w));
157 }
158 virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) override {
159 get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
160 }
171 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {}) override;
172 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override {
173 query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q));
174 }
175
179 std::vector<Sp<Value>> getLocal(const InfoHash& key, const Value::Filter& f = {}) const override;
180
184 Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const override;
185
192 void put(const InfoHash& key,
193 Sp<Value>,
194 DoneCallback cb=nullptr,
195 time_point created=time_point::max(),
196 bool permanent = false) override;
197 void put(const InfoHash& key,
198 const Sp<Value>& v,
199 DoneCallbackSimple cb,
200 time_point created=time_point::max(),
201 bool permanent = false) override
202 {
203 put(key, v, bindDoneCb(cb), created, permanent);
204 }
205
206 void put(const InfoHash& key,
207 Value&& v,
208 DoneCallback cb=nullptr,
209 time_point created=time_point::max(),
210 bool permanent = false) override
211 {
212 put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
213 }
214 void put(const InfoHash& key,
215 Value&& v,
216 DoneCallbackSimple cb,
217 time_point created=time_point::max(),
218 bool permanent = false) override
219 {
220 put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
221 }
222
226 std::vector<Sp<Value>> getPut(const InfoHash&) const override;
227
231 Sp<Value> getPut(const InfoHash&, const Value::Id&) const override;
232
237 bool cancelPut(const InfoHash&, const Value::Id&) override;
238
246 size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={}) override;
247
248 size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) override {
249 return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){
250 if (not expired)
251 return cb(vals);
252 return true;
253 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
254 }
255 size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) override {
256 return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
257 }
258
259 bool cancelListen(const InfoHash&, size_t token) override;
260
266 void connectivityChanged(sa_family_t) override;
267 void connectivityChanged() override {
268 reported_addr.clear();
269 connectivityChanged(AF_INET);
270 connectivityChanged(AF_INET6);
271 }
272
277 std::vector<NodeExport> exportNodes() const override;
278
279 std::vector<ValuesExport> exportValues() const override;
280 void importValues(const std::vector<ValuesExport>&) override;
281
282 void saveState(const std::string& path) const;
283 void loadState(const std::string& path);
284
285 NodeStats getNodesStats(sa_family_t af) const override;
286
287 std::string getStorageLog() const override;
288 std::string getStorageLog(const InfoHash&) const override;
289
290 std::string getRoutingTablesLog(sa_family_t) const override;
291 std::string getSearchesLog(sa_family_t) const override;
292 std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const override;
293
294 void dumpTables() const override;
295 std::vector<unsigned> getNodeMessageStats(bool in = false) override {
296 return network_engine.getNodeMessageStats(in);
297 }
298
302 void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT) override {
303 max_store_size = limit;
304 }
305 size_t getStorageLimit() const override {
306 return max_store_size;
307 }
308
313 std::pair<size_t, size_t> getStoreSize() const override {
314 return {total_store_size, total_values};
315 }
316
317 std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override;
318
319 void pushNotificationReceived(const std::map<std::string, std::string>&) override {}
/** No-op in this class: the argument is ignored. */
void resubscribe(unsigned)
{
}
321
322private:
323
324 /* When performing a search, we search for up to SEARCH_NODES closest nodes
325 to the destination, and use the additional ones to backtrack if any of
326 the target 8 turn out to be dead. */
327 static constexpr unsigned SEARCH_NODES {14};
328
329 /* The number of bad nodes is limited in order to help determine
330 * presence of connectivity changes. See
331 * https://github.com/savoirfairelinux/opendht/issues/137 for details.
332 *
333 * According to the tables, 25 is a good average value for big networks. If
334 * the network is small, normal search expiration process will handle the
335 * situation.
336 * */
337 static constexpr unsigned SEARCH_MAX_BAD_NODES {25};
338
339 /* Concurrent search nodes requested count */
340 static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4};
341
342 /* Number of listening nodes */
343 static constexpr unsigned LISTEN_NODES {4};
344
345 /* The maximum number of hashes we're willing to track. */
346 static constexpr unsigned MAX_HASHES {1024 * 1024 * 1024};
347
348 /* The maximum number of searches we keep data about. */
349 static constexpr unsigned MAX_SEARCHES {1024 * 1024};
350
351 static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10};
352
353 /* The time after which we consider a search to be expirable. */
354 static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62};
355
356 /* Timeout for listen */
357 static constexpr duration LISTEN_EXPIRE_TIME {std::chrono::seconds(30)};
358 static constexpr duration LISTEN_EXPIRE_TIME_PUBLIC {std::chrono::minutes(5)};
359
360 static constexpr duration REANNOUNCE_MARGIN {std::chrono::seconds(10)};
361
362 static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
363
364 static constexpr size_t TOKEN_SIZE {32};
365
366 // internal structures
367 struct SearchNode;
368 struct Get;
369 struct Announce;
370 struct Search;
371
372 // prevent copy
373 Dht(const Dht&) = delete;
374 Dht& operator=(const Dht&) = delete;
375
376 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
377
378 InfoHash myid {};
379
380 uint64_t secret {};
381 uint64_t oldsecret {};
382
383 // registered types
384 TypeStore types;
385
386 using SearchMap = std::map<InfoHash, Sp<Search>>;
387 struct Kad {
388 RoutingTable buckets {};
389 SearchMap searches {};
390 unsigned pending_pings {0};
391 NodeStatus status;
392
393 NodeStatus getStatus(time_point now) const;
394 NodeStats getNodesStats(time_point now, const InfoHash& myid) const;
395 };
396
397 Kad dht4 {};
398 Kad dht6 {};
399
400 std::vector<std::pair<std::string,std::string>> bootstrap_nodes {};
401 std::chrono::steady_clock::duration bootstrap_period {BOOTSTRAP_PERIOD};
402 Sp<Scheduler::Job> bootstrapJob {};
403
404 std::map<InfoHash, Storage> store;
405 std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota;
406 size_t total_values {0};
407 size_t total_store_size {0};
408 size_t max_store_keys {MAX_HASHES};
409 size_t max_store_size {DEFAULT_STORAGE_LIMIT};
410
411 size_t max_searches {MAX_SEARCHES};
412 size_t search_id {0};
413
414 // map a global listen token to IPv4, IPv6 specific listen tokens.
415 // 0 is the invalid token.
416 std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {};
417 size_t listener_token {1};
418
419
420 // timing
421 Scheduler scheduler;
422 Sp<Scheduler::Job> nextNodesConfirmation {};
423 Sp<Scheduler::Job> nextStorageMaintenance {};
424
425 net::NetworkEngine network_engine;
426 using ReportedAddr = std::pair<unsigned, SockAddr>;
427 std::vector<ReportedAddr> reported_addr;
428
429 std::string persistPath;
430
431 // are we a bootstrap node ?
432 // note: Any running node can be used as a bootstrap node.
433 // Only nodes running only as bootstrap nodes should
434 // be put in bootstrap mode.
435 const bool is_bootstrap {false};
436 const bool maintain_storage {false};
437 const bool public_stable {false};
438
439 inline const duration& getListenExpiration() const {
440 return public_stable ? LISTEN_EXPIRE_TIME_PUBLIC : LISTEN_EXPIRE_TIME;
441 }
442
443 void rotateSecrets();
444
445 Blob makeToken(const SockAddr&, bool old) const;
446 bool tokenMatch(const Blob& token, const SockAddr&) const;
447
448 void reportedAddr(const SockAddr&);
449
450 // Storage
451 void storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t tid, Query&& = {}, int version = 0);
452 bool storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr& sa = {}, bool permanent = false);
453 bool storageRefresh(const InfoHash& id, Value::Id vid);
454 void expireStore();
455 void expireStorage(InfoHash h);
456 void expireStore(decltype(store)::iterator);
457
458 void storageRemoved(const InfoHash& id, Storage& st, const std::vector<Sp<Value>>& values, size_t totalSize);
459 void storageChanged(const InfoHash& id, Storage& st, const Sp<Value>&, bool newValue);
460 std::string printStorageLog(const decltype(store)::value_type&) const;
461
467 void dataPersistence(InfoHash id);
468 size_t maintainStorage(decltype(store)::value_type&, bool force=false, const DoneCallback& donecb={});
469
470 // Buckets
471 Kad& dht(sa_family_t af) { return af == AF_INET ? dht4 : dht6; }
472 const Kad& dht(sa_family_t af) const { return af == AF_INET ? dht4 : dht6; }
473 RoutingTable& buckets(sa_family_t af) { return dht(af).buckets; }
474 const RoutingTable& buckets(sa_family_t af) const { return dht(af).buckets; }
475 Bucket* findBucket(const InfoHash& id, sa_family_t af) {
476 auto& b = buckets(af);
477 auto it = b.findBucket(id);
478 return it == b.end() ? nullptr : &(*it);
479 }
480 const Bucket* findBucket(const InfoHash& id, sa_family_t af) const {
481 return const_cast<Dht*>(this)->findBucket(id, af);
482 }
483
484 void expireBuckets(RoutingTable&);
485 void sendCachedPing(Bucket& b);
486 bool bucketMaintenance(RoutingTable&);
487 void dumpBucket(const Bucket& b, std::ostream& out) const;
488 void bootstrap();
489 void startBootstrap();
490 void stopBootstrap();
491
492 // Nodes
493 void onNewNode(const Sp<Node>& node, int confirm);
494 const Sp<Node> findNode(const InfoHash& id, sa_family_t af) const;
495 bool trySearchInsert(const Sp<Node>& node);
496
497 // Searches
498 inline SearchMap& searches(sa_family_t af) { return dht(af).searches; }
499 inline const SearchMap& searches(sa_family_t af) const { return dht(af).searches; }
500
505 Sp<Search> search(const InfoHash& id, sa_family_t af, GetCallback = {}, QueryCallback = {}, DoneCallback = {}, Value::Filter = {}, const Sp<Query>& q = {});
506
507 void announce(const InfoHash& id, sa_family_t af, Sp<Value> value, DoneCallback callback, time_point created=time_point::max(), bool permanent = false);
508 size_t listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f = {}, const Sp<Query>& q = {});
509
517 unsigned refill(Search& sr);
518 void expireSearches();
519
520 void confirmNodes();
521 void expire();
522
523 void onConnected();
524 void onDisconnected();
525
534 void searchNodeGetDone(const net::Request& status,
535 net::RequestAnswer&& answer,
536 std::weak_ptr<Search> ws,
537 Sp<Query> query);
538
548 void searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, Sp<Query> query);
549
557 void paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n);
558
562 SearchNode* searchSendGetValues(Sp<Search> sr, SearchNode *n = nullptr, bool update = true);
563
570 void searchSendAnnounceValue(const Sp<Search>& sr);
571
579 void searchStep(std::weak_ptr<Search> ws);
580
581 void searchSynchedNodeListen(const Sp<Search>&, SearchNode&);
582
583 void dumpSearch(const Search& sr, std::ostream& out) const;
584
585 bool neighbourhoodMaintenance(RoutingTable&);
586
587 void onError(Sp<net::Request> node, net::DhtProtocolException e);
588 /* when our address is reported by a distant peer. */
589 void onReportedAddr(const InfoHash& id, const SockAddr&);
590 /* when we receive a ping request */
591 net::RequestAnswer onPing(Sp<Node> node);
592 /* when we receive a "find node" request */
593 net::RequestAnswer onFindNode(Sp<Node> node, const InfoHash& hash, want_t want);
594 void onFindNodeDone(const Sp<Node>& status,
595 net::RequestAnswer& a,
596 Sp<Search> sr);
597 /* when we receive a "get values" request */
598 net::RequestAnswer onGetValues(Sp<Node> node,
599 const InfoHash& hash,
600 want_t want,
601 const Query& q);
602 void onGetValuesDone(const Sp<Node>& status,
603 net::RequestAnswer& a,
604 Sp<Search>& sr,
605 const Sp<Query>& orig_query);
606 /* when we receive a listen request */
607 net::RequestAnswer onListen(Sp<Node> node,
608 const InfoHash& hash,
609 const Blob& token,
610 size_t socket_id,
611 const Query& query,
612 int version = 0);
613 void onListenDone(const Sp<Node>& status,
614 net::RequestAnswer& a,
615 Sp<Search>& sr);
616 /* when we receive an announce request */
617 net::RequestAnswer onAnnounce(Sp<Node> node,
618 const InfoHash& hash,
619 const Blob& token,
620 const std::vector<Sp<Value>>& v,
621 const time_point& created);
622 net::RequestAnswer onRefresh(Sp<Node> node,
623 const InfoHash& hash,
624 const Blob& token,
625 const Value::Id& vid);
626 void onAnnounceDone(const Sp<Node>& status,
627 net::RequestAnswer& a,
628 Sp<Search>& sr);
629};
630
631}
Definition: dht.h:62
void insertNode(const InfoHash &id, const SockAddr &) override
void setStorageLimit(size_t limit=DEFAULT_STORAGE_LIMIT) override
Definition: dht.h:302
NodeStatus updateStatus(sa_family_t af) override
NodeStatus getStatus(sa_family_t af) const override
Definition: dht.h:85
std::pair< size_t, size_t > getStoreSize() const override
Definition: dht.h:313
Sp< Value > getLocalById(const InfoHash &key, Value::Id vid) const override
std::vector< Sp< Value > > getPut(const InfoHash &) const override
std::vector< NodeExport > exportNodes() const override
size_t listen(const InfoHash &, ValueCallback, Value::Filter={}, Where={}) override
bool cancelPut(const InfoHash &, const Value::Id &) override
void put(const InfoHash &key, Sp< Value >, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent=false) override
void connectivityChanged(sa_family_t) override
void pushNotificationReceived(const std::map< std::string, std::string > &) override
Definition: dht.h:319
size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
Definition: dht.h:248
bool isRunning(sa_family_t af=0) const override
Sp< Value > getPut(const InfoHash &, const Value::Id &) const override
void shutdown(ShutdownCallback cb, bool stop=false) override
virtual void get(const InfoHash &key, GetCallback cb, DoneCallback donecb={}, Value::Filter &&f={}, Where &&w={}) override
std::vector< Sp< Value > > getLocal(const InfoHash &key, const Value::Filter &f={}) const override
virtual void query(const InfoHash &key, QueryCallback cb, DoneCallback done_cb={}, Query &&q={}) override
const InfoHash & getNodeId() const override
Definition: dht.h:78
Dht(std::unique_ptr< net::DatagramSocket > &&sock, const Config &config, const Sp< Logger > &l={})
Definition: callbacks.h:35
std::vector< uint8_t > Blob
Definition: utils.h:151
NodeStatus
Definition: callbacks.h:42
Describes a query destined to another peer.
Definition: value.h:925
Serializable dht::Value filter.
Definition: value.h:801