26#include "network_engine.h"
28#include "routing_table.h"
30#include "dht_interface.h"
68 Dht(std::unique_ptr<net::DatagramSocket>&& sock,
const Config& config,
const Sp<Logger>& l = {});
70 Dht(std::unique_ptr<net::DatagramSocket>&& sock,
const Config& config,
const Logger& l = {})
71 : Dht(std::move(sock), config, std::make_shared<Logger>(l)) {}
86 return dht(af).status;
90 return std::max(getStatus(AF_INET), getStatus(AF_INET6));
93 net::DatagramSocket* getSocket()
const override {
return network_engine.getSocket(); };
98 void shutdown(ShutdownCallback cb,
bool stop =
false)
override;
108 virtual void registerType(
const ValueType& type)
override {
109 types.registerType(type);
111 const ValueType& getType(ValueType::Id type_id)
const override {
112 return types.getType(type_id);
115 void addBootstrap(
const std::string& host,
const std::string& service)
override {
116 bootstrap_nodes.emplace_back(host, service);
120 void clearBootstrap()
override {
121 bootstrap_nodes.clear();
130 void insertNode(
const NodeExport& n)
override {
131 insertNode(n.id,
SockAddr(n.ss, n.sslen));
134 void pingNode(
SockAddr, DoneCallbackSimple&& cb={})
override;
136 time_point periodic(
const uint8_t *buf,
size_t buflen, SockAddr,
const time_point& now)
override;
137 time_point periodic(
const uint8_t *buf,
size_t buflen,
const sockaddr* from, socklen_t fromlen,
const time_point& now)
override {
138 return periodic(buf, buflen, SockAddr(from, fromlen), now);
152 virtual void get(
const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {})
override {
153 get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
155 virtual void get(
const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {})
override {
156 get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w));
158 virtual void get(
const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {})
override {
159 get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
171 virtual void query(
const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {},
Query&& q = {})
override;
172 virtual void query(
const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {})
override {
173 query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q));
194 DoneCallback cb=
nullptr,
195 time_point created=time_point::max(),
196 bool permanent =
false)
override;
199 DoneCallbackSimple cb,
200 time_point created=time_point::max(),
201 bool permanent =
false)
override
203 put(key, v, bindDoneCb(cb), created, permanent);
208 DoneCallback cb=
nullptr,
209 time_point created=time_point::max(),
210 bool permanent =
false)
override
212 put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
214 void put(
const InfoHash& key,
216 DoneCallbackSimple cb,
217 time_point created=time_point::max(),
218 bool permanent =
false)
override
220 put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
249 return listen(key, [cb](
const std::vector<Sp<Value>>& vals,
bool expired){
253 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
255 size_t listen(
const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={})
override {
256 return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
259 bool cancelListen(
const InfoHash&,
size_t token)
override;
267 void connectivityChanged()
override {
268 reported_addr.clear();
269 connectivityChanged(AF_INET);
270 connectivityChanged(AF_INET6);
279 std::vector<ValuesExport> exportValues()
const override;
280 void importValues(
const std::vector<ValuesExport>&)
override;
282 void saveState(
const std::string& path)
const;
283 void loadState(
const std::string& path);
285 NodeStats getNodesStats(sa_family_t af)
const override;
287 std::string getStorageLog()
const override;
288 std::string getStorageLog(
const InfoHash&)
const override;
290 std::string getRoutingTablesLog(sa_family_t)
const override;
291 std::string getSearchesLog(sa_family_t)
const override;
292 std::string getSearchLog(
const InfoHash&, sa_family_t af = AF_UNSPEC)
const override;
294 void dumpTables()
const override;
295 std::vector<unsigned> getNodeMessageStats(
bool in =
false)
override {
296 return network_engine.getNodeMessageStats(in);
303 max_store_size = limit;
305 size_t getStorageLimit()
const override {
306 return max_store_size;
314 return {total_store_size, total_values};
317 std::vector<SockAddr> getPublicAddress(sa_family_t family = 0)
override;
320 void resubscribe(
unsigned) {}
327 static constexpr unsigned SEARCH_NODES {14};
337 static constexpr unsigned SEARCH_MAX_BAD_NODES {25};
340 static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4};
343 static constexpr unsigned LISTEN_NODES {4};
346 static constexpr unsigned MAX_HASHES {1024 * 1024 * 1024};
349 static constexpr unsigned MAX_SEARCHES {1024 * 1024};
351 static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10};
354 static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62};
357 static constexpr duration LISTEN_EXPIRE_TIME {std::chrono::seconds(30)};
358 static constexpr duration LISTEN_EXPIRE_TIME_PUBLIC {std::chrono::minutes(5)};
360 static constexpr duration REANNOUNCE_MARGIN {std::chrono::seconds(10)};
362 static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
364 static constexpr size_t TOKEN_SIZE {32};
373 Dht(
const Dht&) =
delete;
374 Dht& operator=(
const Dht&) =
delete;
376 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
381 uint64_t oldsecret {};
386 using SearchMap = std::map<InfoHash, Sp<Search>>;
388 RoutingTable buckets {};
389 SearchMap searches {};
390 unsigned pending_pings {0};
394 NodeStats getNodesStats(time_point now,
const InfoHash& myid)
const;
400 std::vector<std::pair<std::string,std::string>> bootstrap_nodes {};
401 std::chrono::steady_clock::duration bootstrap_period {BOOTSTRAP_PERIOD};
402 Sp<Scheduler::Job> bootstrapJob {};
404 std::map<InfoHash, Storage> store;
405 std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota;
406 size_t total_values {0};
407 size_t total_store_size {0};
408 size_t max_store_keys {MAX_HASHES};
409 size_t max_store_size {DEFAULT_STORAGE_LIMIT};
411 size_t max_searches {MAX_SEARCHES};
412 size_t search_id {0};
416 std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {};
417 size_t listener_token {1};
422 Sp<Scheduler::Job> nextNodesConfirmation {};
423 Sp<Scheduler::Job> nextStorageMaintenance {};
425 net::NetworkEngine network_engine;
426 using ReportedAddr = std::pair<unsigned, SockAddr>;
427 std::vector<ReportedAddr> reported_addr;
429 std::string persistPath;
435 const bool is_bootstrap {
false};
436 const bool maintain_storage {
false};
437 const bool public_stable {
false};
439 inline const duration& getListenExpiration()
const {
440 return public_stable ? LISTEN_EXPIRE_TIME_PUBLIC : LISTEN_EXPIRE_TIME;
443 void rotateSecrets();
445 Blob makeToken(
const SockAddr&,
bool old)
const;
446 bool tokenMatch(
const Blob& token,
const SockAddr&)
const;
448 void reportedAddr(
const SockAddr&);
451 void storageAddListener(
const InfoHash&
id,
const Sp<Node>& node,
size_t tid, Query&& = {},
int version = 0);
452 bool storageStore(
const InfoHash&
id,
const Sp<Value>& value, time_point created,
const SockAddr& sa = {},
bool permanent =
false);
453 bool storageRefresh(
const InfoHash&
id, Value::Id vid);
455 void expireStorage(InfoHash h);
456 void expireStore(
decltype(store)::iterator);
458 void storageRemoved(
const InfoHash&
id, Storage& st,
const std::vector<Sp<Value>>& values,
size_t totalSize);
459 void storageChanged(
const InfoHash&
id, Storage& st,
const Sp<Value>&,
bool newValue);
460 std::string printStorageLog(
const decltype(store)::value_type&)
const;
467 void dataPersistence(InfoHash
id);
468 size_t maintainStorage(
decltype(store)::value_type&,
bool force=
false,
const DoneCallback& donecb={});
471 Kad&
dht(sa_family_t af) {
return af == AF_INET ? dht4 : dht6; }
472 const Kad&
dht(sa_family_t af)
const {
return af == AF_INET ? dht4 : dht6; }
473 RoutingTable& buckets(sa_family_t af) {
return dht(af).buckets; }
474 const RoutingTable& buckets(sa_family_t af)
const {
return dht(af).buckets; }
475 Bucket* findBucket(
const InfoHash&
id, sa_family_t af) {
476 auto& b = buckets(af);
477 auto it = b.findBucket(
id);
478 return it == b.end() ? nullptr : &(*it);
480 const Bucket* findBucket(
const InfoHash&
id, sa_family_t af)
const {
481 return const_cast<Dht*
>(
this)->findBucket(
id, af);
484 void expireBuckets(RoutingTable&);
485 void sendCachedPing(Bucket& b);
486 bool bucketMaintenance(RoutingTable&);
487 void dumpBucket(
const Bucket& b, std::ostream& out)
const;
489 void startBootstrap();
490 void stopBootstrap();
493 void onNewNode(
const Sp<Node>& node,
int confirm);
494 const Sp<Node> findNode(
const InfoHash&
id, sa_family_t af)
const;
495 bool trySearchInsert(
const Sp<Node>& node);
498 inline SearchMap& searches(sa_family_t af) {
return dht(af).searches; }
499 inline const SearchMap& searches(sa_family_t af)
const {
return dht(af).searches; }
505 Sp<Search> search(
const InfoHash&
id, sa_family_t af, GetCallback = {}, QueryCallback = {}, DoneCallback = {}, Value::Filter = {},
const Sp<Query>& q = {});
507 void announce(
const InfoHash&
id, sa_family_t af, Sp<Value> value, DoneCallback callback, time_point created=time_point::max(),
bool permanent =
false);
508 size_t listenTo(
const InfoHash&
id, sa_family_t af, ValueCallback cb, Value::Filter f = {},
const Sp<Query>& q = {});
517 unsigned refill(Search& sr);
518 void expireSearches();
524 void onDisconnected();
534 void searchNodeGetDone(
const net::Request& status,
535 net::RequestAnswer&& answer,
536 std::weak_ptr<Search> ws,
548 void searchNodeGetExpired(
const net::Request& status,
bool over, std::weak_ptr<Search> ws, Sp<Query> query);
557 void paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n);
562 SearchNode* searchSendGetValues(Sp<Search> sr, SearchNode *n =
nullptr,
bool update =
true);
570 void searchSendAnnounceValue(
const Sp<Search>& sr);
579 void searchStep(std::weak_ptr<Search> ws);
581 void searchSynchedNodeListen(
const Sp<Search>&, SearchNode&);
583 void dumpSearch(
const Search& sr, std::ostream& out)
const;
585 bool neighbourhoodMaintenance(RoutingTable&);
587 void onError(Sp<net::Request> node, net::DhtProtocolException e);
589 void onReportedAddr(
const InfoHash&
id,
const SockAddr&);
591 net::RequestAnswer onPing(Sp<Node> node);
593 net::RequestAnswer onFindNode(Sp<Node> node,
const InfoHash& hash, want_t want);
594 void onFindNodeDone(
const Sp<Node>& status,
595 net::RequestAnswer& a,
598 net::RequestAnswer onGetValues(Sp<Node> node,
599 const InfoHash& hash,
602 void onGetValuesDone(
const Sp<Node>& status,
603 net::RequestAnswer& a,
605 const Sp<Query>& orig_query);
607 net::RequestAnswer onListen(Sp<Node> node,
608 const InfoHash& hash,
613 void onListenDone(
const Sp<Node>& status,
614 net::RequestAnswer& a,
617 net::RequestAnswer onAnnounce(Sp<Node> node,
618 const InfoHash& hash,
620 const std::vector<Sp<Value>>& v,
621 const time_point& created);
622 net::RequestAnswer onRefresh(Sp<Node> node,
623 const InfoHash& hash,
625 const Value::Id& vid);
626 void onAnnounceDone(
const Sp<Node>& status,
627 net::RequestAnswer& a,
void insertNode(const InfoHash &id, const SockAddr &) override
void setStorageLimit(size_t limit=DEFAULT_STORAGE_LIMIT) override
NodeStatus updateStatus(sa_family_t af) override
NodeStatus getStatus(sa_family_t af) const override
std::pair< size_t, size_t > getStoreSize() const override
Sp< Value > getLocalById(const InfoHash &key, Value::Id vid) const override
std::vector< Sp< Value > > getPut(const InfoHash &) const override
std::vector< NodeExport > exportNodes() const override
size_t listen(const InfoHash &, ValueCallback, Value::Filter={}, Where={}) override
bool cancelPut(const InfoHash &, const Value::Id &) override
void put(const InfoHash &key, Sp< Value >, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent=false) override
void connectivityChanged(sa_family_t) override
void pushNotificationReceived(const std::map< std::string, std::string > &) override
size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
bool isRunning(sa_family_t af=0) const override
Sp< Value > getPut(const InfoHash &, const Value::Id &) const override
void shutdown(ShutdownCallback cb, bool stop=false) override
virtual void get(const InfoHash &key, GetCallback cb, DoneCallback donecb={}, Value::Filter &&f={}, Where &&w={}) override
std::vector< Sp< Value > > getLocal(const InfoHash &key, const Value::Filter &f={}) const override
virtual void query(const InfoHash &key, QueryCallback cb, DoneCallback done_cb={}, Query &&q={}) override
const InfoHash & getNodeId() const override
Dht(std::unique_ptr< net::DatagramSocket > &&sock, const Config &config, const Sp< Logger > &l={})
std::vector< uint8_t > Blob
Describes a query destined to another peer.
Serializable dht::Value filter.