22#include "node_cache.h"
29#include "rate_limiter.h"
30#include "log_enable.h"
31#include "network_utils.h"
// Request-rate settings; both are ssize_t and value-initialised to 0 here.
// NOTE(review): presumably the overall requests-per-second ceiling — confirm
// the unit and what 0 / negative values mean to the rate limiter.
53 ssize_t max_req_per_sec {0};
// NOTE(review): presumably the same ceiling applied per remote peer — confirm.
54 ssize_t max_peer_req_per_sec {0};
// Numeric error codes carried by protocol exceptions.
// 203, 401 and 404 carry the same names and values as the corresponding
// HTTP status codes.
// `constexpr` already implies `const`, so the redundant qualifier is dropped
// to match the other `static constexpr` constants in this header.
static constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203};
static constexpr uint16_t UNAUTHORIZED {401};
static constexpr uint16_t NOT_FOUND {404};
// NOTE(review): 421-423 look like protocol-specific codes (transaction-id
// size, unknown transaction id, node-info buffer length) — confirm where
// they are raised.
static constexpr uint16_t INVALID_TID_SIZE {421};
static constexpr uint16_t UNKNOWN_TID {422};
static constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423};
// String constants declared without initialisers; their definitions are not
// part of this excerpt.
// NOTE(review): the names suggest one error message per failing request kind
// (get / listen / put / storage lookup) — confirm against the definitions.
68 static const std::string GET_NO_INFOHASH;
69 static const std::string LISTEN_NO_INFOHASH;
70 static const std::string LISTEN_WRONG_TOKEN;
71 static const std::string PUT_NO_INFOHASH;
72 static const std::string PUT_WRONG_TOKEN;
73 static const std::string STORAGE_NOT_FOUND;
74 static const std::string PUT_INVALID_ID;
77 :
DhtException(msg), msg(msg), code(code), failing_node_id(failing_node_id) {}
79 const std::string& getMsg()
const {
return msg; }
80 uint16_t getCode()
const {
return code; }
81 const InfoHash& getNodeId()
const {
return failing_node_id; }
// Result containers; every vector starts out empty ({}).
// Shared pointers to values.
97 std::vector<Sp<Value>> values {};
// Ids only (no payload).
// NOTE(review): presumably the values reported as refreshed / expired by the
// remote side — confirm against the message parser.
98 std::vector<Value::Id> refreshed_values {};
99 std::vector<Value::Id> expired_values {};
// Shared pointers to field-value indexes.
100 std::vector<Sp<FieldValueIndex>> fields {};
// Two separate node lists.
// NOTE(review): presumably IPv4 nodes and IPv6 nodes respectively — confirm.
101 std::vector<Sp<Node>> nodes4 {};
102 std::vector<Sp<Node>> nodes6 {};
// Callback member taking a node (shared pointer, by const reference) and an
// int; returns nothing.
// NOTE(review): the meaning of the int argument is not visible in this
// excerpt — confirm with the code that invokes the callback.
139 std::function<void(
const Sp<Node>&,
int)> onNewNode;
197 const std::vector<Sp<Value>>&,
198 const time_point&)> onAnnounce {};
210 const Value::Id&)> onRefresh {};
// Callable type receiving the request and its answer (the answer is passed
// as an rvalue, so the callee may move from it). Used as the type of the
// `on_done` parameters of the send* functions.
213 using RequestCb = std::function<void(
const Request&,
RequestAnswer&&)>;
// Callable type receiving the request and a bool. Used as the type of the
// `on_expired` parameters of the send* functions.
// NOTE(review): the meaning of the bool flag is not visible here — confirm
// where the callback is invoked.
215 using RequestExpiredCb = std::function<void(
const Request&,
bool)>;
220 std::unique_ptr<DatagramSocket>&& sock,
221 const Sp<Logger>& log,
224 decltype(NetworkEngine::onError)&& onError,
225 decltype(NetworkEngine::onNewNode)&& onNewNode,
226 decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
227 decltype(NetworkEngine::onPing)&& onPing,
228 decltype(NetworkEngine::onFindNode)&& onFindNode,
229 decltype(NetworkEngine::onGetValues)&& onGetValues,
230 decltype(NetworkEngine::onListen)&& onListen,
231 decltype(NetworkEngine::onAnnounce)&& onAnnounce,
232 decltype(NetworkEngine::onRefresh)&& onRefresh);
256 std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
257 std::vector<Sp<Value>>&& values,
const Query& q,
int version);
// Declaration only. Parameters: target node, socket id, hash, token blob,
// list of value ids, protocol version.
// NOTE(review): presumably informs listener `n` that the listed value ids
// were refreshed — confirm in the implementation.
259 void tellListenerRefreshed(
const Sp<Node>& n, Tid socket_id,
const InfoHash& hash,
const Blob& ntoken,
const std::vector<Value::Id>& values,
int version);
// Declaration only. Same parameter list as tellListenerRefreshed: target
// node, socket id, hash, token blob, list of value ids, protocol version.
// NOTE(review): presumably informs listener `n` that the listed value ids
// expired — confirm in the implementation.
260 void tellListenerExpired(
const Sp<Node>& n, Tid socket_id,
const InfoHash& hash,
const Blob& ntoken,
const std::vector<Value::Id>& values,
int version);
// Const query taking an address family.
// NOTE(review): presumably reports whether the engine is active for that
// family — the definition is not part of this excerpt.
262 bool isRunning(sa_family_t af)
const;
263 inline want_t want ()
const {
return dht_socket->hasIPv4() and dht_socket->hasIPv6() ? (WANT4 | WANT6) : -1; }
// Declaration only; takes an address family.
// NOTE(review): presumably called when connectivity for that family changes —
// the definition is not part of this excerpt.
265 void connectivityChanged(sa_family_t);
281 sendPing(
const Sp<Node>& n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
295 return sendPing(std::make_shared<Node>(InfoHash::zero(), std::move(sa), rd),
296 std::forward<RequestCb>(on_done),
297 std::forward<RequestExpiredCb>(on_expired));
315 RequestCb&& on_done = {},
316 RequestExpiredCb&& on_expired = {});
336 RequestExpiredCb&& on_expired);
366 RequestExpiredCb&& on_expired);
386 RequestExpiredCb&& on_expired);
402 const Value::Id& vid,
405 RequestErrorCb&& on_error,
406 RequestExpiredCb&& on_expired);
421 const std::vector<Sp<Value>>& values,
438 auto n = cache.getNode(
id, addr, scheduler.
time(), 0);
443 std::vector<unsigned> getNodeMessageStats(
bool in) {
444 auto& st = in ? in_stats : out_stats;
445 std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
// Declaration only; takes the node by shared pointer.
// NOTE(review): presumably records the node's address in the `blacklist`
// member — confirm in the implementation.
450 void blacklistNode(
const Sp<Node>& n);
452 std::vector<Sp<Node>> getCachedNodes(
const InfoHash&
id, sa_family_t sa_f,
size_t count) {
453 return cache.getCachedNodes(
id, sa_f, count);
456 size_t getNodeCacheSize()
const {
459 size_t getNodeCacheSize(sa_family_t af)
const {
460 return cache.size(af);
463 size_t getRateLimiterSize()
const {
464 return address_rate_limiter.size();
467 size_t getPartialCount()
const {
468 return partial_messages.size();
// Forward declaration; the definition is not part of this excerpt. Used as
// the mapped type of the `partial_messages` map.
473 struct PartialMessage;
479 static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN +
sizeof(in_addr) +
sizeof(in_port_t)};
481 static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN +
sizeof(in6_addr) +
sizeof(in_port_t)};
// Durations are expressed in seconds (std::chrono::seconds).
// 15 s. NOTE(review): presumably how long to wait for a UDP reply — confirm.
483 static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
// 10 s. NOTE(review): presumably the maximum lifetime of a partially
// received packet — confirm against the code handling partial messages.
486 static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
// 3 s. NOTE(review): presumably the receive inactivity timeout — confirm.
488 static constexpr std::chrono::seconds RX_TIMEOUT {3};
// 10. NOTE(review): presumably the maximum number of blacklist entries —
// confirm.
491 static constexpr unsigned BLACKLISTED_MAX {10};
// 1280 bytes; this equals the IPv6 minimum link MTU.
// NOTE(review): confirm that this is the intended rationale.
493 static constexpr size_t MTU {1280};
// 600 bytes. NOTE(review): presumably the largest value payload sent in a
// single packet before splitting into parts — confirm.
494 static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
// Declared here; defined elsewhere.
// NOTE(review): presumably a version string — confirm.
496 static const std::string my_v;
// Declarations only; the definitions are not part of this excerpt.

// Handles one parsed message (passed as a unique_ptr rvalue) together with
// the sender's address.
498 void process(std::unique_ptr<ParsedMessage>&&,
const SockAddr& from);
// NOTE(review): presumably consults the rate limiters for `addr`; confirm
// which outcome `true` stands for.
500 bool rateLimit(
const SockAddr& addr);
// Static predicate on an address.
// NOTE(review): "martian" conventionally denotes an invalid or unroutable
// source address — confirm the exact checks.
502 static bool isMartian(
const SockAddr& addr);
// Const query on an address.
// NOTE(review): presumably a lookup in the `blacklist` member — confirm.
503 bool isNodeBlacklisted(
const SockAddr& addr)
const;
// Takes the request by value (a shared-pointer copy).
// NOTE(review): presumably performs the next attempt/step of the request —
// confirm.
505 void requestStep(Sp<Request> req);
// NOTE(review): presumably registers the request and starts sending it —
// confirm.
511 void sendRequest(
const Sp<Request>& request);
513 struct MessageStats {
519 unsigned refresh {0};
520 unsigned updateValue {0};
// Declarations only; the definitions are not part of this excerpt.

// Sends `len` bytes from `buf` to `addr`; `confirmed` defaults to false.
// NOTE(review): the meaning of the int result and of `confirmed` is not
// visible here — confirm.
525 int send(
const SockAddr& addr,
const char *buf,
size_t len,
bool confirmed =
false);
// Takes a transaction id, a list of blobs and a destination address.
// NOTE(review): presumably sends each blob as value parts for `tid` —
// confirm.
527 void sendValueParts(Tid tid,
const std::vector<Blob>& svals,
const SockAddr& addr);
// Takes an output buffer and a list of values; returns a list of blobs.
// NOTE(review): presumably writes a header into the buffer and returns the
// serialised values — confirm.
528 std::vector<Blob> packValueHeader(msgpack::sbuffer&,
const std::vector<Sp<Value>>&);
// NOTE(review): presumably maintains or expires the partial message stored
// for `tid` — confirm.
529 void maintainRxBuffer(Tid tid);
// NOTE(review): presumably the reply to a ping, addressed to `addr` for
// transaction `tid` — confirm.
535 void sendPong(
const SockAddr& addr, Tid tid);
537 void sendNodesValues(
const SockAddr& addr,
541 const std::vector<Sp<Value>>& st,
// Declaration only. Takes an address family, an id and a mutable list of
// nodes; returns a Blob.
// NOTE(review): presumably serialises the nodes of family `af`; `nodes` is
// taken by non-const reference, so it may be modified — confirm.
544 Blob bufferNodes(sa_family_t af,
const InfoHash&
id, std::vector<Sp<Node>>& nodes);
546 std::pair<Blob, Blob> bufferNodes(sa_family_t af,
549 std::vector<Sp<Node>>& nodes,
550 std::vector<Sp<Node>>& nodes6);
// Declarations only; the definitions are not part of this excerpt.

// NOTE(review): presumably acknowledges a listen request from `addr` for
// transaction `tid` — confirm.
552 void sendListenConfirmation(
const SockAddr& addr, Tid tid);
// Takes a destination address, a transaction id and a value id.
// NOTE(review): presumably acknowledges that the value was announced —
// confirm.
554 void sendValueAnnounced(
const SockAddr& addr, Tid, Value::Id);
556 void sendError(
const SockAddr& addr,
559 const std::string& message,
560 bool include_id=
false);
// Declaration only. Takes a mutable parsed message and the sender's address.
// NOTE(review): presumably decodes node records carried by `msg` and stores
// the result in the message — confirm.
562 void deserializeNodes(ParsedMessage& msg,
const SockAddr& from);
// Data members.

// Reference member: the referenced InfoHash is owned elsewhere and must
// outlive this object.
565 const InfoHash& myid;
// Immutable configuration, value-initialised by default.
566 const NetworkConfig config {};
// Datagram socket; the pointer itself cannot be reseated (const unique_ptr).
567 const std::unique_ptr<DatagramSocket> dht_socket;
// One rate limiter per address, ordered with SockAddr::ipCmp.
// NOTE(review): presumably ipCmp compares the IP only (ignoring the port) —
// confirm.
574 using IpLimiter = RateLimiter;
575 using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
576 IpLimiterMap address_rate_limiter;
// Separate limiter not keyed by address.
// NOTE(review): presumably the global limiter — confirm.
577 RateLimiter rate_limiter;
// Counter starting at 0.
// NOTE(review): presumably drives periodic clean-up of the limiter map —
// confirm.
578 ssize_t limiter_maintenance {0};
// Requests indexed by transaction id.
581 std::map<Tid, Sp<Request>> requests {};
// Partial messages indexed by transaction id.
582 std::map<Tid, PartialMessage> partial_messages;
// Message counters for incoming and outgoing traffic; getNodeMessageStats
// selects between them with its `in` flag.
584 MessageStats in_stats {}, out_stats {};
// Set of blacklisted addresses.
585 std::set<SockAddr> blacklist {};
// Reference member: the scheduler is owned elsewhere; its time() is used as
// the current time when fetching nodes from the cache.
587 Scheduler& scheduler;
// Off by default.
// NOTE(review): presumably enables logging of incoming packets — confirm.
589 bool logIncoming_ {
false};
const time_point & time() const
An abstraction of the communication protocol used on the network.
void processMessage(const uint8_t *buf, size_t buflen, SockAddr addr)
void tellListener(const Sp< Node > &n, Tid socket_id, const InfoHash &hash, want_t want, const Blob &ntoken, std::vector< Sp< Node > > &&nodes, std::vector< Sp< Node > > &&nodes6, std::vector< Sp< Value > > &&values, const Query &q, int version)
Sp< Request > sendPing(SockAddr &&sa, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendFindNode(const Sp< Node > &n, const InfoHash &hash, want_t want=-1, RequestCb &&on_done={}, RequestExpiredCb &&on_expired={})
Sp< Request > sendUpdateValues(const Sp< Node > &n, const InfoHash &infohash, const std::vector< Sp< Value > > &values, time_point created, const Blob &token, const size_t &sid)
Sp< Request > sendRefreshValue(const Sp< Node > &n, const InfoHash &hash, const Value::Id &vid, const Blob &token, RequestCb &&on_done, RequestErrorCb &&on_error, RequestExpiredCb &&on_expired)
Sp< Request > sendGetValues(const Sp< Node > &n, const InfoHash &hash, const Query &query, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendPing(const Sp< Node > &n, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendAnnounceValue(const Sp< Node > &n, const InfoHash &hash, const Sp< Value > &v, time_point created, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendListen(const Sp< Node > &n, const InfoHash &hash, const Query &query, const Blob &token, Tid socketId, RequestCb &&on_done, RequestExpiredCb &&on_expired)
std::vector< uint8_t > Blob
Describes a query destined to another peer.