My Project 2.4.4
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht_proxy_client.h
1/*
2 * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include <functional>
24#include <mutex>
25
26#include "callbacks.h"
27#include "def.h"
28#include "dht_interface.h"
29#include "proxy.h"
30#include "http.h"
31
32#include <restinio/all.hpp>
33#include <json/json.h>
34
35#include <chrono>
36#include <vector>
37#include <functional>
38
39namespace Json {
40class Value;
41}
42
43namespace http {
44class Resolver;
45class Request;
46}
47
48namespace dht {
49
50class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
51public:
52
54
55 explicit DhtProxyClient(
56 std::shared_ptr<crypto::Certificate> serverCA, crypto::Identity clientIdentity,
57 std::function<void()> loopSignal, const std::string& serverHost,
58 const std::string& pushClientId = "", std::shared_ptr<Logger> logger = {});
59
60 void setHeaderFields(http::Request& request);
61
62 virtual void setPushNotificationToken(const std::string& token) override {
63#ifdef OPENDHT_PUSH_NOTIFICATIONS
64 deviceKey_ = token;
65#else
66 (void) token;
67#endif
68 }
69
70 virtual ~DhtProxyClient();
71
75 inline const InfoHash& getNodeId() const override { return myid; }
76
80 NodeStatus getStatus(sa_family_t af) const override;
81 NodeStatus getStatus() const override {
82 return std::max(getStatus(AF_INET), getStatus(AF_INET6));
83 }
84
88 void shutdown(ShutdownCallback cb, bool) override;
89
96 bool isRunning(sa_family_t af = 0) const override;
97
108 virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override;
109 virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
110 get(key, cb, bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
111 }
112 virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
113 get(key, bindGetCb(cb), std::move(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
114 }
115 virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) override {
116 get(key, bindGetCb(cb), bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
117 }
118
126 void put(const InfoHash& key,
127 Sp<Value>,
128 DoneCallback cb=nullptr,
129 time_point created=time_point::max(),
130 bool permanent = false) override;
131 void put(const InfoHash& key,
132 const Sp<Value>& v,
133 DoneCallbackSimple cb,
134 time_point created=time_point::max(),
135 bool permanent = false) override
136 {
137 put(key, v, bindDoneCb(std::move(cb)), created, permanent);
138 }
139
140 void put(const InfoHash& key,
141 Value&& v,
142 DoneCallback cb=nullptr,
143 time_point created=time_point::max(),
144 bool permanent = false) override
145 {
146 put(key, std::make_shared<Value>(std::move(v)), std::move(cb), created, permanent);
147 }
148 void put(const InfoHash& key,
149 Value&& v,
150 DoneCallbackSimple cb,
151 time_point created=time_point::max(),
152 bool permanent = false) override
153 {
154 put(key, std::forward<Value>(v), bindDoneCb(std::move(cb)), created, permanent);
155 }
156
161 NodeStats getNodesStats(sa_family_t af) const override;
162
167 std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override;
168
176 virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={}) override;
177
178 virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) override {
179 return listen(key, [cb=std::move(cb)](const std::vector<Sp<Value>>& vals, bool expired){
180 if (not expired)
181 return cb(vals);
182 return true;
183 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
184 }
185 virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) override {
186 return listen(key, bindGetCb(std::move(cb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
187 }
188 /*
189 * This function relies on the cache implementation.
190 * It means that there are no true cancel here, it keeps the caching in higher priority.
191 */
192 virtual bool cancelListen(const InfoHash& key, size_t token) override;
193
198 void pushNotificationReceived(const std::map<std::string, std::string>& notification) override;
199
200 time_point periodic(const uint8_t*, size_t, SockAddr, const time_point& now) override;
201 time_point periodic(const uint8_t* buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override {
202 return periodic(buf, buflen, SockAddr(from, fromlen), now);
203 }
204
215 virtual void query(const InfoHash& /*key*/, QueryCallback /*cb*/, DoneCallback /*done_cb*/ = {}, Query&& /*q*/ = {}) override { }
216 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override {
217 query(key, cb, bindDoneCb(std::move(done_cb)), std::forward<Query>(q));
218 }
219
223 std::vector<Sp<Value>> getPut(const InfoHash&) const override;
224
228 Sp<Value> getPut(const InfoHash&, const Value::Id&) const override;
229
234 bool cancelPut(const InfoHash&, const Value::Id&) override;
235
236 void pingNode(SockAddr, DoneCallbackSimple&& /*cb*/={}) override { }
237
238 virtual void registerType(const ValueType& type) override {
239 types.registerType(type);
240 }
241 const ValueType& getType(ValueType::Id type_id) const override {
242 return types.getType(type_id);
243 }
244
245 std::vector<Sp<Value>> getLocal(const InfoHash& k, const Value::Filter& filter) const override;
246 Sp<Value> getLocalById(const InfoHash& k, Value::Id id) const override;
247
252 void insertNode(const InfoHash&, const SockAddr&) override { }
253 void insertNode(const NodeExport&) override { }
254 std::pair<size_t, size_t> getStoreSize() const override { return {}; }
255 std::vector<NodeExport> exportNodes() const override { return {}; }
256 std::vector<ValuesExport> exportValues() const override { return {}; }
257 void importValues(const std::vector<ValuesExport>&) override {}
258 std::string getStorageLog() const override { return {}; }
259 std::string getStorageLog(const InfoHash&) const override { return {}; }
260 std::string getRoutingTablesLog(sa_family_t) const override { return {}; }
261 std::string getSearchesLog(sa_family_t) const override { return {}; }
262 std::string getSearchLog(const InfoHash&, sa_family_t) const override { return {}; }
263 void dumpTables() const override {}
264 std::vector<unsigned> getNodeMessageStats(bool) override { return {}; }
265 void setStorageLimit(size_t) override {}
266 virtual size_t getStorageLimit() const { return 0; }
267 void connectivityChanged(sa_family_t) override {
268 getProxyInfos();
269 }
270 void connectivityChanged() override {
271 getProxyInfos();
272 loopSignal_();
273 }
274
275private:
279 void startProxy();
280 void stop();
281
286 struct InfoState;
287 void getProxyInfos();
288 void queryProxyInfo(const std::shared_ptr<InfoState>& infoState, const std::shared_ptr<http::Resolver>& resolver, sa_family_t family);
289 void onProxyInfos(const Json::Value& val, const sa_family_t family);
290 SockAddr parsePublicAddress(const Json::Value& val);
291
292 void opFailed();
293
294 void handleExpireListener(const asio::error_code &ec, const InfoHash& key);
295
296 struct Listener;
297 struct OperationState;
298 enum class ListenMethod {
299 LISTEN,
300 SUBSCRIBE,
301 RESUBSCRIBE,
302 };
303 using CacheValueCallback = std::function<bool(const std::vector<std::shared_ptr<Value>>& values, bool expired, system_clock::time_point)>;
304
308 void sendListen(const restinio::http_request_header_t& header, const CacheValueCallback& cb,
309 const Sp<OperationState>& opstate, Listener& listener, ListenMethod method = ListenMethod::LISTEN);
310 void handleResubscribe(const asio::error_code& ec, const InfoHash& key,
311 const size_t token, std::shared_ptr<OperationState> opstate);
312
313 void doPut(const InfoHash&, Sp<Value>, DoneCallbackSimple, time_point created, bool permanent);
314 void handleRefreshPut(const asio::error_code& ec, InfoHash key, Value::Id id);
315
319 void getConnectivityStatus();
323 void cancelAllListeners();
324
325 std::atomic_bool isDestroying_ {false};
326
327 std::string proxyUrl_;
328 dht::crypto::Identity clientIdentity_;
329 std::shared_ptr<dht::crypto::Certificate> serverCertificate_;
330 //std::pair<std::string, std::string> serverHostService_;
331 std::string pushClientId_;
332 std::string pushSessionId_;
333
334 mutable std::mutex lockCurrentProxyInfos_;
335 NodeStatus statusIpv4_ {NodeStatus::Disconnected};
336 NodeStatus statusIpv6_ {NodeStatus::Disconnected};
337 NodeStats stats4_ {};
338 NodeStats stats6_ {};
339 SockAddr publicAddressV4_;
340 SockAddr publicAddressV6_;
341 std::atomic_bool launchConnectedCbs_ {false};
342
343 InfoHash myid {};
344
345 // registred types
346 TypeStore types;
347
348 /*
349 * ASIO I/O Context for sockets in httpClient_
350 * Note: Each context is used in one thread only
351 */
352 asio::io_context httpContext_;
353 std::shared_ptr<http::Resolver> resolver_;
354
355 mutable std::mutex requestLock_;
356 std::map<unsigned, std::shared_ptr<http::Request>> requests_;
357 /*
358 * Thread for executing the http io_context.run() blocking call
359 */
360 std::thread httpClientThread_;
361
365 struct ProxySearch;
366
367 mutable std::mutex searchLock_;
368 size_t listenerToken_ {0};
369 std::map<InfoHash, ProxySearch> searches_;
370
374 std::mutex lockCallbacks_;
375 std::vector<std::function<void()>> callbacks_;
376
377 Sp<InfoState> infoState_;
378
382 void handleProxyConfirm(const asio::error_code &ec);
383 std::unique_ptr<asio::steady_timer> nextProxyConfirmationTimer_;
384 std::unique_ptr<asio::steady_timer> listenerRestartTimer_;
385
389 void restartListeners(const asio::error_code &ec);
390
395 void resubscribe(const InfoHash& key, const size_t token, Listener& listener);
396
401 std::string deviceKey_ {};
402
403 const std::function<void()> loopSignal_;
404
405#ifdef OPENDHT_PUSH_NOTIFICATIONS
406 std::string fillBody(bool resubscribe);
407 void getPushRequest(Json::Value&) const;
408#endif // OPENDHT_PUSH_NOTIFICATIONS
409
410 Json::StreamWriterBuilder jsonBuilder_;
411 std::unique_ptr<Json::CharReader> jsonReader_;
412
413 std::shared_ptr<http::Request> buildRequest(const std::string& target = {});
414};
415
416}
bool cancelPut(const InfoHash &, const Value::Id &) override
void insertNode(const InfoHash &, const SockAddr &) override
NodeStatus getStatus(sa_family_t af) const override
Sp< Value > getPut(const InfoHash &, const Value::Id &) const override
virtual void query(const InfoHash &, QueryCallback, DoneCallback={}, Query &&={}) override
std::vector< Sp< Value > > getPut(const InfoHash &) const override
const InfoHash & getNodeId() const override
virtual void get(const InfoHash &key, GetCallback cb, DoneCallback donecb={}, Value::Filter &&f={}, Where &&w={}) override
virtual size_t listen(const InfoHash &, ValueCallback, Value::Filter={}, Where={}) override
bool isRunning(sa_family_t af=0) const override
Sp< Value > getLocalById(const InfoHash &k, Value::Id id) const override
std::pair< size_t, size_t > getStoreSize() const override
void setStorageLimit(size_t) override
std::vector< NodeExport > exportNodes() const override
std::vector< Sp< Value > > getLocal(const InfoHash &k, const Value::Filter &filter) const override
std::vector< SockAddr > getPublicAddress(sa_family_t family=0) override
void connectivityChanged(sa_family_t) override
void put(const InfoHash &key, Sp< Value >, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent=false) override
virtual size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
NodeStats getNodesStats(sa_family_t af) const override
void shutdown(ShutdownCallback cb, bool) override
void pushNotificationReceived(const std::map< std::string, std::string > &notification) override
Definition: callbacks.h:35
NodeStatus
Definition: callbacks.h:42
Describes a query destined to another peer.
Definition: value.h:925
Serializable dht::Value filter.
Definition: value.h:801