28#include "log_enable.h"
29#include "network_utils.h"
34#include <condition_variable>
45struct SecureDhtConfig;
61 std::string proxy_server {};
62 std::string push_node_id {};
63 std::string push_token {};
64 bool peer_discovery {
false};
65 bool peer_publish {
false};
66 std::shared_ptr<dht::crypto::Certificate> server_ca;
67 dht::crypto::Identity client_identity;
72 std::shared_ptr<Logger> logger {};
73 std::unique_ptr<net::DatagramSocket> sock;
74 std::shared_ptr<PeerDiscovery> peerDiscovery {};
75 StatusCallback statusChangedCallback {};
76 CertificateStoreQuery certificateStore {};
77 IdentityAnnouncedCb identityAnnouncedCb {};
84 void get(
InfoHash id, GetCallbackSimple cb, DoneCallback donecb={},
Value::Filter f = {}, Where w = {}) {
85 get(
id, bindGetCb(cb), donecb, f, w);
88 void get(InfoHash
id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
89 get(
id, bindGetCb(cb), donecb, f, w);
92 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
94 void get(InfoHash
id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
95 get(
id, cb, bindDoneCb(donecb), f, w);
97 void get(
const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = {}, Where w = {});
100 void get(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
102 get(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
103 return cb(unpackVector<T>(vals));
109 void get(InfoHash hash, std::function<
bool(T&&)> cb, DoneCallbackSimple dcb={})
111 get(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
112 for (
const auto& v : vals) {
114 if (not cb(Value::unpack<T>(*v)))
116 }
catch (
const std::exception&) {
126 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {}) {
127 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
128 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
129 get(key, [=](
const std::vector<std::shared_ptr<dht::Value>>& vlist) {
130 values->insert(values->end(), vlist.begin(), vlist.end());
133 p->set_value(std::move(*values));
136 return p->get_future();
140 std::future<std::vector<T>> get(InfoHash key) {
141 auto p = std::make_shared<std::promise<std::vector<T>>>();
142 auto values = std::make_shared<std::vector<T>>();
143 get<T>(key, [=](T&& v) {
144 values->emplace_back(std::move(v));
147 p->set_value(std::move(*values));
149 return p->get_future();
152 void query(
const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
153 void query(
const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
154 query(hash, cb, bindDoneCb(done_cb), q);
157 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
159 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
160 return listen(key, [cb=std::move(cb)](
const std::vector<Sp<Value>>& vals,
bool expired){
164 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
166 std::future<size_t> listen(
const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
167 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) {
168 return listen(key, bindGetCb(cb), f, w);
172 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb)
174 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
175 return cb(unpackVector<T>(vals));
180 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&,
bool)> cb)
182 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
183 return cb(unpackVector<T>(vals), expired);
188 template <
typename T>
189 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&)> cb, Value::Filter f = {}, Where w = {})
191 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
192 for (
const auto& v : vals) {
194 if (not cb(Value::unpack<T>(*v)))
196 }
catch (
const std::exception&) {
202 getFilterSet<T>(f), w);
204 template <
typename T>
205 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&,
bool)> cb, Value::Filter f = {}, Where w = {})
207 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
208 for (
const auto& v : vals) {
210 if (not cb(Value::unpack<T>(*v), expired))
212 }
catch (
const std::exception&) {
218 getFilterSet<T>(f), w);
221 void cancelListen(InfoHash h,
size_t token);
222 void cancelListen(InfoHash h, std::shared_future<size_t> token);
224 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
225 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
226 put(hash, value, bindDoneCb(cb), created, permanent);
229 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
230 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
231 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
233 void put(
const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(),
bool permanent =
false);
235 void cancelPut(
const InfoHash& h, Value::Id
id);
236 void cancelPut(
const InfoHash& h,
const std::shared_ptr<Value>& value);
238 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
239 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
240 putSigned(hash, value, bindDoneCb(cb), permanent);
243 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={},
bool permanent =
false);
244 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
245 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
247 void putSigned(
const std::string& key, Value&& value, DoneCallbackSimple cb={},
bool permanent =
false);
249 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
250 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
251 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
254 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={},
bool permanent =
false);
255 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
256 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
258 void putEncrypted(
const std::string& key, InfoHash to, Value&& value, DoneCallback cb={},
bool permanent =
false);
260 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
261 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
262 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
265 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallback cb={},
bool permanent =
false);
266 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
267 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
275 void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb={});
276 void bootstrap(
SockAddr addr, DoneCallbackSimple cb={});
290 void bootstrap(
const std::string& host,
const std::string& service);
291 void bootstrap(
const std::string& hostService);
311 void dumpTables()
const;
317 std::shared_ptr<crypto::PublicKey> getPublicKey()
const;
336 std::pair<size_t, size_t> getStoreSize()
const;
338 void getStorageLimit()
const;
339 void setStorageLimit(
size_t limit = DEFAULT_STORAGE_LIMIT);
341 std::vector<NodeExport> exportNodes()
const;
343 std::vector<ValuesExport> exportValues()
const;
345 void setLogger(
const Sp<Logger>& logger = {});
346 void setLogger(
const Logger& logger) {
347 setLogger(std::make_shared<Logger>(logger));
349 void setLoggers(LogMethod err = {}, LogMethod warn = {}, LogMethod debug = {});
356 void registerType(
const ValueType& type);
358 void importValues(
const std::vector<ValuesExport>& values);
360 bool isRunning()
const {
361 return running != State::Idle;
364 NodeStats getNodesStats(sa_family_t af)
const;
365 unsigned getNodesStats(sa_family_t af,
unsigned *good_return,
unsigned *dubious_return,
unsigned *cached_return,
unsigned *incoming_return)
const;
366 NodeInfo getNodeInfo()
const;
367 void getNodeInfo(std::function<
void(std::shared_ptr<NodeInfo>)>);
369 std::vector<unsigned> getNodeMessageStats(
bool in =
false)
const;
370 std::string getStorageLog()
const;
371 std::string getStorageLog(
const InfoHash&)
const;
372 std::string getRoutingTablesLog(sa_family_t af)
const;
373 std::string getSearchesLog(sa_family_t af = AF_UNSPEC)
const;
374 std::string getSearchLog(
const InfoHash&, sa_family_t af = AF_UNSPEC)
const;
375 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC);
376 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC);
377 void getPublicAddress(std::function<
void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
381 void findCertificate(InfoHash hash, std::function<
void(
const std::shared_ptr<crypto::Certificate>&)>);
382 void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
383 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
391 void run(in_port_t port = dht::net::DHT_DEFAULT_PORT,
const crypto::Identity& identity = {},
bool threaded =
true, NetId network = 0) {
393 config.dht_config.node_config.
network = network;
394 config.dht_config.id = identity;
395 config.threaded = threaded;
398 void run(in_port_t port, Config& config, Context&& context = {});
403 void run(
const char* ip4,
const char* ip6,
const char* service,
Config& config,
Context&& context = {});
405 void run(
const Config& config, Context&& context);
407 void setOnStatusChanged(StatusCallback&& cb) {
408 statusCb = std::move(cb);
417 std::lock_guard<std::mutex> lck(dht_mtx);
424 void shutdown(ShutdownCallback cb = {},
bool stop =
false);
433 std::shared_ptr<PeerDiscovery> getPeerDiscovery()
const {
return peerDiscovery_; };
435 void setProxyServer(
const std::string& proxy,
const std::string& pushNodeId =
"");
457 void forwardAllMessages(
bool forward);
469 return std::max(status4, status6);
472 bool checkShutdown();
474 DoneCallback bindOpDoneCallback(DoneCallback&& cb);
475 DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
478 std::unique_ptr<SecureDht> dht_;
481 std::unique_ptr<SecureDht> dht_via_proxy_;
484 std::atomic_bool use_proxy {
false};
488 IdentityAnnouncedCb identityAnnouncedCb_;
497 SecureDht* activeDht()
const;
503 std::map<size_t, Listener> listeners_;
504 size_t listener_token_ {1};
506 mutable std::mutex dht_mtx {};
507 std::thread dht_thread {};
508 std::condition_variable cv {};
509 std::mutex sock_mtx {};
510 net::PacketList rcv {};
511 decltype(rcv) rcv_free {};
513 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
514 std::queue<std::function<void(SecureDht&)>> pending_ops {};
515 std::mutex storage_mtx {};
517 std::atomic<State> running {State::Idle};
518 std::atomic_size_t ongoing_ops {0};
519 std::vector<ShutdownCallback> shutdownCallbacks_;
521 NodeStatus status4 {NodeStatus::Disconnected},
522 status6 {NodeStatus::Disconnected};
523 StatusCallback statusCb {
nullptr};
526 std::shared_ptr<PeerDiscovery> peerDiscovery_;
532 std::shared_ptr<dht::Logger> logger_;
in_port_t getBoundPort(sa_family_t f=AF_INET) const
void shutdown(ShutdownCallback cb={}, bool stop=false)
void bootstrap(const std::string &host, const std::string &service)
void setPushNotificationToken(const std::string &token)
void connectivityChanged()
InfoHash getNodeId() const
void pushNotificationReceived(const std::map< std::string, std::string > &data)
void run(in_port_t port=dht::net::DHT_DEFAULT_PORT, const crypto::Identity &identity={}, bool threaded=true, NetId network=0)
void bootstrap(std::vector< SockAddr > nodes, DoneCallbackSimple cb={})
void setLogFilter(const InfoHash &f={})
SockAddr getBound(sa_family_t f=AF_INET) const
void run(const char *ip4, const char *ip6, const char *service, Config &config, Context &&context={})
void enableProxy(bool proxify)
void bootstrap(const InfoHash &id, const SockAddr &address)
void bootstrap(std::vector< NodeExport > nodes)