My Project 2.4.4
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht_proxy_server.h
1/*
2 * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "callbacks.h"
24#include "def.h"
25#include "infohash.h"
26#include "proxy.h"
27#include "scheduler.h"
28#include "sockaddr.h"
29#include "value.h"
30#include "http.h"
31
32#include <restinio/all.hpp>
33#include <restinio/tls.hpp>
34#include <json/json.h>
35
36#include <memory>
37#include <mutex>
38
39namespace dht {
40enum class PushType {
41 None = 0,
42 Android,
43 iOS
44};
45}
46MSGPACK_ADD_ENUM(dht::PushType)
47
48namespace http {
49class Request;
50struct ListenerSession;
51}
52
53namespace Json {
54class Value;
55}
56
57namespace dht {
58
59class DhtRunner;
60
61using RestRouter = restinio::router::express_router_t<>;
62using RequestStatus = restinio::request_handling_status_t;
63
65 in_port_t port {8000};
66 std::string pushServer {};
67 std::string persistStatePath {};
68 dht::crypto::Identity identity {};
69};
70
74class OPENDHT_PUBLIC DhtProxyServer
75{
76public:
85 DhtProxyServer(const std::shared_ptr<DhtRunner>& dht,
86 const ProxyServerConfig& config = {},
87 const std::shared_ptr<dht::Logger>& logger = {});
88
89 virtual ~DhtProxyServer();
90
91 DhtProxyServer(const DhtProxyServer& other) = delete;
92 DhtProxyServer(DhtProxyServer&& other) = delete;
93 DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
94 DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
95
96 asio::io_context& io_context() const;
97
98 struct ServerStats {
100 size_t listenCount {0};
102 size_t putCount {0};
104 size_t totalPermanentPuts {0};
106 size_t pushListenersCount {0};
108 double requestRate {0};
110 std::shared_ptr<NodeInfo> nodeInfo {};
111
112 std::string toString() const {
113 std::ostringstream ss;
114 ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl;
115 ss << "Requests: " << requestRate << " per second." << std::endl;
116 if (nodeInfo) {
117 auto& ipv4 = nodeInfo->ipv4;
118 if (ipv4.table_depth > 1)
119 ss << "IPv4 Network estimation: " << ipv4.getNetworkSizeEstimation() << std::endl;;
120 auto& ipv6 = nodeInfo->ipv6;
121 if (ipv6.table_depth > 1)
122 ss << "IPv6 Network estimation: " << ipv6.getNetworkSizeEstimation() << std::endl;;
123 }
124 return ss.str();
125 }
126
130 Json::Value toJson() const {
131 Json::Value result;
132 result["listenCount"] = static_cast<Json::UInt64>(listenCount);
133 result["putCount"] = static_cast<Json::UInt64>(putCount);
134 result["totalPermanentPuts"] = static_cast<Json::UInt64>(totalPermanentPuts);
135 result["pushListenersCount"] = static_cast<Json::UInt64>(pushListenersCount);
136 result["requestRate"] = requestRate;
137 if (nodeInfo)
138 result["nodeInfo"] = nodeInfo->toJson();
139 return result;
140 }
141 };
142
143 std::shared_ptr<ServerStats> stats() const { return stats_; }
144
145 std::shared_ptr<ServerStats> updateStats(std::shared_ptr<NodeInfo> info) const;
146
147 std::shared_ptr<DhtRunner> getNode() const { return dht_; }
148
149private:
150 class ConnectionListener;
151 struct RestRouterTraitsTls;
152 struct RestRouterTraits;
153
154 template <typename HttpResponse>
155 static HttpResponse initHttpResponse(HttpResponse response);
156 static restinio::request_handling_status_t serverError(restinio::request_t& request);
157
158 template< typename ServerSettings >
159 void addServerSettings(ServerSettings& serverSettings,
160 const unsigned int max_pipelined_requests = 16);
161
162 std::unique_ptr<RestRouter> createRestRouter();
163
164 void onConnectionClosed(restinio::connection_id_t);
165
173 RequestStatus getNodeInfo(restinio::request_handle_t request,
174 restinio::router::route_params_t params) const;
175
182 RequestStatus getStats(restinio::request_handle_t request,
183 restinio::router::route_params_t params);
184
195 RequestStatus get(restinio::request_handle_t request,
196 restinio::router::route_params_t params);
197
208 RequestStatus listen(restinio::request_handle_t request,
209 restinio::router::route_params_t params);
210
220 RequestStatus put(restinio::request_handle_t request,
221 restinio::router::route_params_t params);
222
223 void handleCancelPermamentPut(const asio::error_code &ec, const InfoHash& key, Value::Id vid);
224
225#ifdef OPENDHT_PROXY_SERVER_IDENTITY
235 RequestStatus putSigned(restinio::request_handle_t request,
236 restinio::router::route_params_t params) const;
237
247 RequestStatus putEncrypted(restinio::request_handle_t request,
248 restinio::router::route_params_t params);
249
250#endif // OPENDHT_PROXY_SERVER_IDENTITY
251
262 RequestStatus getFiltered(restinio::request_handle_t request,
263 restinio::router::route_params_t params);
264
272 RequestStatus options(restinio::request_handle_t request,
273 restinio::router::route_params_t params);
274
275#ifdef OPENDHT_PUSH_NOTIFICATIONS
285 RequestStatus subscribe(restinio::request_handle_t request,
286 restinio::router::route_params_t params);
287
295 RequestStatus unsubscribe(restinio::request_handle_t request,
296 restinio::router::route_params_t params);
297
303 void sendPushNotification(const std::string& key, Json::Value&& json, PushType type, bool highPriority);
304
312 void handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken,
313 std::function<Json::Value()> json, PushType type);
314
322 void handleCancelPushListen(const asio::error_code &ec, const std::string pushToken,
323 const InfoHash key, const std::string clientId);
324
325#endif //OPENDHT_PUSH_NOTIFICATIONS
326
327 void handlePrintStats(const asio::error_code &ec);
328 void updateStats();
329
330 template <typename Os>
331 void saveState(Os& stream);
332
333 template <typename Is>
334 void loadState(Is& is, size_t size);
335
336 using clock = std::chrono::steady_clock;
337 using time_point = clock::time_point;
338
339 std::shared_ptr<asio::io_context> ioContext_;
340 std::shared_ptr<DhtRunner> dht_;
341 Json::StreamWriterBuilder jsonBuilder_;
342 Json::CharReaderBuilder jsonReaderBuilder_;
343 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
344
345 std::string persistPath_;
346
347 // http server
348 std::thread serverThread_;
349 std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_;
350 std::unique_ptr<restinio::http_server_t<RestRouterTraits>> httpServer_;
351
352 // http client
353 std::pair<std::string, std::string> pushHostPort_;
354
355 mutable std::mutex requestLock_;
356 std::map<unsigned int /*id*/, std::shared_ptr<http::Request>> requests_;
357
358 std::shared_ptr<dht::Logger> logger_;
359
360 std::shared_ptr<ServerStats> stats_;
361 std::shared_ptr<NodeInfo> nodeInfo_ {};
362 std::unique_ptr<asio::steady_timer> printStatsTimer_;
363
364 // Thread-safe access to listeners map.
365 std::mutex lockListener_;
366 // Shared with connection listener.
367 std::map<restinio::connection_id_t, http::ListenerSession> listeners_;
368 // Connection Listener observing conn state changes.
369 std::shared_ptr<ConnectionListener> connListener_;
370
371 struct PushSessionContext {
372 std::mutex lock;
373 std::string sessionId;
374 PushSessionContext(const std::string& id) : sessionId(id) {}
375 };
376 struct PermanentPut {
377 time_point expiration;
378 std::string pushToken;
379 std::string clientId;
380 std::shared_ptr<PushSessionContext> sessionCtx;
381 std::unique_ptr<asio::steady_timer> expireTimer;
382 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
383 Sp<Value> value;
384 PushType type;
385
386 template <typename Packer>
387 void msgpack_pack(Packer& p) const
388 {
389 p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2));
390 p.pack("value"); p.pack(value);
391 p.pack("exp"); p.pack(to_time_t(expiration));
392 if (not clientId.empty()) {
393 p.pack("cid"); p.pack(clientId);
394 }
395 if (sessionCtx) {
396 std::lock_guard<std::mutex> l(sessionCtx->lock);
397 p.pack("sid"); p.pack(sessionCtx->sessionId);
398 }
399 if (type != PushType::None) {
400 p.pack("t"); p.pack(type);
401 p.pack("token"); p.pack(pushToken);
402 }
403 }
404
405 void msgpack_unpack(const msgpack::object& o);
406 };
407 struct SearchPuts {
408 std::map<dht::Value::Id, PermanentPut> puts;
409 MSGPACK_DEFINE_ARRAY(puts)
410 };
411 std::mutex lockSearchPuts_;
412 std::map<InfoHash, SearchPuts> puts_;
413
414 mutable std::atomic<size_t> requestNum_ {0};
415 mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
416
417 std::string pushServer_;
418
419#ifdef OPENDHT_PUSH_NOTIFICATIONS
420 struct Listener {
421 time_point expiration;
422 std::string clientId;
423 std::shared_ptr<PushSessionContext> sessionCtx;
424 std::future<size_t> internalToken;
425 std::unique_ptr<asio::steady_timer> expireTimer;
426 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
427 PushType type;
428
429 template <typename Packer>
430 void msgpack_pack(Packer& p) const
431 {
432 p.pack_map(sessionCtx ? 4 : 3);
433 p.pack("cid"); p.pack(clientId);
434 p.pack("exp"); p.pack(to_time_t(expiration));
435 if (sessionCtx) {
436 std::lock_guard<std::mutex> l(sessionCtx->lock);
437 p.pack("sid"); p.pack(sessionCtx->sessionId);
438 }
439 p.pack("t"); p.pack(type);
440 }
441
442 void msgpack_unpack(const msgpack::object& o);
443 };
444 struct PushListener {
445 std::map<InfoHash, std::vector<Listener>> listeners;
446 MSGPACK_DEFINE_ARRAY(listeners)
447 };
448 std::map<std::string, PushListener> pushListeners_;
449 proxy::ListenToken tokenPushNotif_ {0};
450#endif //OPENDHT_PUSH_NOTIFICATIONS
451};
452
453}
DhtProxyServer(const std::shared_ptr< DhtRunner > &dht, const ProxyServerConfig &config={}, const std::shared_ptr< dht::Logger > &logger={})
Definition: callbacks.h:35