28#include "dht_interface.h"
32#include <restinio/all.hpp>
56 std::shared_ptr<crypto::Certificate> serverCA, crypto::Identity clientIdentity,
57 std::function<
void()> loopSignal,
const std::string& serverHost,
58 const std::string& pushClientId =
"", std::shared_ptr<Logger> logger = {});
62 virtual void setPushNotificationToken(
const std::string& token)
override {
63#ifdef OPENDHT_PUSH_NOTIFICATIONS
82 return std::max(getStatus(AF_INET), getStatus(AF_INET6));
88 void shutdown(ShutdownCallback cb,
bool)
override;
109 virtual void get(
const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {})
override {
110 get(key, cb, bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
112 virtual void get(
const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {})
override {
113 get(key, bindGetCb(cb), std::move(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
115 virtual void get(
const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {})
override {
116 get(key, bindGetCb(cb), bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
128 DoneCallback cb=
nullptr,
129 time_point created=time_point::max(),
130 bool permanent =
false)
override;
133 DoneCallbackSimple cb,
134 time_point created=time_point::max(),
135 bool permanent =
false)
override
137 put(key, v, bindDoneCb(std::move(cb)), created, permanent);
142 DoneCallback cb=
nullptr,
143 time_point created=time_point::max(),
144 bool permanent =
false)
override
146 put(key, std::make_shared<Value>(std::move(v)), std::move(cb), created, permanent);
148 void put(
const InfoHash& key,
150 DoneCallbackSimple cb,
151 time_point created=time_point::max(),
152 bool permanent =
false)
override
154 put(key, std::forward<Value>(v), bindDoneCb(std::move(cb)), created, permanent);
179 return listen(key, [cb=std::move(cb)](
const std::vector<Sp<Value>>& vals,
bool expired){
183 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
185 virtual size_t listen(
const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={})
override {
186 return listen(key, bindGetCb(std::move(cb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
192 virtual bool cancelListen(
const InfoHash& key,
size_t token)
override;
200 time_point periodic(
const uint8_t*,
size_t,
SockAddr,
const time_point& now)
override;
201 time_point periodic(
const uint8_t* buf,
size_t buflen,
const sockaddr* from, socklen_t fromlen,
const time_point& now)
override {
202 return periodic(buf, buflen,
SockAddr(from, fromlen), now);
216 virtual void query(
const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {})
override {
217 query(key, cb, bindDoneCb(std::move(done_cb)), std::forward<Query>(q));
236 void pingNode(
SockAddr, DoneCallbackSimple&& ={})
override { }
238 virtual void registerType(
const ValueType& type)
override {
239 types.registerType(type);
241 const ValueType& getType(ValueType::Id type_id)
const override {
242 return types.getType(type_id);
253 void insertNode(
const NodeExport&)
override { }
255 std::vector<NodeExport>
exportNodes()
const override {
return {}; }
256 std::vector<ValuesExport> exportValues()
const override {
return {}; }
257 void importValues(
const std::vector<ValuesExport>&)
override {}
258 std::string getStorageLog()
const override {
return {}; }
259 std::string getStorageLog(
const InfoHash&)
const override {
return {}; }
260 std::string getRoutingTablesLog(sa_family_t)
const override {
return {}; }
261 std::string getSearchesLog(sa_family_t)
const override {
return {}; }
262 std::string getSearchLog(
const InfoHash&, sa_family_t)
const override {
return {}; }
263 void dumpTables()
const override {}
264 std::vector<unsigned> getNodeMessageStats(
bool)
override {
return {}; }
266 virtual size_t getStorageLimit()
const {
return 0; }
270 void connectivityChanged()
override {
287 void getProxyInfos();
288 void queryProxyInfo(
const std::shared_ptr<InfoState>& infoState,
const std::shared_ptr<http::Resolver>& resolver, sa_family_t family);
289 void onProxyInfos(
const Json::Value& val,
const sa_family_t family);
290 SockAddr parsePublicAddress(
const Json::Value& val);
294 void handleExpireListener(
const asio::error_code &ec,
const InfoHash& key);
297 struct OperationState;
298 enum class ListenMethod {
303 using CacheValueCallback = std::function<bool(
const std::vector<std::shared_ptr<Value>>& values,
bool expired, system_clock::time_point)>;
308 void sendListen(
const restinio::http_request_header_t& header,
const CacheValueCallback& cb,
309 const Sp<OperationState>& opstate, Listener& listener, ListenMethod method = ListenMethod::LISTEN);
310 void handleResubscribe(
const asio::error_code& ec,
const InfoHash& key,
311 const size_t token, std::shared_ptr<OperationState> opstate);
313 void doPut(
const InfoHash&, Sp<Value>, DoneCallbackSimple, time_point created,
bool permanent);
314 void handleRefreshPut(
const asio::error_code& ec, InfoHash key, Value::Id
id);
319 void getConnectivityStatus();
323 void cancelAllListeners();
325 std::atomic_bool isDestroying_ {
false};
327 std::string proxyUrl_;
328 dht::crypto::Identity clientIdentity_;
329 std::shared_ptr<dht::crypto::Certificate> serverCertificate_;
331 std::string pushClientId_;
332 std::string pushSessionId_;
334 mutable std::mutex lockCurrentProxyInfos_;
335 NodeStatus statusIpv4_ {NodeStatus::Disconnected};
336 NodeStatus statusIpv6_ {NodeStatus::Disconnected};
337 NodeStats stats4_ {};
338 NodeStats stats6_ {};
339 SockAddr publicAddressV4_;
340 SockAddr publicAddressV6_;
341 std::atomic_bool launchConnectedCbs_ {
false};
352 asio::io_context httpContext_;
353 std::shared_ptr<http::Resolver> resolver_;
355 mutable std::mutex requestLock_;
356 std::map<unsigned, std::shared_ptr<http::Request>> requests_;
360 std::thread httpClientThread_;
367 mutable std::mutex searchLock_;
368 size_t listenerToken_ {0};
369 std::map<InfoHash, ProxySearch> searches_;
374 std::mutex lockCallbacks_;
375 std::vector<std::function<void()>> callbacks_;
377 Sp<InfoState> infoState_;
382 void handleProxyConfirm(
const asio::error_code &ec);
383 std::unique_ptr<asio::steady_timer> nextProxyConfirmationTimer_;
384 std::unique_ptr<asio::steady_timer> listenerRestartTimer_;
389 void restartListeners(
const asio::error_code &ec);
395 void resubscribe(
const InfoHash& key,
const size_t token, Listener& listener);
401 std::string deviceKey_ {};
403 const std::function<void()> loopSignal_;
405#ifdef OPENDHT_PUSH_NOTIFICATIONS
406 std::string fillBody(
bool resubscribe);
407 void getPushRequest(Json::Value&)
const;
410 Json::StreamWriterBuilder jsonBuilder_;
411 std::unique_ptr<Json::CharReader> jsonReader_;
413 std::shared_ptr<http::Request> buildRequest(
const std::string& target = {});
bool cancelPut(const InfoHash &, const Value::Id &) override
void insertNode(const InfoHash &, const SockAddr &) override
NodeStatus getStatus(sa_family_t af) const override
Sp< Value > getPut(const InfoHash &, const Value::Id &) const override
virtual void query(const InfoHash &, QueryCallback, DoneCallback={}, Query &&={}) override
std::vector< Sp< Value > > getPut(const InfoHash &) const override
const InfoHash & getNodeId() const override
virtual void get(const InfoHash &key, GetCallback cb, DoneCallback donecb={}, Value::Filter &&f={}, Where &&w={}) override
virtual size_t listen(const InfoHash &, ValueCallback, Value::Filter={}, Where={}) override
bool isRunning(sa_family_t af=0) const override
Sp< Value > getLocalById(const InfoHash &k, Value::Id id) const override
std::pair< size_t, size_t > getStoreSize() const override
void setStorageLimit(size_t) override
std::vector< NodeExport > exportNodes() const override
std::vector< Sp< Value > > getLocal(const InfoHash &k, const Value::Filter &filter) const override
std::vector< SockAddr > getPublicAddress(sa_family_t family=0) override
void connectivityChanged(sa_family_t) override
void put(const InfoHash &key, Sp< Value >, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent=false) override
virtual size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
NodeStats getNodesStats(sa_family_t af) const override
void shutdown(ShutdownCallback cb, bool) override
void pushNotificationReceived(const std::map< std::string, std::string > ¬ification) override
Describes a query destined to another peer.
Serializable dht::Value filter.