My Project 2.4.4
C++ Distributed Hash Table
Loading...
Searching...
No Matches
network_engine.h
1/*
2 * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3 * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20#pragma once
21
22#include "node_cache.h"
23#include "value.h"
24#include "infohash.h"
25#include "node.h"
26#include "scheduler.h"
27#include "utils.h"
28#include "rng.h"
29#include "rate_limiter.h"
30#include "log_enable.h"
31#include "network_utils.h"
32
33#include <vector>
34#include <string>
35#include <functional>
36#include <algorithm>
37#include <memory>
38#include <queue>
39
40namespace dht {
41namespace net {
42
43struct Request;
44struct Socket;
45struct TransId;
46
47#ifndef MSG_CONFIRM
48#define MSG_CONFIRM 0
49#endif
50
52 NetId network {0};
53 ssize_t max_req_per_sec {0};
54 ssize_t max_peer_req_per_sec {0};
55};
56
58public:
59 // sent to another peer (http-like).
60 static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203}; /* incomplete request packet. */
61 static const constexpr uint16_t UNAUTHORIZED {401}; /* wrong tokens. */
62 static const constexpr uint16_t NOT_FOUND {404}; /* storage not found */
63 // for internal use (custom).
64 static const constexpr uint16_t INVALID_TID_SIZE {421}; /* id was truncated. */
65 static const constexpr uint16_t UNKNOWN_TID {422}; /* unknown tid */
66 static const constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423}; /* node info length is wrong */
67
68 static const std::string GET_NO_INFOHASH; /* received "get" request with no infohash */
69 static const std::string LISTEN_NO_INFOHASH; /* got "listen" request without infohash */
70 static const std::string LISTEN_WRONG_TOKEN; /* wrong token in "listen" request */
71 static const std::string PUT_NO_INFOHASH; /* no infohash in "put" request */
72 static const std::string PUT_WRONG_TOKEN; /* got "put" request with wrong token */
73 static const std::string STORAGE_NOT_FOUND; /* got access request for an unknown storage */
74 static const std::string PUT_INVALID_ID; /* invalid id in "put" request */
75
76 DhtProtocolException(uint16_t code, const std::string& msg="", InfoHash failing_node_id={})
77 : DhtException(msg), msg(msg), code(code), failing_node_id(failing_node_id) {}
78
79 const std::string& getMsg() const { return msg; }
80 uint16_t getCode() const { return code; }
81 const InfoHash& getNodeId() const { return failing_node_id; }
82
83private:
84 std::string msg;
85 uint16_t code;
86 InfoHash failing_node_id;
87};
88
89struct ParsedMessage;
90
95 Blob ntoken {};
96 Value::Id vid {};
97 std::vector<Sp<Value>> values {};
98 std::vector<Value::Id> refreshed_values {};
99 std::vector<Value::Id> expired_values {};
100 std::vector<Sp<FieldValueIndex>> fields {};
101 std::vector<Sp<Node>> nodes4 {};
102 std::vector<Sp<Node>> nodes6 {};
103 RequestAnswer() {}
104 RequestAnswer(ParsedMessage&& msg);
105};
106
125class NetworkEngine final
126{
127private:
131 std::function<void(Sp<Request>, DhtProtocolException)> onError;
132
139 std::function<void(const Sp<Node>&, int)> onNewNode;
146 std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr;
152 std::function<RequestAnswer(Sp<Node>)> onPing {};
161 std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t)> onFindNode {};
170 std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t, const Query&)> onGetValues {};
179 std::function<RequestAnswer(Sp<Node>,
180 const InfoHash&,
181 const Blob&,
182 Tid,
183 const Query&,
184 int)> onListen {};
194 std::function<RequestAnswer(Sp<Node>,
195 const InfoHash&,
196 const Blob&,
197 const std::vector<Sp<Value>>&,
198 const time_point&)> onAnnounce {};
207 std::function<RequestAnswer(Sp<Node>,
208 const InfoHash&,
209 const Blob&,
210 const Value::Id&)> onRefresh {};
211
212public:
213 using RequestCb = std::function<void(const Request&, RequestAnswer&&)>;
214 using RequestErrorCb = std::function<bool(const Request&, DhtProtocolException&&)>;
215 using RequestExpiredCb = std::function<void(const Request&, bool)>;
216
218 InfoHash& myid,
219 NetworkConfig config,
220 std::unique_ptr<DatagramSocket>&& sock,
221 const Sp<Logger>& log,
222 std::mt19937_64& rd,
223 Scheduler& scheduler,
224 decltype(NetworkEngine::onError)&& onError,
225 decltype(NetworkEngine::onNewNode)&& onNewNode,
226 decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
227 decltype(NetworkEngine::onPing)&& onPing,
228 decltype(NetworkEngine::onFindNode)&& onFindNode,
229 decltype(NetworkEngine::onGetValues)&& onGetValues,
230 decltype(NetworkEngine::onListen)&& onListen,
231 decltype(NetworkEngine::onAnnounce)&& onAnnounce,
232 decltype(NetworkEngine::onRefresh)&& onRefresh);
233
235
236 net::DatagramSocket* getSocket() const { return dht_socket.get(); };
237
238 void clear();
239
255 void tellListener(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, want_t want, const Blob& ntoken,
256 std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
257 std::vector<Sp<Value>>&& values, const Query& q, int version);
258
259 void tellListenerRefreshed(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values, int version);
260 void tellListenerExpired(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values, int version);
261
262 bool isRunning(sa_family_t af) const;
263 inline want_t want () const { return dht_socket->hasIPv4() and dht_socket->hasIPv6() ? (WANT4 | WANT6) : -1; }
264
265 void connectivityChanged(sa_family_t);
266
267 /**************
268 * Requests *
269 **************/
270
280 Sp<Request>
281 sendPing(const Sp<Node>& n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
282
293 Sp<Request>
294 sendPing(SockAddr&& sa, RequestCb&& on_done, RequestExpiredCb&& on_expired) {
295 return sendPing(std::make_shared<Node>(InfoHash::zero(), std::move(sa), rd),
296 std::forward<RequestCb>(on_done),
297 std::forward<RequestExpiredCb>(on_expired));
298 }
299
312 Sp<Request> sendFindNode(const Sp<Node>& n,
313 const InfoHash& hash,
314 want_t want = -1,
315 RequestCb&& on_done = {},
316 RequestExpiredCb&& on_expired = {});
331 Sp<Request> sendGetValues(const Sp<Node>& n,
332 const InfoHash& hash,
333 const Query& query,
334 want_t want,
335 RequestCb&& on_done,
336 RequestExpiredCb&& on_expired);
360 Sp<Request> sendListen(const Sp<Node>& n,
361 const InfoHash& hash,
362 const Query& query,
363 const Blob& token,
364 Tid socketId,
365 RequestCb&& on_done,
366 RequestExpiredCb&& on_expired);
380 Sp<Request> sendAnnounceValue(const Sp<Node>& n,
381 const InfoHash& hash,
382 const Sp<Value>& v,
383 time_point created,
384 const Blob& token,
385 RequestCb&& on_done,
386 RequestExpiredCb&& on_expired);
400 Sp<Request> sendRefreshValue(const Sp<Node>& n,
401 const InfoHash& hash,
402 const Value::Id& vid,
403 const Blob& token,
404 RequestCb&& on_done,
405 RequestErrorCb&& on_error,
406 RequestExpiredCb&& on_expired);
419 Sp<Request> sendUpdateValues(const Sp<Node>& n,
420 const InfoHash& infohash,
421 const std::vector<Sp<Value>>& values,
422 time_point created,
423 const Blob& token,
424 const size_t& sid);
425
435 void processMessage(const uint8_t *buf, size_t buflen, SockAddr addr);
436
437 Sp<Node> insertNode(const InfoHash& id, const SockAddr& addr) {
438 auto n = cache.getNode(id, addr, scheduler.time(), 0);
439 onNewNode(n, 0);
440 return n;
441 }
442
443 std::vector<unsigned> getNodeMessageStats(bool in) {
444 auto& st = in ? in_stats : out_stats;
445 std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
446 st = {};
447 return stats;
448 }
449
450 void blacklistNode(const Sp<Node>& n);
451
452 std::vector<Sp<Node>> getCachedNodes(const InfoHash& id, sa_family_t sa_f, size_t count) {
453 return cache.getCachedNodes(id, sa_f, count);
454 }
455
456 size_t getNodeCacheSize() const {
457 return cache.size();
458 }
459 size_t getNodeCacheSize(sa_family_t af) const {
460 return cache.size(af);
461 }
462
463 size_t getRateLimiterSize() const {
464 return address_rate_limiter.size();
465 }
466
467 size_t getPartialCount() const {
468 return partial_messages.size();
469 }
470
471private:
472
473 struct PartialMessage;
474
475 /***************
476 * Constants *
477 ***************/
478 /* the length of a node info buffer in ipv4 format */
479 static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN + sizeof(in_addr) + sizeof(in_port_t)};
480 /* the length of a node info buffer in ipv6 format */
481 static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN + sizeof(in6_addr) + sizeof(in_port_t)};
482 /* after a UDP reply, the period during which we tell the link layer about it */
483 static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
484
485 /* Max. time to receive a full fragmented packet */
486 static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
487 /* Max. time between packet fragments */
488 static constexpr std::chrono::seconds RX_TIMEOUT {3};
489 /* The maximum number of nodes that we snub. There is probably little
490 reason to increase this value. */
491 static constexpr unsigned BLACKLISTED_MAX {10};
492
493 static constexpr size_t MTU {1280};
494 static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
495
496 static const std::string my_v;
497
498 void process(std::unique_ptr<ParsedMessage>&&, const SockAddr& from);
499
500 bool rateLimit(const SockAddr& addr);
501
502 static bool isMartian(const SockAddr& addr);
503 bool isNodeBlacklisted(const SockAddr& addr) const;
504
505 void requestStep(Sp<Request> req);
506
511 void sendRequest(const Sp<Request>& request);
512
513 struct MessageStats {
514 unsigned ping {0};
515 unsigned find {0};
516 unsigned get {0};
517 unsigned put {0};
518 unsigned listen {0};
519 unsigned refresh {0};
520 unsigned updateValue {0};
521 };
522
523
524 // basic wrapper for socket sendto function
525 int send(const SockAddr& addr, const char *buf, size_t len, bool confirmed = false);
526
527 void sendValueParts(Tid tid, const std::vector<Blob>& svals, const SockAddr& addr);
528 std::vector<Blob> packValueHeader(msgpack::sbuffer&, const std::vector<Sp<Value>>&);
529 void maintainRxBuffer(Tid tid);
530
531 /*************
532 * Answers *
533 *************/
534 /* answer to a ping request */
535 void sendPong(const SockAddr& addr, Tid tid);
536 /* answer to findnodes/getvalues request */
537 void sendNodesValues(const SockAddr& addr,
538 Tid tid,
539 const Blob& nodes,
540 const Blob& nodes6,
541 const std::vector<Sp<Value>>& st,
542 const Query& query,
543 const Blob& token);
544 Blob bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes);
545
546 std::pair<Blob, Blob> bufferNodes(sa_family_t af,
547 const InfoHash& id,
548 want_t want,
549 std::vector<Sp<Node>>& nodes,
550 std::vector<Sp<Node>>& nodes6);
551 /* answer to a listen request */
552 void sendListenConfirmation(const SockAddr& addr, Tid tid);
553 /* answer to put request */
554 void sendValueAnnounced(const SockAddr& addr, Tid, Value::Id);
555 /* answer in case of error */
556 void sendError(const SockAddr& addr,
557 Tid tid,
558 uint16_t code,
559 const std::string& message,
560 bool include_id=false);
561
562 void deserializeNodes(ParsedMessage& msg, const SockAddr& from);
563
564 /* DHT info */
565 const InfoHash& myid;
566 const NetworkConfig config {};
567 const std::unique_ptr<DatagramSocket> dht_socket;
568 Sp<Logger> logger_;
569 std::mt19937_64& rd;
570
571 NodeCache cache;
572
573 // global limiting should be triggered by at least 8 different IPs
574 using IpLimiter = RateLimiter;
575 using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
576 IpLimiterMap address_rate_limiter;
577 RateLimiter rate_limiter;
578 ssize_t limiter_maintenance {0};
579
580 // requests handling
581 std::map<Tid, Sp<Request>> requests {};
582 std::map<Tid, PartialMessage> partial_messages;
583
584 MessageStats in_stats {}, out_stats {};
585 std::set<SockAddr> blacklist {};
586
587 Scheduler& scheduler;
588
589 bool logIncoming_ {false};
590};
591
592} /* namespace net */
593} /* namespace dht */
Job scheduler.
Definition: scheduler.h:36
const time_point & time() const
Definition: scheduler.h:123
An abstraction of communication protocol on the network.
void processMessage(const uint8_t *buf, size_t buflen, SockAddr addr)
void tellListener(const Sp< Node > &n, Tid socket_id, const InfoHash &hash, want_t want, const Blob &ntoken, std::vector< Sp< Node > > &&nodes, std::vector< Sp< Node > > &&nodes6, std::vector< Sp< Value > > &&values, const Query &q, int version)
Sp< Request > sendPing(SockAddr &&sa, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendFindNode(const Sp< Node > &n, const InfoHash &hash, want_t want=-1, RequestCb &&on_done={}, RequestExpiredCb &&on_expired={})
Sp< Request > sendUpdateValues(const Sp< Node > &n, const InfoHash &infohash, const std::vector< Sp< Value > > &values, time_point created, const Blob &token, const size_t &sid)
Sp< Request > sendRefreshValue(const Sp< Node > &n, const InfoHash &hash, const Value::Id &vid, const Blob &token, RequestCb &&on_done, RequestErrorCb &&on_error, RequestExpiredCb &&on_expired)
Sp< Request > sendGetValues(const Sp< Node > &n, const InfoHash &hash, const Query &query, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendPing(const Sp< Node > &n, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendAnnounceValue(const Sp< Node > &n, const InfoHash &hash, const Sp< Value > &v, time_point created, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendListen(const Sp< Node > &n, const InfoHash &hash, const Query &query, const Blob &token, Tid socketId, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Definition: callbacks.h:35
std::vector< uint8_t > Blob
Definition: utils.h:151
Describes a query destined to another peer.
Definition: value.h:925