Monero
Loading...
Searching...
No Matches
abstract_tcp_server2.inl
Go to the documentation of this file.
1
7// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
8// All rights reserved.
9//
10// Redistribution and use in source and binary forms, with or without
11// modification, are permitted provided that the following conditions are met:
12// * Redistributions of source code must retain the above copyright
13// notice, this list of conditions and the following disclaimer.
14// * Redistributions in binary form must reproduce the above copyright
15// notice, this list of conditions and the following disclaimer in the
16// documentation and/or other materials provided with the distribution.
17// * Neither the name of the Andrey N. Sabelnikov nor the
18// names of its contributors may be used to endorse or promote products
19// derived from this software without specific prior written permission.
20//
21// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
22// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
25// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
28// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31//
32
33
34#include <boost/asio/post.hpp>
35#include <boost/foreach.hpp>
36#include <boost/uuid/random_generator.hpp>
37#include <boost/uuid/uuid_io.hpp>
38#include <boost/chrono.hpp>
39#include <boost/utility/value_init.hpp>
40#include <boost/asio/bind_executor.hpp>
41#include <boost/asio/deadline_timer.hpp>
42#include <boost/date_time/posix_time/posix_time.hpp> // TODO
43#include <boost/thread/condition_variable.hpp> // TODO
44#include <boost/make_shared.hpp>
45#include <boost/thread.hpp>
46#include "warnings.h"
48#include "misc_language.h"
49
50#include <sstream>
51#include <iomanip>
52#include <algorithm>
53#include <functional>
54#include <random>
55
56#undef MONERO_DEFAULT_LOG_CATEGORY
57#define MONERO_DEFAULT_LOG_CATEGORY "net"
58
59#define AGGRESSIVE_TIMEOUT_THRESHOLD 120 // sockets
// NOTE(review): 1200000 ms is 20 minutes, not 2 (2 minutes would be 120000);
// confirm whether the value or the former "2 minutes" remark was intended.
60#define NEW_CONNECTION_TIMEOUT_LOCAL 1200000 // 20 minutes, if in milliseconds like the REMOTE value
61#define NEW_CONNECTION_TIMEOUT_REMOTE 10000 // 10 seconds
62#define DEFAULT_TIMEOUT_MS_LOCAL 1800000 // 30 minutes
63#define DEFAULT_TIMEOUT_MS_REMOTE 300000 // 5 minutes
64#define TIMEOUT_EXTRA_MS_PER_BYTE 0.2
65
66
67namespace epee
68{
69namespace net_utils
70{
// Dereference a shared pointer after verifying that it owns an object.
//
// ptr: shared pointer that is expected to be non-null.
// Returns a reference to the object managed by `ptr`.
// A null pointer is reported through CHECK_AND_ASSERT_THROW_MES with the
// message "shared_state cannot be null".
template<typename T>
T& check_and_get(std::shared_ptr<T>& ptr)
{
  T* const target = ptr.get();
  CHECK_AND_ASSERT_THROW_MES(target != nullptr, "shared_state cannot be null");
  return *target;
}
77
78 /************************************************************************/
79 /* */
80 /************************************************************************/
81 template<typename T>
82 unsigned int connection<T>::host_count(int delta)
83 {
84 static std::mutex hosts_mutex;
85 std::lock_guard<std::mutex> guard(hosts_mutex);
86 static std::map<std::string, unsigned int> hosts;
87 unsigned int &val = hosts[m_host];
88 if (delta > 0)
89 MTRACE("New connection from host " << m_host << ": " << val);
90 else if (delta < 0)
91 MTRACE("Closed connection from host " << m_host << ": " << val);
92 CHECK_AND_ASSERT_THROW_MES(delta >= 0 || val >= (unsigned)-delta, "Count would go negative");
93 CHECK_AND_ASSERT_THROW_MES(delta <= 0 || val <= std::numeric_limits<unsigned int>::max() - (unsigned)delta, "Count would wrap");
94 val += delta;
95 return val;
96 }
97
98 template<typename T>
100 {
101 unsigned count{};
102 try { count = host_count(); } catch (...) {}
103 const unsigned shift = (
105 std::min(std::max(count, 1u) - 1, 8u) :
106 0
107 );
108 return (
109 m_local ?
110 std::chrono::milliseconds(DEFAULT_TIMEOUT_MS_LOCAL >> shift) :
111 std::chrono::milliseconds(DEFAULT_TIMEOUT_MS_REMOTE >> shift)
112 );
113 }
114
115 template<typename T>
117 {
118 return std::chrono::duration_cast<connection<T>::duration_t>(
119 std::chrono::duration<double, std::chrono::milliseconds::period>(
121 )
122 );
123 }
124
125 template<typename T>
127 {
128 switch (m_state.status)
129 {
131 interrupt();
132 break;
135 break;
138 break;
139 default:
140 break;
141 }
142 }
143
// Set a new expiry on the general timer (`m_timers.general`).
//
// duration: base time until expiry.
// add: when true, the time left until the timer's current expiry
//      (expiry() - steady_clock::now()) is added on top of `duration`.
//
// In both branches the computed value is passed through std::min together
// with a second operand that is not visible in this listing (listing lines
// 153 and 161 are missing), so the effective upper bound cannot be stated here.
// NOTE(review): listing line 164 in the else branch is also missing; the
// visible code of that branch only changes the expiry.
144 template<typename T>
145 void connection<T>::start_timer(duration_t duration, bool add)
146 {
147 if (m_state.timers.general.wait_expire) {
// A wait is already pending: flag it as cancelled-for-reset before moving
// the expiry. NOTE(review): Asio documents that changing a timer's expiry
// cancels pending waits, which is presumably why both flags are raised here.
148 m_state.timers.general.cancel_expire = true;
149 m_state.timers.general.reset_expire = true;
150 m_timers.general.expires_after(
151 std::min(
152 duration + (add ? (m_timers.general.expiry() - std::chrono::steady_clock::now()) : duration_t{}),
154 )
155 );
156 }
157 else {
158 m_timers.general.expires_after(
159 std::min(
160 duration + (add ? (m_timers.general.expiry() - std::chrono::steady_clock::now()) : duration_t{}),
162 )
163 );
165 }
166 }
167
168 template<typename T>
170 {
171 if (m_state.timers.general.wait_expire)
172 return;
173 m_state.timers.general.wait_expire = true;
175 auto on_wait = [this, self] {
176 std::lock_guard<std::mutex> guard(m_state.lock);
177 m_state.timers.general.wait_expire = false;
178 if (m_state.timers.general.cancel_expire) {
179 m_state.timers.general.cancel_expire = false;
180 if (m_state.timers.general.reset_expire) {
181 m_state.timers.general.reset_expire = false;
183 }
184 else if (m_state.status == status_t::INTERRUPTED)
186 else if (m_state.status == status_t::TERMINATING)
188 }
189 else if (m_state.status == status_t::RUNNING)
190 interrupt();
191 else if (m_state.status == status_t::INTERRUPTED)
192 terminate();
193 };
194 m_timers.general.async_wait([this, self, on_wait](const ec_t & ec){
195 boost::asio::post(m_strand, on_wait);
196 });
197 }
198
199 template<typename T>
201 {
202 if (!m_state.timers.general.wait_expire)
203 return;
204 m_state.timers.general.cancel_expire = true;
205 m_state.timers.general.reset_expire = false;
206 m_timers.general.cancel();
207 }
208
209 template<typename T>
211 {
212 if (m_state.socket.wait_handshake)
213 return;
214 static_assert(
215 epee::net_utils::get_ssl_magic_size() <= sizeof(m_state.data.read.buffer),
216 ""
217 );
219 if (!m_state.ssl.forced && !m_state.ssl.detected) {
220 m_state.socket.wait_read = true;
221 boost::asio::async_read(
222 connection_basic::socket_.next_layer(),
223 boost::asio::buffer(
224 m_state.data.read.buffer.data(),
225 m_state.data.read.buffer.size()
226 ),
227 boost::asio::transfer_exactly(epee::net_utils::get_ssl_magic_size()),
228 boost::asio::bind_executor(
229 m_strand,
230 [this, self](const ec_t &ec, size_t bytes_transferred){
231 std::lock_guard<std::mutex> guard(m_state.lock);
232 m_state.socket.wait_read = false;
233 if (m_state.socket.cancel_read) {
234 m_state.socket.cancel_read = false;
235 state_status_check();
236 }
237 else if (ec.value()) {
238 terminate();
239 }
240 else if (
242 static_cast<const unsigned char *>(
243 m_state.data.read.buffer.data()
244 ),
245 bytes_transferred
246 )
247 ) {
248 m_state.ssl.enabled = false;
249 handle_read(bytes_transferred);
250 }
251 else {
252 m_state.ssl.detected = true;
253 start_handshake();
254 }
255 }
256 )
257 );
258 return;
259 }
260
261 m_state.socket.wait_handshake = true;
262 auto on_handshake = [this, self](const ec_t &ec, size_t bytes_transferred){
263 std::lock_guard<std::mutex> guard(m_state.lock);
264 m_state.socket.wait_handshake = false;
265 if (m_state.socket.cancel_handshake) {
266 m_state.socket.cancel_handshake = false;
267 state_status_check();
268 }
269 else if (ec.value()) {
270 ec_t ec;
271 connection_basic::socket_.next_layer().shutdown(
272 socket_t::shutdown_both,
273 ec
274 );
275 connection_basic::socket_.next_layer().close(ec);
276 m_state.socket.connected = false;
277 interrupt();
278 }
279 else {
280 m_state.ssl.handshaked = true;
281 start_write();
282 start_read();
283 }
284 };
285 const auto handshake = handshake_t::server;
286 static_cast<shared_state&>(
288 ).ssl_options().configure(connection_basic::socket_, handshake);
289 boost::asio::post(
290 m_strand,
291 [this, self, on_handshake]{
292 connection_basic::socket_.async_handshake(
293 handshake,
294 boost::asio::buffer(
295 m_state.data.read.buffer.data(),
296 m_state.ssl.forced ? 0 :
298 ),
299 boost::asio::bind_executor(m_strand, on_handshake)
300 );
301 }
302 );
303 }
304
305 template<typename T>
307 {
308 if (m_state.timers.throttle.in.wait_expire || m_state.socket.wait_read ||
309 m_state.socket.handle_read || m_state.socket.shutdown_read
310 ) {
311 return;
312 }
315 auto calc_duration = []{
318 );
319 return std::chrono::duration_cast<connection<T>::duration_t>(
320 std::chrono::duration<double, std::chrono::seconds::period>(
321 std::min(
323 ).get_sleep_time_after_tick(1),
324 1.0
325 )
326 )
327 );
328 };
329 const auto duration = calc_duration();
330 if (duration > duration_t{}) {
331 m_timers.throttle.in.expires_after(duration);
332 m_state.timers.throttle.in.wait_expire = true;
333 auto on_wait = [this, self](const ec_t &ec){
334 std::lock_guard<std::mutex> guard(m_state.lock);
335 m_state.timers.throttle.in.wait_expire = false;
336 if (m_state.timers.throttle.in.cancel_expire) {
337 m_state.timers.throttle.in.cancel_expire = false;
339 }
340 else if (ec.value())
341 interrupt();
342 };
343 m_timers.throttle.in.async_wait([this, self, on_wait](const ec_t &ec){
344 std::lock_guard<std::mutex> guard(m_state.lock);
345 const bool error_status = m_state.timers.throttle.in.cancel_expire || ec.value();
346 if (error_status)
347 boost::asio::post(m_strand, std::bind(on_wait, ec));
348 else {
349 m_state.timers.throttle.in.wait_expire = false;
350 start_read();
351 }
352 });
353 return;
354 }
355 }
356 m_state.socket.wait_read = true;
357 auto on_read = [this, self](const ec_t &ec, size_t bytes_transferred){
358 std::lock_guard<std::mutex> guard(m_state.lock);
359 m_state.socket.wait_read = false;
360 if (m_state.socket.cancel_read) {
361 m_state.socket.cancel_read = false;
362 state_status_check();
364 else if (ec.value())
365 terminate();
366 else {
367 {
368 m_state.stat.in.throttle.handle_trafic_exact(bytes_transferred);
369 const auto speed = m_state.stat.in.throttle.get_current_speed();
370 m_conn_context.m_current_speed_down = speed;
371 m_conn_context.m_max_speed_down = std::max(
372 m_conn_context.m_max_speed_down,
373 speed
374 );
375 if (speed_limit_is_enabled()) {
377 network_throttle_manager_t::m_lock_get_global_throttle_in
378 );
379 network_throttle_manager_t::get_global_throttle_in(
380 ).handle_trafic_exact(bytes_transferred);
381 }
383 m_conn_context.m_last_recv = time(NULL);
384 m_conn_context.m_recv_cnt += bytes_transferred;
385 start_timer(get_timeout_from_bytes_read(bytes_transferred), true);
386 }
387 handle_read(bytes_transferred);
389 };
390 if (!m_state.ssl.enabled)
391 connection_basic::socket_.next_layer().async_read_some(
392 boost::asio::buffer(
393 m_state.data.read.buffer.data(),
394 m_state.data.read.buffer.size()
395 ),
396 boost::asio::bind_executor(m_strand, on_read)
397 );
398 else
399 boost::asio::post(
400 m_strand,
401 [this, self, on_read]{
402 connection_basic::socket_.async_read_some(
403 boost::asio::buffer(
404 m_state.data.read.buffer.data(),
405 m_state.data.read.buffer.size()
406 ),
407 boost::asio::bind_executor(m_strand, on_read)
408 );
409 }
410 );
411 }
412
413 template<typename T>
414 void connection<T>::handle_read(size_t bytes_transferred)
415 {
416 // Post handle_recv to a separate `strand_`, distinct from `m_strand`
417 // which is listening for reads/writes. This avoids a circular dep.
418 // handle_recv can queue many writes, and `m_strand` will process those
419 // writes until the connection terminates without deadlocking waiting
420 // for handle_recv.
421 m_state.socket.handle_read = true;
423 boost::asio::post(
425 [this, self, bytes_transferred]{
426 bool success = false;
427 TRY_ENTRY();
428 success = m_handler.handle_recv(
429 reinterpret_cast<char *>(m_state.data.read.buffer.data()),
430 bytes_transferred
431 );
432 CATCH_ENTRY_SWALLOW_EX("m_handler.handle_recv");
433 std::lock_guard<std::mutex> guard(m_state.lock);
434 const bool error_status = m_state.status == status_t::INTERRUPTED
435 || m_state.status == status_t::TERMINATING
436 || !success;
437 if (!error_status) {
438 m_state.socket.handle_read = false;
439 start_read();
440 return;
441 }
442 boost::asio::post(
443 m_strand,
444 [this, self, success]{
445 // expect error_status == true
446 std::lock_guard<std::mutex> guard(m_state.lock);
447 m_state.socket.handle_read = false;
448 if (m_state.status == status_t::INTERRUPTED)
450 else if (m_state.status == status_t::TERMINATING)
452 else if (!success) {
453 ec_t ec;
454 if (m_state.socket.wait_write) {
455 // Allow the already queued writes time to finish, but no more new reads
456 connection_basic::socket_.next_layer().shutdown(
457 socket_t::shutdown_receive,
458 ec
459 );
460 m_state.socket.shutdown_read = true;
461 }
462 if (!m_state.socket.wait_write || ec.value()) {
463 interrupt();
464 }
465 }
466 }
467 );
468 }
469 );
470 }
471
472 template<typename T>
474 {
475 if (m_state.timers.throttle.out.wait_expire || m_state.socket.wait_write ||
476 m_state.data.write.queue.empty() ||
477 (m_state.ssl.enabled && !m_state.ssl.handshaked)
478 ) {
479 return;
480 }
483 auto calc_duration = [this]{
486 );
487 return std::chrono::duration_cast<connection<T>::duration_t>(
488 std::chrono::duration<double, std::chrono::seconds::period>(
489 std::min(
491 ).get_sleep_time_after_tick(
492 m_state.data.write.queue.back().size()
493 ),
494 1.0
495 )
496 )
497 );
498 };
499 const auto duration = calc_duration();
500 if (duration > duration_t{}) {
501 m_timers.throttle.out.expires_after(duration);
502 m_state.timers.throttle.out.wait_expire = true;
503 auto on_wait = [this, self](const ec_t &ec){
504 std::lock_guard<std::mutex> guard(m_state.lock);
505 m_state.timers.throttle.out.wait_expire = false;
506 if (m_state.timers.throttle.out.cancel_expire) {
507 m_state.timers.throttle.out.cancel_expire = false;
508 state_status_check();
509 }
510 else if (ec.value())
511 interrupt();
512 };
513 m_timers.throttle.out.async_wait([this, self, on_wait](const ec_t &ec){
514 std::lock_guard<std::mutex> guard(m_state.lock);
515 const bool error_status = m_state.timers.throttle.out.cancel_expire || ec.value();
516 if (error_status)
517 boost::asio::post(m_strand, std::bind(on_wait, ec));
518 else {
519 m_state.timers.throttle.out.wait_expire = false;
520 start_write();
521 }
522 });
523 return;
524 }
525 }
526
527 m_state.socket.wait_write = true;
528 auto on_write = [this, self](const ec_t &ec, size_t bytes_transferred){
529 std::lock_guard<std::mutex> guard(m_state.lock);
530 m_state.socket.wait_write = false;
531 if (m_state.socket.cancel_write) {
532 m_state.socket.cancel_write = false;
533 m_state.data.write.queue.clear();
534 m_state.data.write.total_bytes = 0;
535 state_status_check();
536 }
537 else if (ec.value()) {
538 m_state.data.write.queue.clear();
539 m_state.data.write.total_bytes = 0;
540 interrupt();
541 }
542 else {
543 {
544 m_state.stat.out.throttle.handle_trafic_exact(bytes_transferred);
545 const auto speed = m_state.stat.out.throttle.get_current_speed();
546 m_conn_context.m_current_speed_up = speed;
// Write path: record the peak upload speed. The previous code updated
// m_max_speed_down here, which is the statistic maintained by the read path.
547 m_conn_context.m_max_speed_up = std::max(
548 m_conn_context.m_max_speed_up,
549 speed
550 );
551 if (speed_limit_is_enabled()) {
553 network_throttle_manager_t::m_lock_get_global_throttle_out
554 );
555 network_throttle_manager_t::get_global_throttle_out(
556 ).handle_trafic_exact(bytes_transferred);
557 }
558 connection_basic::logger_handle_net_write(bytes_transferred);
559 m_conn_context.m_last_send = time(NULL);
560 m_conn_context.m_send_cnt += bytes_transferred;
561
562 start_timer(get_default_timeout(), true);
563 }
564 const std::size_t byte_count = m_state.data.write.queue.back().size();
565 assert(bytes_transferred == byte_count);
566 m_state.data.write.queue.pop_back();
567 m_state.data.write.total_bytes -=
568 std::min(m_state.data.write.total_bytes, byte_count);
569 m_state.condition.notify_all();
570 if (m_state.data.write.queue.empty() && m_state.socket.shutdown_read) {
571 // All writes have been sent and reads shutdown already, connection can be closed
572 interrupt();
573 } else {
574 start_write();
575 }
576 }
577 };
578 if (!m_state.ssl.enabled)
579 boost::asio::async_write(
580 connection_basic::socket_.next_layer(),
581 boost::asio::buffer(
582 m_state.data.write.queue.back().data(),
583 m_state.data.write.queue.back().size()
584 ),
585 boost::asio::bind_executor(m_strand, on_write)
586 );
587 else
588 boost::asio::post(
589 m_strand,
590 [this, self, on_write]{
591 boost::asio::async_write(
592 connection_basic::socket_,
593 boost::asio::buffer(
594 m_state.data.write.queue.back().data(),
595 m_state.data.write.queue.back().size()
596 ),
597 boost::asio::bind_executor(m_strand, on_write)
598 );
599 }
600 );
601 }
602
603 template<typename T>
605 {
606 if (m_state.socket.wait_shutdown)
607 return;
609 m_state.socket.wait_shutdown = true;
610 auto on_shutdown = [this, self](const ec_t &ec){
611 std::lock_guard<std::mutex> guard(m_state.lock);
612 m_state.socket.wait_shutdown = false;
613 if (m_state.socket.cancel_shutdown) {
614 m_state.socket.cancel_shutdown = false;
615 switch (m_state.status)
616 {
618 interrupt();
619 break;
621 terminate();
622 break;
625 break;
626 default:
627 break;
628 }
629 }
630 else {
631 terminate();
632 }
633 };
634 boost::asio::post(
635 m_strand,
636 [this, self, on_shutdown]{
637 connection_basic::socket_.async_shutdown(
638 boost::asio::bind_executor(m_strand, on_shutdown)
639 );
640 }
641 );
643 }
644
645 template<typename T>
647 {
648 bool wait_socket = false;
649 if (m_state.socket.wait_handshake)
650 wait_socket = m_state.socket.cancel_handshake = true;
651 if (m_state.timers.throttle.in.wait_expire) {
652 m_state.timers.throttle.in.cancel_expire = true;
653 m_timers.throttle.in.cancel();
654 }
655 if (m_state.socket.wait_read)
656 wait_socket = m_state.socket.cancel_read = true;
657 if (m_state.timers.throttle.out.wait_expire) {
658 m_state.timers.throttle.out.cancel_expire = true;
659 m_timers.throttle.out.cancel();
660 }
661 if (m_state.socket.wait_write)
662 wait_socket = m_state.socket.cancel_write = true;
663 if (m_state.socket.wait_shutdown)
664 wait_socket = m_state.socket.cancel_shutdown = true;
665 if (wait_socket) {
666 ec_t ec;
667 connection_basic::socket_.next_layer().cancel(ec);
668 }
669 }
670
671 template<typename T>
673 {
674 if (m_state.protocol.released || m_state.protocol.wait_release)
675 return;
676 m_state.protocol.wait_release = true;
677 m_state.lock.unlock();
678 m_handler.release_protocol();
679 m_state.lock.lock();
680 m_state.protocol.wait_release = false;
681 m_state.protocol.released = true;
682 if (m_state.status == status_t::INTERRUPTED)
684 else if (m_state.status == status_t::TERMINATING)
686 }
687
688 template<typename T>
690 {
691 if (m_state.status != status_t::RUNNING)
692 return;
694 cancel_timer();
697 m_state.condition.notify_all();
699 }
700
701 template<typename T>
703 {
704 assert(m_state.status == status_t::INTERRUPTED);
705 if (m_state.timers.general.wait_expire)
706 return;
707 if (m_state.socket.wait_handshake)
708 return;
709 if (m_state.timers.throttle.in.wait_expire)
710 return;
711 if (m_state.socket.wait_read)
712 return;
713 if (m_state.socket.handle_read)
714 return;
715 if (m_state.timers.throttle.out.wait_expire)
716 return;
717 // \NOTE See on_terminating() comments
718 //if (m_state.socket.wait_write)
719 // return;
720 if (m_state.socket.wait_shutdown)
721 return;
722 if (m_state.protocol.wait_init)
723 return;
724 if (m_state.protocol.wait_callback)
725 return;
726 if (m_state.protocol.wait_release)
727 return;
728 if (m_state.socket.connected) {
729 if (!m_state.ssl.enabled) {
730 ec_t ec;
731 connection_basic::socket_.next_layer().shutdown(
732 socket_t::shutdown_both,
733 ec
734 );
735 connection_basic::socket_.next_layer().close(ec);
736 m_state.socket.connected = false;
737 m_state.status = status_t::WASTED;
738 }
739 else
741 }
742 else
743 m_state.status = status_t::WASTED;
744 m_state.condition.notify_all();
745 }
746
747 template<typename T>
749 {
750 if (m_state.status != status_t::RUNNING &&
752 )
753 return;
755 cancel_timer();
758 m_state.condition.notify_all();
760 }
761
762 template<typename T>
764 {
765 assert(m_state.status == status_t::TERMINATING);
766 if (m_state.timers.general.wait_expire)
767 return;
768 if (m_state.socket.wait_handshake)
769 return;
770 if (m_state.timers.throttle.in.wait_expire)
771 return;
772 if (m_state.socket.wait_read)
773 return;
774 if (m_state.socket.handle_read)
775 return;
776 if (m_state.timers.throttle.out.wait_expire)
777 return;
778 // Writes cannot be canceled due to `async_write` being a "composed"
779 // handler. ASIO has new cancellation routines, not available in 1.66, to
780 // handle this situation. The problem is that if cancel is called after an
781 // intermediate handler is queued, the op will not check the cancel flag in
782 // our code, and will instead queue up another write.
783 //if (m_state.socket.wait_write)
784 // return;
785 if (m_state.socket.wait_shutdown)
786 return;
787 if (m_state.protocol.wait_init)
788 return;
789 if (m_state.protocol.wait_callback)
790 return;
791 if (m_state.protocol.wait_release)
792 return;
793 if (m_state.socket.connected) {
794 ec_t ec;
795 connection_basic::socket_.next_layer().shutdown(
796 socket_t::shutdown_both,
797 ec
798 );
799 connection_basic::socket_.next_layer().close(ec);
800 m_state.socket.connected = false;
801 }
802 m_state.status = status_t::WASTED;
803 m_state.condition.notify_all();
804 }
805
806 template<typename T>
808 {
809 // synchronize with intermediate writes on `m_strand`
811 boost::asio::post(m_strand, [this, self] {
812 std::lock_guard<std::mutex> guard(m_state.lock);
813 terminate();
814 });
815 }
816
817 template<typename T>
819 {
820 std::lock_guard<std::mutex> guard(m_state.lock);
821 if (m_state.status != status_t::RUNNING || m_state.socket.wait_handshake)
822 return false;
823 if (std::numeric_limits<std::size_t>::max() - m_state.data.write.total_bytes < message.size())
824 return false;
825
826 // Wait for the write queue to fall below the max. If it doesn't after a
827 // randomized delay, drop the connection.
828 auto wait_consume = [this] {
829 auto random_delay = []{
830 using engine = std::mt19937;
831 std::random_device dev;
832 std::seed_seq::result_type rand[
833 engine::state_size // Use complete bit space
834 ]{};
835 std::generate_n(rand, engine::state_size, std::ref(dev));
836 std::seed_seq seed(rand, rand + engine::state_size);
837 engine rng(seed);
838 return std::chrono::milliseconds(
839 std::uniform_int_distribution<>(5000, 6000)(rng)
840 );
841 };
842
843 // The bytes check intentionally does not include incoming message size.
844 // This allows for a soft overflow; a single http response will never fail
845 // this check, but multiple responses could. Clients can avoid this case
846 // by reading the entire response before making another request. P2P
847 // should never hit the MAX_BYTES check (when using default values).
848 if (m_state.data.write.queue.size() <= ABSTRACT_SERVER_SEND_QUE_MAX_COUNT &&
849 m_state.data.write.total_bytes <= static_cast<shared_state&>(connection_basic::get_state()).response_soft_limit)
850 return true;
851 m_state.data.write.wait_consume = true;
852 bool success = m_state.condition.wait_for(
853 m_state.lock,
854 random_delay(),
855 [this]{
856 return (
857 m_state.status != status_t::RUNNING ||
858 (
859 m_state.data.write.queue.size() <=
861 m_state.data.write.total_bytes <=
863 )
864 );
865 }
866 );
867 m_state.data.write.wait_consume = false;
868 if (!success) {
870 return false;
871 }
872 else
873 return m_state.status == status_t::RUNNING;
874 };
875 auto wait_sender = [this] {
876 m_state.condition.wait(
877 m_state.lock,
878 [this] {
879 return (
880 m_state.status != status_t::RUNNING ||
881 !m_state.data.write.wait_consume
882 );
883 }
884 );
885 return m_state.status == status_t::RUNNING;
886 };
887 if (!wait_sender())
888 return false;
889 constexpr size_t CHUNK_SIZE = 32 * 1024;
891 message.size() <= 2 * CHUNK_SIZE
892 ) {
893 if (!wait_consume())
894 return false;
895 const std::size_t byte_count = message.size();
896 m_state.data.write.queue.emplace_front(std::move(message));
897 m_state.data.write.total_bytes += byte_count;
898 start_write();
899 }
900 else {
901 while (!message.empty()) {
902 if (!wait_consume())
903 return false;
904 m_state.data.write.queue.emplace_front(
905 message.take_slice(CHUNK_SIZE)
906 );
907 m_state.data.write.total_bytes += m_state.data.write.queue.front().size();
908 start_write();
909 }
910 }
911 m_state.condition.notify_all();
912 return true;
913 }
914
915 template<typename T>
917 bool is_income,
918 bool is_multithreaded,
919 boost::optional<network_address> real_remote
920 )
921 {
922 std::unique_lock<std::mutex> guard(m_state.lock);
923 if (m_state.status != status_t::TERMINATED)
924 return false;
925 if (!real_remote) {
926 ec_t ec;
927 auto endpoint = connection_basic::socket_.next_layer().remote_endpoint(
928 ec
929 );
930 if (ec.value())
931 return false;
932 real_remote = (
933 endpoint.address().is_v6() ?
935 ipv6_network_address{endpoint.address().to_v6(), endpoint.port()}
936 } :
939 uint32_t{
940 boost::asio::detail::socket_ops::host_to_network_long(
941 endpoint.address().to_v4().to_uint()
942 )
943 },
944 endpoint.port()
945 }
946 }
947 );
948 }
949 auto *filter = static_cast<shared_state&>(
951 ).pfilter;
952 if (filter && !filter->is_remote_host_allowed(*real_remote))
953 return false;
954
955 auto *limit = static_cast<shared_state&>(
957 ).plimit;
958 if (is_income && limit && limit->is_host_limit(*real_remote))
959 return false;
960
961 ec_t ec;
962 #if !defined(_WIN32) || !defined(__i686)
963 connection_basic::socket_.next_layer().set_option(
964 boost::asio::detail::socket_option::integer<IPPROTO_IP, IP_TOS>{
966 },
967 ec
968 );
969 if (ec.value())
970 return false;
971 #endif
972 connection_basic::socket_.next_layer().set_option(
973 boost::asio::ip::tcp::no_delay{false},
974 ec
975 );
976 if (ec.value())
977 return false;
978 connection_basic::m_is_multithreaded = is_multithreaded;
979 m_conn_context.set_details(
980 boost::uuids::random_generator()(),
981 *real_remote,
982 is_income,
984 );
985 m_host = real_remote->host_str();
986 try { host_count(1); } catch(...) { /* ignore */ }
987 m_local = real_remote->is_loopback() || real_remote->is_local();
988 m_state.ssl.enabled = (
990 );
991 m_state.ssl.forced = (
993 );
994 m_state.socket.connected = true;
995 m_state.status = status_t::RUNNING;
997 std::chrono::milliseconds(
999 )
1000 );
1001 m_state.protocol.wait_init = true;
1002 guard.unlock();
1003 m_handler.after_init_connection();
1004 guard.lock();
1005 m_state.protocol.wait_init = false;
1006 m_state.protocol.initialized = true;
1007 if (m_state.status == status_t::INTERRUPTED)
1009 else if (m_state.status == status_t::TERMINATING)
1011 else if (!is_income || !m_state.ssl.enabled)
1012 start_read();
1013 else
1015 return true;
1016 }
1017
1018 template<typename T>
1020 io_context_t &io_context,
1021 std::shared_ptr<shared_state> shared_state,
1022 t_connection_type connection_type,
1023 ssl_support_t ssl_support,
1024 t_connection_context&& initial
1025 ):
1026 connection(
1027 io_context,
1028 socket_t{io_context},
1029 std::move(shared_state),
1030 connection_type,
1031 ssl_support,
1032 std::move(initial)
1033 )
1034 {
1035 }
1036
1037 template<typename T>
1039 io_context_t &io_context,
1040 socket_t &&socket,
1041 std::shared_ptr<shared_state> shared_state,
1042 t_connection_type connection_type,
1043 ssl_support_t ssl_support,
1044 t_connection_context&& initial
1045 ):
1046 connection_basic(io_context, std::move(socket), shared_state, ssl_support),
1048 m_connection_type(connection_type),
1049 m_io_context{io_context},
1050 m_conn_context(std::move(initial)),
1053 {
1054 }
1055
1056 template<typename T>
1058 {
1059 std::lock_guard<std::mutex> guard(m_state.lock);
1060 assert(m_state.status == status_t::TERMINATED ||
1061 m_state.status == status_t::WASTED ||
1062 m_io_context.stopped()
1063 );
1064 if (m_state.status != status_t::WASTED)
1065 return;
1066 try { host_count(-1); } catch (...) { /* ignore */ }
1067 }
1068
1069 template<typename T>
1071 bool is_income,
1072 bool is_multithreaded
1073 )
1074 {
1075 return start_internal(is_income, is_multithreaded, {});
1076 }
1077
1078 template<typename T>
1080 bool is_income,
1081 bool is_multithreaded,
1082 network_address real_remote
1083 )
1084 {
1085 return start_internal(is_income, is_multithreaded, real_remote);
1086 }
1087
1088 template<typename T>
1090 {
1091 std::lock_guard<std::mutex> guard(m_state.lock);
1092 std::string address;
1093 std::string port;
1094 ec_t ec;
1095 auto endpoint = connection_basic::socket().remote_endpoint(ec);
1096 if (ec.value()) {
1097 address = "<not connected>";
1098 port = "<not connected>";
1099 }
1100 else {
1101 address = endpoint.address().to_string();
1102 port = std::to_string(endpoint.port());
1103 }
1104 MDEBUG(
1105 " connection type " << std::to_string(m_connection_type) <<
1106 " " << connection_basic::socket().local_endpoint().address().to_string() <<
1107 ":" << connection_basic::socket().local_endpoint().port() <<
1108 " <--> " << m_conn_context.m_remote_address.str() <<
1109 " (via " << address << ":" << port << ")"
1110 );
1111 }
1112
1113 template<typename T>
1118
1119 template<typename T>
1121 {
1122 return close(false);
1123 }
1124
1125 template<typename T>
1127 {
1128 return send(std::move(message));
1129 }
1130
1131 template<typename T>
1133 {
1134 return true;
1135 }
1136
// Close a running connection, optionally waiting for shutdown to complete.
//
// wait_for_shutdown: when false, return right away; when true, block until
//                    the connection reports that it has shut down.
// Returns false if the status is not RUNNING on entry. Returns true when
// `wait_for_shutdown` is false. Otherwise waits up to 5 seconds on
// `m_state.condition` for the status to become TERMINATED or WASTED and
// returns whether that happened (an error is logged if it did not).
// NOTE(review): the statement that starts the termination (listing line 1143)
// is missing from this listing; the comments below indicate it is a call to
// terminate_async().
1137 template<typename T>
1138 bool connection<T>::close(const bool wait_for_shutdown)
1139 {
1140 std::lock_guard<std::mutex> guard(m_state.lock);
1141 if (m_state.status != status_t::RUNNING)
1142 return false;
1144
1145 // Sometimes we do *not* want to wait for the connection to shut down because for example handle_recv might try to
1146 // close the connection when handling a request. But handle_recv can't complete the shutdown sequence because
1147 // handle_read is set to true. So, in that case we call terminate_async and return here.
1148 if (!wait_for_shutdown)
1149 return true;
1150
1151 // Sometimes we *do* want to wait for the connection to shut down for example when stopping the server. When
1152 // stopping the server, we don't want the io_context to stop before the shutdown sequence completes, since we
1153 // execute terminate inside m_strand. So we wait for the connection's shutdown sequence to complete before stopping
1154 // the io_context.
1155 MDEBUG("Waiting for connection " << m_conn_context.m_connection_id << " to shutdown, current state: " << m_state.status);
// The wait is given `m_state.lock` (held via `guard` above) and a predicate
// that is satisfied once the status is TERMINATED or WASTED.
1156 const bool shutdown = m_state.condition.wait_for(
1157 m_state.lock,
1158 std::chrono::seconds(5),
1159 [this]{
1160 return (
1161 m_state.status == status_t::TERMINATED || m_state.status == status_t::WASTED
1162 );
1163 }
1164 );
1165 if (shutdown)
1166 MDEBUG("Shut down connection " << m_conn_context.m_connection_id);
1167 else
1168 MERROR("Connection " << m_conn_context.m_connection_id << " did not shut down");
1169
1170 return shutdown;
1171 }
1172
1173 template<typename T>
1175 {
1177 if (!m_io_context.poll_one())
1179 }
1180 else {
1181 if (!m_io_context.run_one())
1182 return false;
1183 }
1184 return true;
1185 }
1186
// Schedules a deferred call to m_handler.handle_qued_callback() on the connection's strand.
//
// Under m_state.lock: returns false unless the connection is RUNNING, otherwise bumps
// m_state.protocol.wait_callback and posts the handler to connection_basic::strand_, then returns true.
// The posted handler swallows exceptions from handle_qued_callback (CATCH_ENTRY_SWALLOW_EX), re-takes the lock,
// decrements wait_callback and then branches on the INTERRUPTED / TERMINATING states.
// NOTE(review): the signature, the declaration of `self` captured by the lambda, and the bodies of the two status
// branches are absent from this listing; presumably this is connection<T>::request_callback - confirm.
1187 template<typename T>
1189 {
1190 std::lock_guard<std::mutex> guard(m_state.lock);
1191 if (m_state.status != status_t::RUNNING)
1192 return false;
1194 ++m_state.protocol.wait_callback;
1195 boost::asio::post(connection_basic::strand_, [this, self]{
1196 TRY_ENTRY();
1197 m_handler.handle_qued_callback();
1198 CATCH_ENTRY_SWALLOW_EX("m_handler.handle_qued_callback");
1199 std::lock_guard<std::mutex> guard(m_state.lock);
1200 --m_state.protocol.wait_callback;
1201 if (m_state.status == status_t::INTERRUPTED)
1203 else if (m_state.status == status_t::TERMINATING)
1205 });
1206 return true;
1207 }
1208
1209 template<typename T>
1214
// Takes an additional protocol-level reference on this connection.
// Under m_state.lock it moves a local `self` pointer into the member this->self and increments
// m_state.protocol.reference_counter, then returns true. Returns false when boost::bad_weak_ptr is thrown inside
// the try block.
// NOTE(review): the signature and the line that creates the local `self` are absent from this listing (presumably a
// shared_from_this() call, which is what would throw bad_weak_ptr); presumably this is connection<T>::add_ref - confirm.
1215 template<typename T>
1217 {
1218 try {
1220 std::lock_guard<std::mutex> guard(m_state.lock);
1221 this->self = std::move(self);
1222 ++m_state.protocol.reference_counter;
1223 return true;
1224 }
1225 catch (boost::bad_weak_ptr &exception) {
1226 return false;
1227 }
1228 }
1229
// Drops one protocol-level reference. Under m_state.lock the reference counter is decremented; when it reaches
// zero the member this->self is moved out into a local `self`. Always returns true.
// NOTE(review): the signature and the declaration of the local `self` are absent from this listing; presumably this
// is connection<T>::release and `self` is declared before `guard` so it is destroyed after the lock is released -
// confirm against the original file.
1230 template<typename T>
1232 {
1234 std::lock_guard<std::mutex> guard(m_state.lock);
1235 if (!(--m_state.protocol.reference_counter))
1236 self = std::move(this->self);
1237 return true;
1238 }
1239
// Takes m_state.lock for the duration of the function.
// NOTE(review): the signature and the only statement executed under the lock are absent from this listing;
// presumably this is connection<T>::setRpcStation - confirm against the original file.
1240 template<typename T>
1242 {
1243 std::lock_guard<std::mutex> guard(m_state.lock);
1245 }
1246
// Constructor fragment: allocates the shared connection state, zeroes the thread count and thread index, stores the
// connection type and sets the thread-name prefix to "NET".
// NOTE(review): the signature line and several member initializers are absent from this listing; per the
// cross-reference index this is boosted_tcp_server(t_connection_type connection_type).
1247 template<class t_protocol_handler>
1249 m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()),
1256 m_threads_count(0),
1257 m_thread_index(0),
1258 m_connection_type( connection_type ),
1261 {
1263 m_thread_name_prefix = "NET";
1264 }
1265
// Constructs a server on top of a caller-supplied io_context (bound by reference into io_context_) instead of one
// owned by the server. Allocates the shared connection state, zeroes the thread count and thread index, stores the
// connection type and sets the thread-name prefix to "NET".
// NOTE(review): several member initializers are absent from this listing.
1266 template<class t_protocol_handler>
1267 boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_context& extarnal_io_context, t_connection_type connection_type) :
1268 m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()),
1269 io_context_(extarnal_io_context),
1274 m_threads_count(0),
1275 m_thread_index(0),
1276 m_connection_type(connection_type),
1279 {
1281 m_thread_name_prefix = "NET";
1282 }
1283 //---------------------------------------------------------------------------------
1284 template<class t_protocol_handler>
1290 //---------------------------------------------------------------------------------
1291 template<class t_protocol_handler>
1298 //---------------------------------------------------------------------------------
// Numeric-port overload of init_server: binds the listening acceptor(s) and starts accepting.
//
// Records the requested ports/addresses/flags in the members, applies ssl_options when it converts to true, then:
//  - IPv4: resolves address:port, opens/binds/listens on acceptor_, stores the actually bound port in m_port and
//    starts an async accept. Any exception is captured as text in ipv4_failed; if require_ipv4 is set a
//    std::runtime_error is thrown.
//  - IPv6 (only when use_ipv6): same sequence on acceptor_ipv6 with v6_only(true); a port_ipv6 of 0 means "use the
//    same port number as `port`". Failures are captured in ipv6_failed.
//  - If both families failed, a std::runtime_error is thrown.
// Every exception thrown in the body is caught by the handlers at the end, logged with MFATAL, and turned into a
// false return. Returns true otherwise (including when only one family could be bound and IPv4 was not required).
//
// NOTE(review): the first line of the signature, one member assignment and the lines that create the pending
// connection objects / bind the accept handlers are absent from this listing.
1299 template<class t_protocol_handler>
1301 uint32_t port_ipv6, const std::string& address_ipv6, bool use_ipv6, bool require_ipv4,
1302 ssl_options_t ssl_options)
1303 {
1304 TRY_ENTRY();
1305 m_stop_signal_sent = false;
1306 m_port = port;
1307 m_port_ipv6 = port_ipv6;
1309 m_address_ipv6 = address_ipv6;
1310 m_use_ipv6 = use_ipv6;
1311 m_require_ipv4 = require_ipv4;
1312
1313 if (ssl_options)
1314 m_state->configure_ssl(std::move(ssl_options));
1315
// Non-empty string == that address family failed; the text is the exception message.
1316 std::string ipv4_failed = "";
1317 std::string ipv6_failed = "";
1318
1319 boost::asio::ip::tcp::resolver resolver(io_context_);
1320
1321 try
1322 {
1323 const auto results = resolver.resolve(
1324 address, boost::lexical_cast<std::string>(port), boost::asio::ip::tcp::resolver::canonical_name
1325 );
// Only the first resolved endpoint is used.
1326 acceptor_.open(results.begin()->endpoint().protocol());
1327#if !defined(_WIN32)
1328 acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
1329#endif
1330 acceptor_.bind(*results.begin());
1331 acceptor_.listen();
// Read the port back from the socket so m_port holds the port that was actually bound.
1332 boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint();
1333 m_port = binded_endpoint.port();
1334 MDEBUG("start accept (IPv4)");
1336 acceptor_.async_accept(new_connection_->socket(),
1338 boost::asio::placeholders::error));
1339 }
1340 catch (const std::exception &e)
1341 {
1342 ipv4_failed = e.what();
1343 }
1344
1345 if (ipv4_failed != "")
1346 {
1347 MERROR("Failed to bind IPv4: " << ipv4_failed);
1348 if (require_ipv4)
1349 {
1350 throw std::runtime_error("Failed to bind IPv4 (set to required)");
1351 }
1352 }
1353
1354 if (use_ipv6)
1355 {
1356 try
1357 {
1358 if (port_ipv6 == 0) port_ipv6 = port; // default arg means bind to same port as ipv4
1359
1360 const auto results = resolver.resolve(
1361 address_ipv6, boost::lexical_cast<std::string>(port_ipv6), boost::asio::ip::tcp::resolver::canonical_name
1362 );
1363
1364 acceptor_ipv6.open(results.begin()->endpoint().protocol());
1365#if !defined(_WIN32)
1366 acceptor_ipv6.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
1367#endif
// Restrict this socket to IPv6 traffic only.
1368 acceptor_ipv6.set_option(boost::asio::ip::v6_only(true));
1369 acceptor_ipv6.bind(*results.begin());
1370 acceptor_ipv6.listen();
1371 boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_ipv6.local_endpoint();
1372 m_port_ipv6 = binded_endpoint.port();
1373 MDEBUG("start accept (IPv6)");
1375 acceptor_ipv6.async_accept(new_connection_ipv6->socket(),
1377 boost::asio::placeholders::error));
1378 }
1379 catch (const std::exception &e)
1380 {
1381 ipv6_failed = e.what();
1382 }
1383 }
1384
// An IPv6 failure alone is only logged; it becomes fatal when IPv4 failed as well.
1385 if (use_ipv6 && ipv6_failed != "")
1386 {
1387 MERROR("Failed to bind IPv6: " << ipv6_failed);
1388 if (ipv4_failed != "")
1389 {
1390 throw std::runtime_error("Failed to bind IPv4 and IPv6");
1391 }
1392 }
1393
1394 return true;
1395 }
1396 catch (const std::exception &e)
1397 {
1398 MFATAL("Error starting server: " << e.what());
1399 return false;
1400 }
1401 catch (...)
1402 {
1403 MFATAL("Error starting server");
1404 return false;
1405 }
1406 }
1407 //-----------------------------------------------------------------------------
1408 template<class t_protocol_handler>
1409 bool boosted_tcp_server<t_protocol_handler>::init_server(const std::string port, const std::string& address,
1410 const std::string port_ipv6, const std::string address_ipv6, bool use_ipv6, bool require_ipv4,
1411 ssl_options_t ssl_options)
1412 {
1413 uint32_t p = 0;
1414 uint32_t p_ipv6 = 0;
1415
1416 if (port.size() && !string_tools::get_xtype_from_string(p, port)) {
1417 MERROR("Failed to convert port no = " << port);
1418 return false;
1419 }
1420
1421 if (port_ipv6.size() && !string_tools::get_xtype_from_string(p_ipv6, port_ipv6)) {
1422 MERROR("Failed to convert port no = " << port_ipv6);
1423 return false;
1424 }
1425 return this->init_server(p, address, p_ipv6, address_ipv6, use_ipv6, require_ipv4, std::move(ssl_options));
1426 }
1427 //---------------------------------------------------------------------------------
// Body of one server worker thread.
//
// Claims a unique index from m_thread_index, names the thread "[<prefix><index>]" for logging, then runs
// io_context_.run(). A normal return from run() ends the thread with true. If run() throws, the exception is
// logged and run() is re-entered as long as no stop signal has been sent. Returns true when the loop ends;
// CATCH_ENTRY_L0 supplies false for exceptions that escape the loop.
// NOTE(review): the signature line is absent from this listing; the name comes from the CATCH_ENTRY_L0 string.
1428 template<class t_protocol_handler>
1430 {
1431 TRY_ENTRY();
1432 const uint32_t local_thr_index = m_thread_index++; // atomically increment, getting value before increment
1433 std::string thread_name = std::string("[") + m_thread_name_prefix;
1434 thread_name += boost::to_string(local_thr_index) + "]";
1435 MLOG_SET_THREAD_NAME(thread_name);
1436 // _fact("Thread name: " << m_thread_name_prefix);
1437 while(!m_stop_signal_sent)
1438 {
1439 try
1440 {
1441 io_context_.run();
1442 return true;
1443 }
1444 catch(const std::exception& ex)
1445 {
1446 _erro("Exception at server worker thread, what=" << ex.what());
1447 }
1448 catch(...)
1449 {
1450 _erro("Exception at server worker thread, unknown execption");
1451 }
1452 }
1453 //_info("Worker thread finished");
1454 return true;
1455 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false);
1456 }
1457 //---------------------------------------------------------------------------------
// Sets the prefix used when naming worker threads and looks the prefix up in server_type_map.
// Throws std::runtime_error when the prefix is not a key of server_type_map; note that m_thread_name_prefix has
// already been overwritten at that point.
// The looked-up connection type is held in a local variable and only logged here; the visible code does not assign
// it to m_connection_type.
// NOTE(review): the signature line is absent from this listing; per the cross-reference index this is
// set_threads_prefix(const std::string &prefix_name).
1458 template<class t_protocol_handler>
1460 {
1461 m_thread_name_prefix = prefix_name;
1462 auto it = server_type_map.find(m_thread_name_prefix);
1463 if (it==server_type_map.end()) throw std::runtime_error("Unknown prefix/server type:" + std::string(prefix_name));
1464 auto connection_type = it->second; // the value of type
1465 MINFO("Set server type to: " << connection_type << " from name: " << m_thread_name_prefix << ", prefix_name = " << prefix_name);
1466 }
1467 //---------------------------------------------------------------------------------
// Stores `pfilter` in the shared connection state (m_state->pfilter). The pointer is stored as given; nothing in
// this function takes ownership of it.
// NOTE(review): the signature line is absent from this listing; per the cross-reference index this is
// set_connection_filter(i_connection_filter *pfilter).
1468 template<class t_protocol_handler>
1470 {
1471 assert(m_state != nullptr); // always set in constructor
1472 m_state->pfilter = pfilter;
1473 }
1474 //---------------------------------------------------------------------------------
// Stores `plimit` in the shared connection state (m_state->plimit). The pointer is stored as given; nothing in
// this function takes ownership of it.
// NOTE(review): the signature line is absent from this listing; per the cross-reference index this is
// set_connection_limit(i_connection_limit *plimit).
1475 template<class t_protocol_handler>
1477 {
1478 assert(m_state != nullptr); // always set in constructor
1479 m_state->plimit = plimit;
1480 }
1481 //---------------------------------------------------------------------------------
// Stores `limit` in the shared connection state (m_state->response_soft_limit).
// NOTE(review): the signature line is absent from this listing; presumably this is set_response_soft_limit - the
// type and unit of `limit` cannot be confirmed from here.
1482 template<class t_protocol_handler>
1484 {
1485 assert(m_state != nullptr); // always set in constructor
1486 m_state->response_soft_limit = limit;
1487 }
1488 //---------------------------------------------------------------------------------
// Starts `threads_count` worker threads (each running worker_thread with the given thread attributes) and,
// optionally, waits for them.
//
// Records the thread count and the calling thread's id, then loops until a stop signal is sent:
//  - wait == false: returns true right after the threads have been spawned.
//  - wait == true: joins every thread and clears m_threads. If the workers ended without a stop request, a
//    restart is attempted; failure of that restart returns false, success loops and spawns the threads again.
// Returns true once the loop exits; CATCH_ENTRY_L0 supplies false when an exception escapes.
// NOTE(review): a few lines are absent from this listing, including the condition that decides whether the
// restart succeeded.
1489 template<class t_protocol_handler>
1490 bool boosted_tcp_server<t_protocol_handler>::run_server(size_t threads_count, bool wait, const boost::thread::attributes& attrs)
1491 {
1492 TRY_ENTRY();
1493 m_threads_count = threads_count;
1494 m_main_thread_id = boost::this_thread::get_id();
1495 MLOG_SET_THREAD_NAME("[SRV_MAIN]");
1496 while(!m_stop_signal_sent)
1497 {
1498
1499 // Create a pool of threads to run all of the io_contexts.
1501 for (std::size_t i = 0; i < threads_count; ++i)
1502 {
1503 boost::shared_ptr<boost::thread> thread(new boost::thread(
1504 attrs, boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this)));
1505 _note("Run server thread name: " << m_thread_name_prefix);
1506 m_threads.push_back(thread);
1507 }
1509 // Wait for all threads in the pool to exit.
1510 if (wait)
1511 {
1512 _fact("JOINING all threads");
1513 for (std::size_t i = 0; i < m_threads.size(); ++i) {
1514 m_threads[i]->join();
1515 }
1516 _fact("JOINING all threads - almost");
1517 m_threads.clear();
1518 _fact("JOINING all threads - DONE");
1519
1520 }
1521 else {
// Non-blocking mode: the threads keep running and stay referenced from m_threads.
1522 _dbg1("Reiniting OK.");
1523 return true;
1524 }
1525
1526 if(wait && !m_stop_signal_sent)
1527 {
1528 //some problems with the listening socket ?..
1529 _dbg1("Net service stopped without stop request, restarting...");
1531 {
1532 _dbg1("Reiniting service failed, exit.");
1533 return false;
1534 }else
1535 {
1536 _dbg1("Reiniting OK.");
1537 }
1538 }
1539 }
1540 return true;
1541 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::run_server", false);
1542 }
1543 //---------------------------------------------------------------------------------
// Reports whether the calling thread is one of this server's worker threads.
// Returns true when the caller's thread id matches a thread stored in m_threads, or when the server was started
// with exactly one thread and the caller is the thread recorded in m_main_thread_id; false otherwise.
// CATCH_ENTRY_L0 supplies false when an exception escapes.
// NOTE(review): the signature line and one line before the loop are absent from this listing; the name comes from
// the CATCH_ENTRY_L0 string.
1544 template<class t_protocol_handler>
1546 {
1547 TRY_ENTRY();
1549 BOOST_FOREACH(boost::shared_ptr<boost::thread>& thp, m_threads)
1550 {
1551 if(thp->get_id() == boost::this_thread::get_id())
1552 return true;
1553 }
1554 if(m_threads_count == 1 && boost::this_thread::get_id() == m_main_thread_id)
1555 return true;
1556 return false;
1557 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::is_thread_worker", false);
1558 }
1559 //---------------------------------------------------------------------------------
// Waits for the worker threads to finish, giving each joinable thread up to `wait_mseconds` milliseconds.
// A thread that does not finish within its wait is sent a boost interrupt() request. The timeout applies per
// thread, so the total wait can be up to wait_mseconds times the number of threads.
// Returns true after all threads have been visited; CATCH_ENTRY_L0 supplies false when an exception escapes.
// NOTE(review): the signature line is absent from this listing; per the cross-reference index this is
// bool timed_wait_server_stop(uint64_t wait_mseconds).
1560 template<class t_protocol_handler>
1562 {
1563 TRY_ENTRY();
1564 boost::chrono::milliseconds ms(wait_mseconds);
1565 for (std::size_t i = 0; i < m_threads.size(); ++i)
1566 {
1567 if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms))
1568 {
1569 _dbg1("Interrupting thread " << m_threads[i]->native_handle());
1570 m_threads[i]->interrupt();
1571 }
1572 }
1573 return true;
1574 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::timed_wait_server_stop", false);
1575 }
1576 //---------------------------------------------------------------------------------
// Signals the server to stop.
//
// Sets the stop flags, cancels and forgets every connection tracked in connections_, invokes the caller-supplied
// `close_all_connections` callback, and finally stops io_context_. Exceptions raised after TRY_ENTRY are handled
// by CATCH_ENTRY_L0.
//
// @param close_all_connections callback run before the io_context is stopped (see the comment in the body).
//
// NOTE(review): the line that defines `state` is absent from this listing.
// NOTE(review): connections_mutex is locked and unlocked manually; if cancel() threw, the unlock would be skipped -
// verify whether cancel() can throw.
1577 template<class t_protocol_handler>
1578 void boosted_tcp_server<t_protocol_handler>::send_stop_signal(std::function<void()> close_all_connections)
1579 {
1580 m_stop_signal_sent = true;
1582 state->stop_signal_sent = true;
1583 TRY_ENTRY();
1584 connections_mutex.lock();
1585 for (auto &c: connections_)
1586 {
1587 c->cancel();
1588 }
1589 connections_.clear();
1590 connections_mutex.unlock();
1591
1592 // Since we shut down connections in the strand, we want to make sure to complete the shutdown sequence before
1593 // stopping the io_context. We let the caller handle closing because the caller is the one keeping track of all
1594 // connections (connections_ is only a subset of all connections).
1595 close_all_connections();
1596 io_context_.stop();
1597 MDEBUG("Done with send_stop_signal");
1598 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::send_stop_signal()", void());
1599 }
1600 //---------------------------------------------------------------------------------
1601 template<class t_protocol_handler>
1602 void boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4(const boost::system::error_code& e)
1603 {
1604 this->handle_accept(e, false);
1605 }
1606 //---------------------------------------------------------------------------------
1607 template<class t_protocol_handler>
1608 void boosted_tcp_server<t_protocol_handler>::handle_accept_ipv6(const boost::system::error_code& e)
1609 {
1610 this->handle_accept(e, true);
1611 }
1612 //---------------------------------------------------------------------------------
// Common completion handler for an asynchronous accept on either acceptor.
//
// `ipv6` selects which acceptor and which pending connection object the call refers to (IPv4 ones by default).
//
// Success path (!e): takes over the pending connection as `conn`, installs a fresh pending connection with the same
// SSL support setting, re-arms async_accept, enables TCP keep-alive on the accepted socket and starts `conn`.
// If start() fails the connection is cancelled.
//
// Error path (e set, or an exception thrown before the next accept was armed): logs the problem, replaces the
// pending connection with a fresh one and re-arms async_accept so the server keeps listening.
// If an exception is thrown after the next accept was already armed, the function just returns.
//
// NOTE(review): several lines are absent from this listing: the assignment of the IPv6 handler inside
// `if (ipv6)`, the condition guarding the RPC log block, the condition choosing between the two start() calls, and
// one statement at the start of the error path.
// NOTE(review): connection_ptr is a boost::shared_ptr, so after the std::move below the pending pointer is empty
// until reset() completes. If constructing the replacement connection throws, the error path would call
// get_ssl_support() through that empty pointer - verify.
1613 template<class t_protocol_handler>
1614 void boosted_tcp_server<t_protocol_handler>::handle_accept(const boost::system::error_code& e, bool ipv6)
1615 {
1616 MDEBUG("handle_accept");
1617
1618 boost::asio::ip::tcp::acceptor* current_acceptor = &acceptor_;
1619 connection_ptr* current_new_connection = &new_connection_;
1620 auto accept_function_pointer = &boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4;
1621 if (ipv6)
1622 {
1623 current_acceptor = &acceptor_ipv6;
1624 current_new_connection = &new_connection_ipv6;
1626 }
1627
// Tracks whether the next async_accept has been issued, so the catch handler knows if it must re-arm.
1628 bool accept_started = false;
1629 try
1630 {
1631 if (!e)
1632 {
1634 const char *ssl_message = "unknown";
1635 switch ((*current_new_connection)->get_ssl_support())
1636 {
1637 case epee::net_utils::ssl_support_t::e_ssl_support_disabled: ssl_message = "disabled"; break;
1638 case epee::net_utils::ssl_support_t::e_ssl_support_enabled: ssl_message = "enabled"; break;
1639 case epee::net_utils::ssl_support_t::e_ssl_support_autodetect: ssl_message = "autodetection"; break;
1640 }
1641 MDEBUG("New server for RPC connections, SSL " << ssl_message);
1642 (*current_new_connection)->setRpcStation(); // hopefully this is not needed actually
1643 }
// Take ownership of the just-accepted connection, then install a fresh one for the next accept.
1644 connection_ptr conn(std::move((*current_new_connection)));
1645 (*current_new_connection).reset(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, conn->get_ssl_support()));
1646 current_acceptor->async_accept((*current_new_connection)->socket(),
1647 boost::bind(accept_function_pointer, this,
1648 boost::asio::placeholders::error));
1649 accept_started = true;
1650
1651 boost::asio::socket_base::keep_alive opt(true);
1652 conn->socket().set_option(opt);
1653
1654 bool res;
1656 res = conn->start(true, 1 < m_threads_count);
1657 else
1658 res = conn->start(true, 1 < m_threads_count, default_remote);
1659 if (!res)
1660 {
1661 conn->cancel();
1662 return;
1663 }
1664 conn->save_dbg_log();
1665 return;
1666 }
1667 else
1668 {
1669 MERROR("Error in boosted_tcp_server<t_protocol_handler>::handle_accept: " << e);
1670 }
1671 }
1672 catch (const std::exception &e)
1673 {
1674 MERROR("Exception in boosted_tcp_server<t_protocol_handler>::handle_accept: " << e.what());
// The next accept is already pending, so there is nothing to re-arm.
1675 if (accept_started)
1676 return;
1677 }
1678
1679 // error path, if e or exception
1680 assert(m_state != nullptr); // always set in constructor
1681 _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_state->sock_count);
1683 (*current_new_connection).reset(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, (*current_new_connection)->get_ssl_support()));
1684 current_acceptor->async_accept((*current_new_connection)->socket(),
1685 boost::bind(accept_function_pointer, this,
1686 boost::asio::placeholders::error));
1687 }
1688 //---------------------------------------------------------------------------------
// Adopts an already-connected socket as a connection of this server.
//
// The socket is accepted only if its executor belongs to this server's io_context; otherwise a warning is logged
// and false is returned. On a matching context the connection is started with `real_remote`; on success the
// connection context is copied into `out` and true is returned. Returns false when start() fails.
//
// NOTE(review): the signature line and the line that constructs `conn` are absent from this listing; per the
// cross-reference index this is add_connection(t_connection_context &out, boost::asio::ip::tcp::socket &&sock,
// network_address real_remote, epee::net_utils::ssl_support_t ssl_support).
1689 template<class t_protocol_handler>
1691 {
1692 if(std::addressof(get_io_context()) == std::addressof(sock.get_executor().context()))
1693 {
1695 if(conn->start(false, 1 < m_threads_count, std::move(real_remote)))
1696 {
1697 conn->get_context(out);
1698 conn->save_dbg_log();
1699 return true;
1700 }
1701 }
1702 else
1703 {
1704 MWARNING(out << " was not added, socket/io_context mismatch");
1705 }
1706 return false;
1707 }
1708 //---------------------------------------------------------------------------------
// Performs one blocking connection attempt on `sock_` to `remote_endpoint`, followed by an SSL handshake when the
// connection's SSL setting calls for one.
//
// Steps: open the socket; bind it to `bind_ip` unless that is "0.0.0.0", "0" or empty; start an async connect and
// wait on a condition variable in slices of `conn_timeout` milliseconds until the connect handler stores a result
// or the wait times out; then optionally handshake as an SSL client.
//
// `adr` and `port` are used for log messages only.
//
// Returns CONNECT_SUCCESS on success, CONNECT_NO_SSL from the handshake-failure branch shown below, and
// CONNECT_FAILURE for every other failure (bind error, timeout, connect error, handshake error, or an exception
// caught by CATCH_ENTRY_L0). The socket is closed on the failure paths.
//
// NOTE(review): three condition lines are absent from this listing: one inside the wait loop, the one guarding the
// handshake block, and the one selecting the CONNECT_NO_SSL branch.
1709 template<class t_protocol_handler>
1710 typename boosted_tcp_server<t_protocol_handler>::try_connect_result_t boosted_tcp_server<t_protocol_handler>::try_connect(connection_ptr new_connection_l, const std::string& adr, const std::string& port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support)
1711 {
1712 TRY_ENTRY();
1713
1714 sock_.open(remote_endpoint.protocol());
1715 if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
1716 {
// Port 0 lets the OS choose the local port.
1717 boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::make_address(bind_ip), 0);
1718 boost::system::error_code ec;
1719 sock_.bind(local_endpoint, ec);
1720 if (ec)
1721 {
1722 MERROR("Error binding to " << bind_ip << ": " << ec.message());
1723 if (sock_.is_open())
1724 sock_.close();
1725 return CONNECT_FAILURE;
1726 }
1727 }
1728
1729 /*
1730 NOTICE: be careful to make sync connection from event handler: in case if all threads suddenly do sync connect, there will be no thread to dispatch events from io service.
1731 */
1732
1733 boost::system::error_code ec = boost::asio::error::would_block;
1734
1735 //have another free thread(s), work in wait mode, without event handling
// State shared with the connect handler; held by shared_ptr so it stays valid if the handler runs after this
// function has returned.
1736 struct local_async_context
1737 {
1738 boost::system::error_code ec;
1739 boost::mutex connect_mut;
1740 boost::condition_variable cond;
1741 };
1742
1743 boost::shared_ptr<local_async_context> local_shared_context(new local_async_context());
// would_block is used as the "no result yet" marker.
1744 local_shared_context->ec = boost::asio::error::would_block;
// The mutex is taken before the connect is started; the handler locks the same mutex to publish its result.
1745 boost::unique_lock<boost::mutex> lock(local_shared_context->connect_mut);
1746 auto connect_callback = [](boost::system::error_code ec_, boost::shared_ptr<local_async_context> shared_context)
1747 {
1748 shared_context->connect_mut.lock(); shared_context->ec = ec_; shared_context->cond.notify_one(); shared_context->connect_mut.unlock();
1749 };
1750
1751 sock_.async_connect(remote_endpoint, std::bind<void>(connect_callback, std::placeholders::_1, local_shared_context));
1752 while(local_shared_context->ec == boost::asio::error::would_block)
1753 {
// r is false when the wait ended because the deadline passed.
1754 bool r = local_shared_context->cond.timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(conn_timeout));
1756 {
1757 if (sock_.is_open())
1758 sock_.close();
1759 return CONNECT_FAILURE;
1760 }
1761 if(local_shared_context->ec == boost::asio::error::would_block && !r)
1762 {
1763 //timeout
1764 sock_.close();
1765 _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")");
1766 return CONNECT_FAILURE;
1767 }
1768 }
1769 ec = local_shared_context->ec;
1770
1771 if (ec || !sock_.is_open())
1772 {
1773 _dbg3("Some problems at connect, message: " << ec.message());
1774 if (sock_.is_open())
1775 sock_.close();
1776 return CONNECT_FAILURE;
1777 }
1778
1779 _dbg3("Connected success to " << adr << ':' << port);
1780
// This local hides the ssl_support parameter: from here on the value in use is the connection's own setting.
1781 const ssl_support_t ssl_support = new_connection_l->get_ssl_support();
1783 {
1784 // Handshake
1785 MDEBUG("Handshaking SSL...");
1786 if (!new_connection_l->handshake(boost::asio::ssl::stream_base::client))
1787 {
1789 {
1790 boost::system::error_code ignored_ec;
1791 sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec);
1792 sock_.close();
1793 return CONNECT_NO_SSL;
1794 }
1795 MERROR("SSL handshake failed");
1796 if (sock_.is_open())
1797 sock_.close();
1798 return CONNECT_FAILURE;
1799 }
1800 }
1801
1802 return CONNECT_SUCCESS;
1803
1804 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::try_connect", CONNECT_FAILURE);
1805 }
1806 //---------------------------------------------------------------------------------
// Synchronously connects to adr:port and, on success, starts the connection and fills `conn_context`.
//
// Flow:
//  1. Register the new connection in connections_ while the attempt is in flight.
//  2. Resolve `adr` as IPv4. If that yields nothing and m_use_ipv6 is set, resolve it as IPv6 instead; without
//     m_use_ipv6 the function fails.
//  3. Pick the local bind address: `bind_ip` for IPv4; for IPv6 "::" when bind_ip is "0.0.0.0", otherwise none.
//  4. Call try_connect on the first resolved endpoint. If the handshake failed with CONNECT_NO_SSL and
//     `ssl_support` is autodetect, disable SSL on the connection and try once more.
//  5. Remove the connection from connections_ and start it.
//
// @param conn_timeout  passed through to try_connect
// @return true when the connection was established and start() succeeded; false on resolve/connect/start failure
//         or when an exception is caught by CATCH_ENTRY_L0.
//
// NOTE(review): two lines are absent from this listing: the one that constructs new_connection_l and one directly
// after the connection is registered.
1807 template<class t_protocol_handler>
1808 bool boosted_tcp_server<t_protocol_handler>::connect(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support)
1809 {
1810 TRY_ENTRY();
1811
1813 connections_mutex.lock();
1814 connections_.insert(new_connection_l);
1815 MDEBUG("connections_ size now " << connections_.size());
1816 connections_mutex.unlock();
1818 boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
1819
1820 bool try_ipv6 = false;
1821
1822 boost::asio::ip::tcp::resolver resolver(io_context_);
1823 boost::asio::ip::tcp::resolver::results_type results{};
1824 boost::system::error_code resolve_error;
1825
1826 try
1827 {
1828 //resolving ipv4 address as ipv6 throws, catch here and move on
1829 results = resolver.resolve(
1830 boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
1831 );
1832 }
1833 catch (const boost::system::system_error& e)
1834 {
// Only "host not found" style errors fall through to the IPv6 attempt, and only when IPv6 is enabled.
1835 if (!m_use_ipv6 || (resolve_error != boost::asio::error::host_not_found &&
1836 resolve_error != boost::asio::error::host_not_found_try_again))
1837 {
1838 throw;
1839 }
1840 try_ipv6 = true;
1841 }
1842 catch (...)
1843 {
1844 throw;
1845 }
1846
1847 std::string bind_ip_to_use;
1848
1849 if(results.empty())
1850 {
1851 if (!m_use_ipv6)
1852 {
1853 _erro("Failed to resolve " << adr);
1854 return false;
1855 }
1856 else
1857 {
1858 try_ipv6 = true;
1859 MINFO("Resolving address as IPv4 failed, trying IPv6");
1860 }
1861 }
1862 else
1863 {
1864 bind_ip_to_use = bind_ip;
1865 }
1866
1867 if (try_ipv6)
1868 {
1869 results = resolver.resolve(
1870 boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
1871 );
1872
1873 if(results.empty())
1874 {
1875 _erro("Failed to resolve " << adr);
1876 return false;
1877 }
1878 else
1879 {
// Map the IPv4 wildcard to the IPv6 wildcard; any other bind_ip is dropped for the IPv6 attempt.
1880 if (bind_ip == "0.0.0.0")
1881 {
1882 bind_ip_to_use = "::";
1883 }
1884 else
1885 {
1886 bind_ip_to_use = "";
1887 }
1888
1889 }
1890
1891 }
1892
// Only the first resolved endpoint is tried.
1893 const auto iterator = results.begin();
1894
1895 MDEBUG("Trying to connect to " << adr << ":" << port << ", bind_ip = " << bind_ip_to_use);
1896
1897 //boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port);
1898 boost::asio::ip::tcp::endpoint remote_endpoint(*iterator);
1899
1900 auto try_connect_result = try_connect(new_connection_l, adr, port, sock_, remote_endpoint, bind_ip_to_use, conn_timeout, ssl_support);
1901 if (try_connect_result == CONNECT_FAILURE)
1902 return false;
1903 if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect && try_connect_result == CONNECT_NO_SSL)
1904 {
1905 // we connected, but could not connect with SSL, try without
1906 MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL");
1907 new_connection_l->disable_ssl();
1908 try_connect_result = try_connect(new_connection_l, adr, port, sock_, remote_endpoint, bind_ip_to_use, conn_timeout, epee::net_utils::ssl_support_t::e_ssl_support_disabled);
1909 if (try_connect_result != CONNECT_SUCCESS)
1910 return false;
1911 }
1912
1913 // start adds the connection to the config object's list, so we don't need to have it locally anymore
1914 connections_mutex.lock();
1915 connections_.erase(new_connection_l);
1916 connections_mutex.unlock();
1917 bool r = new_connection_l->start(false, 1 < m_threads_count);
1918 if (r)
1919 {
1920 new_connection_l->get_context(conn_context);
1921 }
1922 else
1923 {
1924 assert(m_state != nullptr); // always set in constructor
1925 _erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_state->sock_count);
1926 }
1927
1928 new_connection_l->save_dbg_log();
1929
1930 return r;
1931
1932 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect", false);
1933 }
1934 //---------------------------------------------------------------------------------
// Starts an asynchronous connection to adr:port and reports the outcome through `cb`.
//
// Synchronous part (its failures are reported through the return value, `cb` is not called): create and register
// the connection, resolve the address (IPv4 first, then IPv6 when try_ipv6 is set), open the socket and bind it to
// `bind_ip` unless that is "0.0.0.0", "0" or empty.
//
// Asynchronous part: a deadline timer of `conn_timeout` milliseconds closes the socket if it fires; the connect
// handler calls `cb(context, error)` exactly once with
//   - operation_aborted when the connect succeeded but the timer could no longer be cancelled,
//   - the (success) error code and the filled context when start() succeeded,
//   - boost::asio::error::fault when start() failed,
//   - the connect error otherwise.
//
// @return true when the asynchronous connect was started; false on a synchronous failure or when an exception is
//         caught by CATCH_ENTRY_L0.
//
// NOTE(review): one line directly after the connection is registered is absent from this listing.
// NOTE(review): the IPv6 fallback below tests try_ipv6, which is only set in the catch handler; connect() tests
// m_use_ipv6 at the same spot and sets try_ipv6 there. Since resolve() is called with an error_code argument,
// confirm that the fallback is reachable when resolution fails without throwing.
1935 template<class t_protocol_handler> template<class t_callback>
1936 bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support, t_connection_context&& initial)
1937 {
1938 TRY_ENTRY();
1939 connection_ptr new_connection_l(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, ssl_support, std::move(initial)) );
1940 connections_mutex.lock();
1941 connections_.insert(new_connection_l);
1942 MDEBUG("connections_ size now " << connections_.size());
1943 connections_mutex.unlock();
1945 boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
1946
1947 bool try_ipv6 = false;
1948
1949 boost::asio::ip::tcp::resolver resolver(io_context_);
1950 boost::asio::ip::tcp::resolver::results_type results{};
1951 boost::system::error_code resolve_error;
1952
1953 try
1954 {
1955 //resolving ipv4 address as ipv6 throws, catch here and move on
1956 results = resolver.resolve(
1957 boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
1958 );
1959 }
1960 catch (const boost::system::system_error& e)
1961 {
1962 if (!m_use_ipv6 || (resolve_error != boost::asio::error::host_not_found &&
1963 resolve_error != boost::asio::error::host_not_found_try_again))
1964 {
1965 throw;
1966 }
1967 try_ipv6 = true;
1968 }
1969 catch (...)
1970 {
1971 throw;
1972 }
1973
1974 if(results.empty())
1975 {
1976 if (!try_ipv6)
1977 {
1978 _erro("Failed to resolve " << adr);
1979 return false;
1980 }
1981 else
1982 {
1983 MINFO("Resolving address as IPv4 failed, trying IPv6");
1984 }
1985 }
1986
1987 if (try_ipv6)
1988 {
1989 results = resolver.resolve(
1990 boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
1991 );
1992
1993 if(results.empty())
1994 {
1995 _erro("Failed to resolve " << adr);
1996 return false;
1997 }
1998 }
1999
// Only the first resolved endpoint is tried.
2000 boost::asio::ip::tcp::endpoint remote_endpoint(*results.begin());
2001
2002 sock_.open(remote_endpoint.protocol());
2003 if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
2004 {
2005 boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::make_address(bind_ip.c_str()), 0);
2006 boost::system::error_code ec;
2007 sock_.bind(local_endpoint, ec);
2008 if (ec)
2009 {
2010 MERROR("Error binding to " << bind_ip << ": " << ec.message());
2011 if (sock_.is_open())
2012 sock_.close();
2013 return false;
2014 }
2015 }
2016
// The timer is held by shared_ptr so both lambdas below (which capture by copy) keep it alive.
2017 boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(io_context_));
2018 //start deadline
2019 sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout));
2020 sh_deadline->async_wait([=](const boost::system::error_code& error)
2021 {
// operation_aborted means the timer was cancelled by the connect handler; anything else is a real timeout.
2022 if(error != boost::asio::error::operation_aborted)
2023 {
2024 _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")");
2025 new_connection_l->socket().close();
2026 }
2027 });
2028 //start async connect
2029 sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_)
2030 {
2031 t_connection_context conn_context = AUTO_VAL_INIT(conn_context);
// The local endpoint is fetched for logging only; errors from the lookup are ignored.
2032 boost::system::error_code ignored_ec;
2033 boost::asio::ip::tcp::socket::endpoint_type lep = new_connection_l->socket().local_endpoint(ignored_ec);
2034 if(!ec_)
2035 {//success
2036 if(!sh_deadline->cancel())
2037 {
2038 cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation
2039 }else
2040 {
2041 _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port <<
2042 " from " << lep.address().to_string() << ':' << lep.port());
2043
2044 // start adds the connection to the config object's list, so we don't need to have it locally anymore
2045 connections_mutex.lock();
2046 connections_.erase(new_connection_l);
2047 connections_mutex.unlock();
2048 bool r = new_connection_l->start(false, 1 < m_threads_count);
2049 if (r)
2050 {
2051 new_connection_l->get_context(conn_context);
2052 cb(conn_context, ec_);
2053 }
2054 else
2055 {
2056 _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port);
2057 cb(conn_context, boost::asio::error::fault);
2058 }
2059 }
2060 }else
2061 {
2062 _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port <<
2063 " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value());
2064 cb(conn_context, ec_);
2065 }
2066 });
2067 return true;
2068 CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
2069 }
2070
2071} // namespace
2072} // namespace
#define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT
Definition abstract_tcp_server2.h:67
#define NEW_CONNECTION_TIMEOUT_LOCAL
Definition abstract_tcp_server2.inl:60
#define NEW_CONNECTION_TIMEOUT_REMOTE
Definition abstract_tcp_server2.inl:61
#define TIMEOUT_EXTRA_MS_PER_BYTE
Definition abstract_tcp_server2.inl:64
#define AGGRESSIVE_TIMEOUT_THRESHOLD
Definition abstract_tcp_server2.inl:59
#define DEFAULT_TIMEOUT_MS_LOCAL
Definition abstract_tcp_server2.inl:62
#define DEFAULT_TIMEOUT_MS_REMOTE
Definition abstract_tcp_server2.inl:63
static void close()
Definition blockchain_blackball.cpp:279
Definition byte_slice.h:69
boost::asio::ip::tcp::acceptor acceptor_ipv6
Definition abstract_tcp_server2.h:522
boost::shared_ptr< connection< t_protocol_handler > > connection_ptr
Definition abstract_tcp_server2.h:356
uint32_t m_port
Definition abstract_tcp_server2.h:526
size_t m_threads_count
Definition abstract_tcp_server2.h:533
void set_connection_limit(i_connection_limit *plimit)
Definition abstract_tcp_server2.inl:1476
void handle_accept_ipv6(const boost::system::error_code &e)
Definition abstract_tcp_server2.inl:1608
bool m_require_ipv4
Definition abstract_tcp_server2.h:531
std::map< std::string, t_connection_type > server_type_map
Definition abstract_tcp_server2.h:365
std::vector< boost::shared_ptr< boost::thread > > m_threads
Definition abstract_tcp_server2.h:534
boost::asio::ip::tcp::acceptor acceptor_
Acceptor used to listen for incoming connections.
Definition abstract_tcp_server2.h:521
try_connect_result_t
Definition abstract_tcp_server2.h:349
@ CONNECT_FAILURE
Definition abstract_tcp_server2.h:351
@ CONNECT_NO_SSL
Definition abstract_tcp_server2.h:352
@ CONNECT_SUCCESS
Definition abstract_tcp_server2.h:350
std::atomic< bool > m_stop_signal_sent
Definition abstract_tcp_server2.h:525
~boosted_tcp_server()
Definition abstract_tcp_server2.inl:1285
bool connect(const std::string &adr, const std::string &port, uint32_t conn_timeot, t_connection_context &cn, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
Definition abstract_tcp_server2.inl:1808
bool timed_wait_server_stop(uint64_t wait_mseconds)
wait for service workers stop
Definition abstract_tcp_server2.inl:1561
connection_ptr new_connection_
The next connection to be accepted.
Definition abstract_tcp_server2.h:542
boost::asio::io_context & get_io_context()
Definition abstract_tcp_server2.h:437
t_protocol_handler::connection_context t_connection_context
Definition abstract_tcp_server2.h:357
boosted_tcp_server(t_connection_type connection_type)
Definition abstract_tcp_server2.inl:1248
void send_stop_signal(std::function< void()> close_all_connections=[](){})
Stop the server.
Definition abstract_tcp_server2.inl:1578
try_connect_result_t try_connect(connection_ptr new_connection_l, const std::string &adr, const std::string &port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support)
Definition abstract_tcp_server2.inl:1710
boost::mutex connections_mutex
Definition abstract_tcp_server2.h:546
void handle_accept_ipv4(const boost::system::error_code &e)
Handle completion of an asynchronous accept operation.
Definition abstract_tcp_server2.inl:1602
connection_ptr new_connection_ipv6
Definition abstract_tcp_server2.h:543
bool worker_thread()
Run the server's io_context loop.
Definition abstract_tcp_server2.inl:1429
void create_server_type_map()
Definition abstract_tcp_server2.inl:1292
bool add_connection(t_connection_context &out, boost::asio::ip::tcp::socket &&sock, network_address real_remote, epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
Definition abstract_tcp_server2.inl:1690
std::string m_address_ipv6
Definition abstract_tcp_server2.h:529
critical_section m_threads_lock
Definition abstract_tcp_server2.h:536
void set_connection_filter(i_connection_filter *pfilter)
Definition abstract_tcp_server2.inl:1469
t_connection_type m_connection_type
Definition abstract_tcp_server2.h:539
std::string m_address
Definition abstract_tcp_server2.h:528
std::string m_thread_name_prefix
Definition abstract_tcp_server2.h:532
epee::net_utils::network_address default_remote
Definition abstract_tcp_server2.h:523
bool run_server(size_t threads_count, bool wait=true, const boost::thread::attributes &attrs=boost::thread::attributes())
Run the server's io_context loop.
Definition abstract_tcp_server2.inl:1490
void handle_accept(const boost::system::error_code &e, bool ipv6=false)
Definition abstract_tcp_server2.inl:1614
bool connect_async(const std::string &adr, const std::string &port, uint32_t conn_timeot, const t_callback &cb, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect, t_connection_context &&initial=t_connection_context{})
void set_threads_prefix(const std::string &prefix_name)
Definition abstract_tcp_server2.inl:1459
bool init_server(uint32_t port, const std::string &address="0.0.0.0", uint32_t port_ipv6=0, const std::string &address_ipv6="::", bool use_ipv6=false, bool require_ipv4=true, ssl_options_t ssl_options=ssl_support_t::e_ssl_support_autodetect)
Definition abstract_tcp_server2.inl:1300
boost::asio::io_context & io_context_
Definition abstract_tcp_server2.h:518
std::unique_ptr< worker > m_io_context_local_instance
Definition abstract_tcp_server2.h:517
uint32_t m_port_ipv6
Definition abstract_tcp_server2.h:527
void set_response_soft_limit(std::size_t limit)
Definition abstract_tcp_server2.inl:1483
bool m_use_ipv6
Definition abstract_tcp_server2.h:530
boost::thread::id m_main_thread_id
Definition abstract_tcp_server2.h:535
std::set< connection_ptr > connections_
Definition abstract_tcp_server2.h:547
bool is_thread_worker()
Definition abstract_tcp_server2.inl:1545
const std::shared_ptr< typename connection< epee::net_utils::http::http_custom_handler< epee::net_utils::connection_context_base > >::shared_state > m_state
Definition abstract_tcp_server2.h:505
std::atomic< uint32_t > m_thread_index
Definition abstract_tcp_server2.h:537
std::atomic< long > sock_count
Definition connection_basic.hpp:67
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > socket_
Socket for the connection.
Definition connection_basic.hpp:117
static int get_tos_flag()
Definition connection_basic.cpp:226
boost::asio::io_context::strand strand_
Strand to ensure the connection's handlers are not called concurrently.
Definition connection_basic.hpp:115
void logger_handle_net_read(size_t size)
Definition connection_basic.cpp:270
boost::asio::ip::tcp::socket & socket()
Definition connection_basic.hpp:130
ssl_support_t m_ssl_support
Definition connection_basic.hpp:118
connection_basic_shared_state & get_state() noexcept
Definition connection_basic.hpp:128
volatile bool m_is_multithreaded
Definition connection_basic.hpp:113
connection_basic(boost::asio::io_context &context, boost::asio::ip::tcp::socket &&sock, std::shared_ptr< connection_basic_shared_state > state, ssl_support_t ssl_support)
Definition connection_basic.cpp:124
Represents a single connection from a client.
Definition abstract_tcp_server2.h:100
void setRpcStation()
Definition abstract_tcp_server2.inl:1241
virtual bool call_run_once_service_io()
Definition abstract_tcp_server2.inl:1174
boost::asio::ip::tcp::socket socket_t
Definition abstract_tcp_server2.h:114
io_context_t & m_io_context
Definition abstract_tcp_server2.h:262
void terminate()
Definition abstract_tcp_server2.inl:748
virtual bool send_done()
Definition abstract_tcp_server2.inl:1132
void start_timer(duration_t duration, bool add={})
Definition abstract_tcp_server2.inl:145
t_protocol_handler::connection_context t_connection_context
Definition abstract_tcp_server2.h:102
void cancel_handler()
Definition abstract_tcp_server2.inl:672
boost::asio::io_context io_context_t
Definition abstract_tcp_server2.h:112
void async_wait_timer()
Definition abstract_tcp_server2.inl:169
bool m_local
Definition abstract_tcp_server2.h:268
void start_write()
Definition abstract_tcp_server2.inl:473
t_protocol_handler m_handler
Definition abstract_tcp_server2.h:271
void start_read()
Definition abstract_tcp_server2.inl:306
void interrupt()
Definition abstract_tcp_server2.inl:689
connection(io_context_t &io_context, std::shared_ptr< shared_state > state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support, t_connection_context &&initial=t_connection_context{})
Construct a connection with the given io_context.
Definition abstract_tcp_server2.inl:1019
void on_terminating()
Definition abstract_tcp_server2.inl:763
virtual bool do_send(byte_slice message)
(see do_send from i_service_endpoint)
Definition abstract_tcp_server2.inl:1126
bool send(epee::byte_slice message)
Definition abstract_tcp_server2.inl:818
virtual bool release()
Definition abstract_tcp_server2.inl:1231
virtual bool request_callback()
Definition abstract_tcp_server2.inl:1188
bool speed_limit_is_enabled() const
Tells us whether we should be sleeping here (e.g. do not sleep on RPC connections).
Definition abstract_tcp_server2.inl:1114
boost::shared_ptr< connection_t > connection_ptr
Definition abstract_tcp_server2.h:105
void on_interrupted()
Definition abstract_tcp_server2.inl:702
std::string m_host
Definition abstract_tcp_server2.h:269
void start_handshake()
Definition abstract_tcp_server2.inl:210
virtual io_context_t & get_io_context()
Definition abstract_tcp_server2.inl:1210
timers_t m_timers
Definition abstract_tcp_server2.h:266
bool start_internal(bool is_income, bool is_multithreaded, boost::optional< network_address > real_remote)
Definition abstract_tcp_server2.inl:916
@ RUNNING
Definition abstract_tcp_server2.h:154
@ INTERRUPTED
Definition abstract_tcp_server2.h:155
@ TERMINATING
Definition abstract_tcp_server2.h:156
@ WASTED
Definition abstract_tcp_server2.h:157
@ TERMINATED
Definition abstract_tcp_server2.h:153
void terminate_async()
Definition abstract_tcp_server2.inl:807
duration_t get_timeout_from_bytes_read(size_t bytes) const
Definition abstract_tcp_server2.inl:116
virtual ~connection() noexcept(false)
Definition abstract_tcp_server2.inl:1057
void cancel_timer()
Definition abstract_tcp_server2.inl:200
void handle_read(size_t bytes_transferred)
Definition abstract_tcp_server2.inl:414
void state_status_check()
Definition abstract_tcp_server2.inl:126
state_t m_state
Definition abstract_tcp_server2.h:270
strand_t m_strand
Definition abstract_tcp_server2.h:265
void start_shutdown()
Definition abstract_tcp_server2.inl:604
epee::net_utils::ssl_support_t ssl_support_t
Definition abstract_tcp_server2.h:106
void cancel_socket()
Definition abstract_tcp_server2.inl:646
t_connection_context m_conn_context
Definition abstract_tcp_server2.h:264
virtual bool close(const bool wait_for_shutdown)
Definition abstract_tcp_server2.inl:1138
void save_dbg_log()
Definition abstract_tcp_server2.inl:1089
virtual bool add_ref()
Definition abstract_tcp_server2.inl:1216
unsigned int host_count(int delta=0)
Definition abstract_tcp_server2.inl:82
bool start(bool is_income, bool is_multithreaded)
Start the first asynchronous operation for the connection.
Definition abstract_tcp_server2.inl:1070
timer_t::duration duration_t
Definition abstract_tcp_server2.h:108
bool cancel()
Definition abstract_tcp_server2.inl:1120
t_connection_type m_connection_type
Definition abstract_tcp_server2.h:263
connection_ptr self
Definition abstract_tcp_server2.h:267
duration_t get_default_timeout()
Definition abstract_tcp_server2.inl:99
boost::system::error_code ec_t
Definition abstract_tcp_server2.h:109
Definition net_utils_base.h:69
Definition net_utils_base.h:172
Definition net_utils_base.h:225
static boost::mutex m_lock_get_global_throttle_in
Definition network_throttle.hpp:107
static i_network_throttle & get_global_throttle_in()
Singleton; for friend class; caller MUST use proper locks, e.g. m_lock_get_global_throttle_in.
Definition network_throttle.cpp:76
static i_network_throttle & get_global_throttle_out()
Same as get_global_throttle_in(): singleton; caller MUST use proper locks, here m_lock_get_global_throttle_out.
Definition network_throttle.cpp:89
static boost::mutex m_lock_get_global_throttle_out
Definition network_throttle.hpp:109
Definition net_ssl.h:77
const uint8_t seed[32]
Definition code-generator.cpp:37
bool success
Definition cold-transaction.cpp:57
std::unique_ptr< test_connection > conn(new test_connection(io_service, m_handler_config))
#define false
const char * res
Definition hmac_keccak.cpp:42
#define AUTO_VAL_INIT(v)
Definition misc_language.h:36
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
Definition misc_language.h:80
bool sleep_no_w(long ms)
Definition misc_language.cpp:35
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
Definition misc_language.h:97
T & check_and_get(std::shared_ptr< T > &ptr)
Definition abstract_tcp_server2.inl:72
@ ipv6
Definition enums.h:44
@ invalid
Definition enums.h:42
std::string to_string(t_connection_type type)
Definition connection_basic.cpp:70
bool is_ssl(const unsigned char *data, size_t len)
Definition net_ssl.cpp:424
constexpr size_t get_ssl_magic_size()
Definition net_ssl.h:150
t_connection_type
Definition connection_basic.hpp:93
@ e_connection_type_NET
Definition connection_basic.hpp:94
@ e_connection_type_RPC
Definition connection_basic.hpp:95
@ e_connection_type_P2P
Definition connection_basic.hpp:96
ssl_support_t
Definition net_ssl.h:49
@ e_ssl_support_disabled
Definition net_ssl.h:50
@ e_ssl_support_autodetect
Definition net_ssl.h:52
@ e_ssl_support_enabled
Definition net_ssl.h:51
PUSH_WARNINGS bool get_xtype_from_string(OUT XType &val, const std::string &str_id)
Definition string_tools_lexical.h:45
TODO: (mj-xmr) This will be reduced in another PR.
Definition byte_slice.h:40
int time
Definition gen_wide_data.py:40
Definition speed.py:1
Definition enums.h:68
if(!cryptonote::get_account_address_from_str_or_url(info, cryptonote::TESTNET, "9uVsvEryzpN8WH2t1WWhFFCG5tS8cBNdmJYNRuckLENFimfauV5pZKeS1P2CbxGkSDTUPHXWwiYE5ZGSXDAGbaZgDxobqDN"))
Definition signature.cpp:53
static cryptonote::account_public_address address
Definition signature.cpp:38
unsigned int uint32_t
Definition stdint.h:126
unsigned __int64 uint64_t
Definition stdint.h:136
The io_context used to perform asynchronous operations.
Definition abstract_tcp_server2.h:509
Definition abstract_tcp_server2.h:274
std::size_t response_soft_limit
Definition abstract_tcp_server2.h:286
Definition abstract_tcp_server2.h:76
Definition abstract_tcp_server2.h:83
Definition blake256.h:36
#define CRITICAL_REGION_LOCAL(x)
Definition syncobj.h:153
#define CRITICAL_REGION_END()
Definition syncobj.h:158
#define CRITICAL_REGION_BEGIN(x)
Definition syncobj.h:154
#define T(x)