32#include <restinio/all.hpp>
33#include <restinio/tls.hpp>
// Registers dht::PushType with msgpack's enum adaptor macro.
46MSGPACK_ADD_ENUM(dht::PushType)
// Forward declaration only; the definition is not part of this excerpt.
50struct ListenerSession;
// Shorthand for restinio's express router with its default template arguments.
61using RestRouter = restinio::router::express_router_t<>;
// Shorthand for the status type returned by the request handlers declared further down.
62using RequestStatus = restinio::request_handling_status_t;
// Configuration fields. The enclosing struct header is not visible in this excerpt;
// presumably this is ProxyServerConfig, the type accepted by the DhtProxyServer
// constructor -- TODO confirm.
// Port number; initialized to 8000.
65 in_port_t port {8000};
// Push server setting; empty by default. NOTE(review): expected format is not shown here.
66 std::string pushServer {};
// Path-like setting for persisted state; empty by default.
67 std::string persistStatePath {};
// Cryptographic identity; value-initialized by default.
68 dht::crypto::Identity identity {};
87 const std::shared_ptr<dht::Logger>& logger = {});
// Virtual destructor, defined out of line.
89 virtual ~DhtProxyServer();
// Copy construction is disabled.
91 DhtProxyServer(
const DhtProxyServer& other) =
delete;
// Move construction is disabled.
92 DhtProxyServer(DhtProxyServer&& other) =
delete;
// Copy assignment is disabled.
93 DhtProxyServer& operator=(
const DhtProxyServer& other) =
delete;
// Move assignment is disabled.
94 DhtProxyServer& operator=(DhtProxyServer&& other) =
delete;
// Const accessor returning a reference to an asio::io_context; defined elsewhere.
// presumably the context held through ioContext_ -- TODO confirm in the implementation.
96 asio::io_context& io_context()
const;
// Statistics fields. The enclosing struct header is not visible in this excerpt;
// presumably ServerStats (the type returned by stats()) -- TODO confirm.
// All counters start at zero.
// Printed as "Listens" by toString().
100 size_t listenCount {0};
// Exported under the "totalPermanentPuts" JSON key.
104 size_t totalPermanentPuts {0};
// Printed as "PushListeners" by toString().
106 size_t pushListenersCount {0};
// Printed by toString() with the suffix " per second.".
108 double requestRate {0};
// Shared node information; empty (null) by default.
110 std::shared_ptr<NodeInfo> nodeInfo {};
// Formats the statistics as human-readable text accumulated in a std::ostringstream.
// NOTE(review): this excerpt is missing some original lines (for example the
// closing brace and the return statement, and possibly a guard around the
// nodeInfo accesses), so only the visible statements are described.
112 std::string toString()
const {
113 std::ostringstream ss;
// First line: listen, put and push-listener counts.
114 ss <<
"Listens: " << listenCount <<
" Puts: " << putCount <<
" PushListeners: " << pushListenersCount << std::endl;
// Second line: request rate.
115 ss <<
"Requests: " << requestRate <<
" per second." << std::endl;
// The IPv4 network size estimate is printed only when the table depth exceeds 1.
117 auto& ipv4 = nodeInfo->ipv4;
118 if (ipv4.table_depth > 1)
119 ss <<
"IPv4 Network estimation: " << ipv4.getNetworkSizeEstimation() << std::endl;;
// Same rule for the IPv6 estimate.
120 auto& ipv6 = nodeInfo->ipv6;
121 if (ipv6.table_depth > 1)
122 ss <<
"IPv6 Network estimation: " << ipv6.getNetworkSizeEstimation() << std::endl;;
// Body fragment of a JSON serializer; its signature and the declaration of
// `result` are not in this excerpt (presumably `Json::Value toJson() const` -- TODO confirm).
// The size_t counters are cast to Json::UInt64 before being stored.
132 result[
"listenCount"] =
static_cast<Json::UInt64
>(listenCount);
133 result[
"putCount"] =
static_cast<Json::UInt64
>(putCount);
134 result[
"totalPermanentPuts"] =
static_cast<Json::UInt64
>(totalPermanentPuts);
135 result[
"pushListenersCount"] =
static_cast<Json::UInt64
>(pushListenersCount);
// requestRate is stored without a cast.
136 result[
"requestRate"] = requestRate;
// Nested value produced by nodeInfo->toJson().
138 result[
"nodeInfo"] = nodeInfo->toJson();
// Returns a copy of the stats_ shared pointer.
143 std::shared_ptr<ServerStats> stats()
const {
return stats_; }
// Const method taking a NodeInfo shared pointer and returning a ServerStats
// shared pointer; defined elsewhere.
145 std::shared_ptr<ServerStats> updateStats(std::shared_ptr<NodeInfo> info)
const;
// Returns a copy of the dht_ shared pointer.
147 std::shared_ptr<DhtRunner> getNode()
const {
return dht_; }
// Forward declarations of nested types; their definitions are not in this excerpt.
150 class ConnectionListener;
151 struct RestRouterTraitsTls;
152 struct RestRouterTraits;
// Static template helper: takes an HttpResponse by value and returns one of the same type.
154 template <
typename HttpResponse>
155 static HttpResponse initHttpResponse(HttpResponse response);
// Static helper returning a restinio request-handling status for the given request.
// presumably replies with a server-error response -- TODO confirm in the implementation.
156 static restinio::request_handling_status_t serverError(restinio::request_t& request);
// Template taking a settings object by non-const reference;
// max_pipelined_requests defaults to 16 when omitted.
158 template<
typename ServerSettings >
159 void addServerSettings(ServerSettings& serverSettings,
160 const unsigned int max_pipelined_requests = 16);
// Returns a uniquely-owned RestRouter.
162 std::unique_ptr<RestRouter> createRestRouter();
// Receives a restinio connection id.
// presumably invoked when that connection closes -- TODO confirm.
164 void onConnectionClosed(restinio::connection_id_t);
// Route handlers. Each takes a restinio request handle plus the route
// parameters and returns a RequestStatus. Definitions are not in this excerpt.
// Declared const, unlike most of the handlers below.
173 RequestStatus getNodeInfo(restinio::request_handle_t request,
174 restinio::router::route_params_t params)
const;
182 RequestStatus getStats(restinio::request_handle_t request,
183 restinio::router::route_params_t params);
195 RequestStatus get(restinio::request_handle_t request,
196 restinio::router::route_params_t params);
208 RequestStatus listen(restinio::request_handle_t request,
209 restinio::router::route_params_t params);
220 RequestStatus put(restinio::request_handle_t request,
221 restinio::router::route_params_t params);
// Handler taking an asio error code, a key and a value id.
// NOTE(review): "Permament" looks like a misspelling of "Permanent"; renaming
// would change the interface, so it is left untouched.
223 void handleCancelPermamentPut(
const asio::error_code &ec,
const InfoHash& key, Value::Id vid);
// Declarations following this guard depend on OPENDHT_PROXY_SERVER_IDENTITY;
// the matching #endif is not visible in this excerpt.
225#ifdef OPENDHT_PROXY_SERVER_IDENTITY
// Declared const.
235 RequestStatus putSigned(restinio::request_handle_t request,
236 restinio::router::route_params_t params)
const;
247 RequestStatus putEncrypted(restinio::request_handle_t request,
248 restinio::router::route_params_t params);
262 RequestStatus getFiltered(restinio::request_handle_t request,
263 restinio::router::route_params_t params);
272 RequestStatus options(restinio::request_handle_t request,
273 restinio::router::route_params_t params);
// Declarations following this guard depend on OPENDHT_PUSH_NOTIFICATIONS;
// the matching #endif is not visible in this excerpt.
275#ifdef OPENDHT_PUSH_NOTIFICATIONS
// Route handlers with the same (request, params) -> RequestStatus shape as the others.
285 RequestStatus subscribe(restinio::request_handle_t request,
286 restinio::router::route_params_t params);
295 RequestStatus unsubscribe(restinio::request_handle_t request,
296 restinio::router::route_params_t params);
// Takes a key string, a JSON payload by rvalue reference, a push type and a priority flag.
303 void sendPushNotification(
const std::string& key, Json::Value&& json, PushType type,
bool highPriority);
// Handler taking an asio error code; the push token is passed by value and the
// JSON payload is supplied through a callable returning Json::Value.
312 void handleNotifyPushListenExpire(
const asio::error_code &ec,
const std::string pushToken,
313 std::function<Json::Value()> json, PushType type);
// Handler taking an asio error code; token, key and client id are all passed by value.
322 void handleCancelPushListen(
const asio::error_code &ec,
const std::string pushToken,
323 const InfoHash key,
const std::string clientId);
// Handler taking only an asio error code.
// presumably driven by printStatsTimer_ -- TODO confirm.
327 void handlePrintStats(
const asio::error_code &ec);
// Template over the output stream type; takes the stream by non-const reference.
330 template <
typename Os>
331 void saveState(Os& stream);
// Template over the input stream type; takes the stream by non-const reference
// together with a size. NOTE(review): the unit of `size` is not shown here.
333 template <
typename Is>
334 void loadState(Is& is,
size_t size);
// Time types used by this class, based on std::chrono::steady_clock.
336 using clock = std::chrono::steady_clock;
337 using time_point = clock::time_point;
// Shared handle to the asio context.
339 std::shared_ptr<asio::io_context> ioContext_;
// Shared handle to the DHT runner; getNode() returns this pointer.
340 std::shared_ptr<DhtRunner> dht_;
// jsoncpp writer and reader builders.
341 Json::StreamWriterBuilder jsonBuilder_;
342 Json::CharReaderBuilder jsonReaderBuilder_;
// 64-bit Mersenne Twister initialized from crypto::getSeededRandomEngine.
343 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
// presumably the file path used when saving/loading state -- TODO confirm.
345 std::string persistPath_;
// Thread object plus the two restinio server instances (TLS traits and plain traits).
348 std::thread serverThread_;
349 std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_;
350 std::unique_ptr<restinio::http_server_t<RestRouterTraits>> httpServer_;
// Pair of strings; presumably the push server's host and port -- TODO confirm.
353 std::pair<std::string, std::string> pushHostPort_;
// Mutable so that const methods can lock it; presumably guards requests_ -- TODO confirm.
355 mutable std::mutex requestLock_;
// http::Request objects keyed by an unsigned integer.
356 std::map<
unsigned int , std::shared_ptr<http::Request>> requests_;
358 std::shared_ptr<dht::Logger> logger_;
// Returned by stats().
360 std::shared_ptr<ServerStats> stats_;
361 std::shared_ptr<NodeInfo> nodeInfo_ {};
// Timer owned by the server; presumably schedules handlePrintStats -- TODO confirm.
362 std::unique_ptr<asio::steady_timer> printStatsTimer_;
// Mutex declared next to listeners_; presumably guards it -- TODO confirm.
365 std::mutex lockListener_;
// Listener sessions keyed by restinio connection id.
367 std::map<restinio::connection_id_t, http::ListenerSession> listeners_;
369 std::shared_ptr<ConnectionListener> connListener_;
// Context object holding a session id string.
// NOTE(review): some original lines of this struct are missing from the excerpt
// (other code locks `sessionCtx->lock`, so a `lock` member is expected here).
371 struct PushSessionContext {
373 std::string sessionId;
// Copies `id` into sessionId.
374 PushSessionContext(
const std::string&
id) : sessionId(id) {}
// Record for one put, with its expiration time, push/client identifiers and two timers.
// NOTE(review): several original lines are missing from this excerpt (the `value`
// and `type` members used below, opening/closing braces, and the condition
// around the "sid" entry), so the comments describe only what is visible.
376 struct PermanentPut {
377 time_point expiration;
378 std::string pushToken;
379 std::string clientId;
380 std::shared_ptr<PushSessionContext> sessionCtx;
381 std::unique_ptr<asio::steady_timer> expireTimer;
382 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
// Serializes the record as a msgpack map.
386 template <
typename Packer>
387 void msgpack_pack(Packer& p)
const
// Map size: 2 fixed entries ("value", "exp"), +1 when sessionCtx is set,
// +1 when clientId is non-empty, +2 ("t", "token") when type is not PushType::None.
389 p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2));
390 p.pack(
"value"); p.pack(value);
// Expiration is written after conversion with to_time_t.
391 p.pack(
"exp"); p.pack(to_time_t(expiration));
392 if (not clientId.empty()) {
393 p.pack(
"cid"); p.pack(clientId);
// The session id is read while holding the session context's lock.
396 std::lock_guard<std::mutex> l(sessionCtx->lock);
397 p.pack(
"sid"); p.pack(sessionCtx->sessionId);
399 if (type != PushType::None) {
400 p.pack(
"t"); p.pack(type);
401 p.pack(
"token"); p.pack(pushToken);
// Deserialization counterpart; defined elsewhere.
405 void msgpack_unpack(
const msgpack::object& o);
// PermanentPut records keyed by value id, serialized through MSGPACK_DEFINE_ARRAY.
// NOTE(review): the enclosing struct header is not visible; presumably SearchPuts,
// the mapped type of puts_ below -- TODO confirm.
408 std::map<dht::Value::Id, PermanentPut> puts;
409 MSGPACK_DEFINE_ARRAY(puts)
// Mutex declared next to puts_; presumably guards it -- TODO confirm.
411 std::mutex lockSearchPuts_;
// SearchPuts entries keyed by InfoHash.
412 std::map<InfoHash, SearchPuts> puts_;
// Atomic counter starting at 0; mutable so const methods can modify it.
414 mutable std::atomic<size_t> requestNum_ {0};
// Atomic time point initialized to time_point::min(); also mutable.
415 mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
417 std::string pushServer_;
// Declarations following this guard depend on OPENDHT_PUSH_NOTIFICATIONS;
// the matching #endif is not visible in this excerpt.
419#ifdef OPENDHT_PUSH_NOTIFICATIONS
// Fields of a listener record. NOTE(review): the struct header, the `type`
// member and some braces/conditions are missing from this excerpt; presumably
// this is the Listener type stored in PushListener::listeners -- TODO confirm.
421 time_point expiration;
422 std::string clientId;
423 std::shared_ptr<PushSessionContext> sessionCtx;
424 std::future<size_t> internalToken;
425 std::unique_ptr<asio::steady_timer> expireTimer;
426 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
// Serializes the record as a msgpack map.
429 template <
typename Packer>
430 void msgpack_pack(Packer& p)
const
// Map size is 4 when sessionCtx is set (adds "sid"), otherwise 3 ("cid", "exp", "t").
432 p.pack_map(sessionCtx ? 4 : 3);
433 p.pack(
"cid"); p.pack(clientId);
// Expiration is written after conversion with to_time_t.
434 p.pack(
"exp"); p.pack(to_time_t(expiration));
// The session id is read while holding the session context's lock.
436 std::lock_guard<std::mutex> l(sessionCtx->lock);
437 p.pack(
"sid"); p.pack(sessionCtx->sessionId);
439 p.pack(
"t"); p.pack(type);
// Deserialization counterpart; defined elsewhere.
442 void msgpack_unpack(
const msgpack::object& o);
// Groups Listener records per InfoHash and serializes them through MSGPACK_DEFINE_ARRAY.
// NOTE(review): the struct's closing brace is not visible in this excerpt.
444 struct PushListener {
445 std::map<InfoHash, std::vector<Listener>> listeners;
446 MSGPACK_DEFINE_ARRAY(listeners)
// PushListener entries keyed by a string; presumably the push token -- TODO confirm.
448 std::map<std::string, PushListener> pushListeners_;
// Listen token initialized to 0.
449 proxy::ListenToken tokenPushNotif_ {0};
DhtProxyServer(const std::shared_ptr< DhtRunner > &dht, const ProxyServerConfig &config={}, const std::shared_ptr< dht::Logger > &logger={})
Json::Value toJson() const