5#include "../type_traits.hpp"
7#include "../detail/action_queue.hpp"
8#include "../http/request.hpp"
9#include "../websocket/stream.hpp"
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/post.hpp>
13#include <boost/beast/core/error.hpp>
14#include <fmt/format.h>
15#include <spdlog/spdlog.h>
34 template<
bool isClient>
36 public std::enable_shared_from_this<connection<isClient>>
62 m_logger->trace(
"destructor()");
73 std::shared_ptr<spdlog::logger>
105 std::shared_ptr<connection>
106 make(
const std::shared_ptr<spdlog::logger>
logger,
stream&& ws,
const std::string& agent_string)
112 return std::shared_ptr<connection>{me};
128 template<concepts::accept_handler Callback>
130 connect(
const boost::asio::ip::tcp::resolver::results_type& target,
const std::string& resource, Callback&& done)
133 m_logger->trace(
"connect()");
135 if (m_state != state::inactive)
136 throw std::logic_error{
"connect() called on already active websocket connection"};
139 m_ws.
get_lowest_layer([&, me = this->shared_from_this(),
this, done = std::forward<Callback>(done), resource](
auto& sock)
mutable {
140 sock.expires_after(std::chrono::seconds(30));
145 [
this, me, target, done = std::forward<Callback>(done), resource](
auto ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type ep)
mutable {
154 [
this, done = std::forward<Callback>(done)](
auto ec)
mutable {
156 std::invoke(std::forward<
decltype(done)>(done), ec);
175 template<
class Body,
class Fields, std::invocable<> Callback>
177 accept(
const boost::beast::http::request<Body, Fields>& req, Callback&& done)
180 m_logger->trace(
"accept()");
182 if (m_state != state::inactive)
183 throw std::logic_error{
"accept() called on already active websocket connection"};
186 m_state = state::handshaking;
191 m_ws.async_accept(req, [
this, me = this->shared_from_this(), done = std::forward<
decltype(done)>(done)](
malloy::error_code ec)
mutable {
192 m_logger->trace(
"on_accept()");
196 m_logger->error(
"on_accept(): {}", ec.message());
203 std::invoke(std::forward<
decltype(done)>(done));
217 disconnect(boost::beast::websocket::close_reason why = boost::beast::websocket::normal)
219 m_logger->trace(
"disconnect()");
221 if (m_state == state::closed || m_state == state::closing)
224 auto build_act = [
this, why, me = this->shared_from_this()](
const auto& on_done)
mutable {
226 if (m_state == state::closed || m_state == state::closing) {
231 do_disconnect(why, on_done);
235 m_write_queue.
push(build_act);
236 m_read_queue.
push(build_act);
247 force_disconnect(boost::beast::websocket::close_reason why = boost::beast::websocket::normal)
249 m_logger->trace(
"force_disconnect()");
251 if (m_state == state::inactive)
252 throw std::logic_error{
"force_disconnect() called on inactive websocket connection"};
254 else if (m_state == state::closed || m_state == state::closing)
257 do_disconnect(why, []{});
275 m_logger->trace(
"read()");
280 me = this->shared_from_this(),
282 done = std::forward<
decltype(done)>(done)
284 (
const auto& on_done)
mutable
286 assert(buff !=
nullptr);
287 m_ws.async_read(*buff, [
this, me, on_done, done = std::forward<
decltype(done)>(done)](
auto ec,
auto size)
mutable {
288 std::invoke(std::forward<
decltype(done)>(done), ec, size);
305 template<concepts::async_read_handler Callback>
309 m_logger->trace(
"send(). payload size: {}", payload.size());
311 m_write_queue.
push([buff = payload, done = std::forward<Callback>(done),
this, me = this->shared_from_this()](
const auto& on_done)
mutable {
312 m_ws.async_write(buff, [
this, me, on_done, done = std::forward<
decltype(done)>(done)](
auto ec,
auto size)
mutable {
314 std::invoke(std::forward<Callback>(done), ec, size);
321 enum class sending_state
327 enum sending_state m_sending_state = sending_state::idling;
328 std::shared_ptr<spdlog::logger> m_logger;
330 std::string m_agent_string;
331 act_queue_t m_write_queue;
332 act_queue_t m_read_queue;
333 std::atomic<state> m_state{ state::inactive };
336 std::shared_ptr<spdlog::logger>
logger, stream&& ws, std::string agent_str) :
337 m_logger(std::move(
logger)),
339 m_agent_string{std::move(agent_str)},
340 m_write_queue{boost::asio::make_strand(m_ws.get_executor())},
341 m_read_queue{boost::asio::make_strand(m_ws.get_executor())}
345 throw std::invalid_argument(
"no valid logger provided.");
351 m_logger->trace(
"go_active()");
354 m_state = state::active;
364 m_logger->trace(
"setup_connection()");
368 boost::beast::websocket::stream_base::timeout::suggested(
369 isClient ? boost::beast::role_type::client : boost::beast::role_type::server)
373 const auto agent_field = isClient ? malloy::http::field::user_agent : malloy::http::field::server;
375 boost::beast::websocket::stream_base::decorator(
376 [
this, agent_field](boost::beast::websocket::request_type& req) {
377 req.set(agent_field, m_agent_string);
384 do_disconnect(boost::beast::websocket::close_reason why,
const std::invocable<>
auto& on_done)
386 m_logger->trace(
"do_disconnect()");
389 m_state = state::closing;
391 m_ws.async_close(why, [me = this->shared_from_this(),
this, on_done](
auto ec) {
393 m_logger->error(
"could not close websocket: '{}'", ec.message());
403 boost::beast::error_code ec,
404 boost::asio::ip::tcp::resolver::results_type::endpoint_type ep,
405 const std::string& resource,
406 concepts::accept_handler
auto&& on_handshake)
408 m_logger->trace(
"on_connect()");
411 m_logger->error(
"on_connect(): {}", ec.message());
415 m_ws.get_lowest_layer([](
auto& s) { s.expires_never(); });
420 const std::string host = fmt::format(
"{}:{}", ep.address().to_string(), ep.port());
422#if MALLOY_FEATURE_TLS
423 if constexpr (isClient) {
426 m_ws.async_handshake_tls(
427 boost::asio::ssl::stream_base::handshake_type::client,
428 [on_handshake = std::forward<
decltype(on_handshake)>(on_handshake), resource, host, me = this->shared_from_this()](
auto ec)
mutable
433 me->on_ready_for_handshake(host, resource, std::forward<
decltype(on_handshake)>(on_handshake));
440 on_ready_for_handshake(host, resource, std::forward<
decltype(on_handshake)>(on_handshake));
444 on_ready_for_handshake(
const std::string& host,
const std::string& resource, concepts::accept_handler
auto&& on_handshake)
446 m_logger->trace(
"on_ready_for_handshake()");
450 m_ws.get_lowest_layer([](
auto& s) { s.expires_never(); });
454 m_ws.async_handshake(
457 std::forward<
decltype(on_handshake)>(on_handshake)
462 on_write(
auto ec,
auto size)
464 m_logger->trace(
"on_write() wrote: '{}' bytes", size);
467 m_logger->error(
"on_write failed for websocket connection: '{}'", ec.message());
475 m_logger->trace(
"on_close()");
477 m_state = state::closed;
void push(act_t act)
Add an action to the queue.
Definition: action_queue.hpp:48
Definition: request.hpp:19
Represents a connection via the WebSocket protocol.
Definition: connection.hpp:37
static std::shared_ptr< connection > make(const std::shared_ptr< spdlog::logger > logger, stream &&ws, const std::string &agent_string)
Construct a new connection object.
Definition: connection.hpp:106
void send(const concepts::const_buffer_sequence auto &payload, Callback &&done)
Send the contents of a buffer to the remote endpoint.
Definition: connection.hpp:307
state
Definition: connection.hpp:48
void force_disconnect(boost::beast::websocket::close_reason why=boost::beast::websocket::normal)
Same as disconnect, but bypasses all queues and runs immediately.
Definition: connection.hpp:247
void set_binary(const bool enabled)
Definition: connection.hpp:83
void read(concepts::dynamic_buffer auto &buff, concepts::async_read_handler auto &&done)
Read a complete message into a buffer.
Definition: connection.hpp:273
bool binary()
Definition: connection.hpp:93
std::shared_ptr< spdlog::logger > logger() const noexcept
Definition: connection.hpp:74
void accept(const boost::beast::http::request< Body, Fields > &req, Callback &&done)
Accept an incoming connection.
Definition: connection.hpp:177
void connect(const boost::asio::ip::tcp::resolver::results_type &target, const std::string &resource, Callback &&done)
Connect to a remote (websocket) endpoint.
Definition: connection.hpp:130
virtual ~connection() noexcept
Definition: connection.hpp:60
void disconnect(boost::beast::websocket::close_reason why=boost::beast::websocket::normal)
Disconnect/stop/close the connection.
Definition: connection.hpp:217
Websocket stream. May use TLS.
Definition: stream.hpp:50
bool binary() const
Checks whether outgoing messages will be indicated as text or binary.
Definition: stream.hpp:222
void get_lowest_layer(Func &&visitor)
Access get_lowest_layer of wrapped stream type.
Definition: stream.hpp:244
void set_binary(const bool enabled)
Controls whether outgoing messages will be indicated as text or binary.
Definition: stream.hpp:204
auto get_executor()
Get executor of the underlying stream.
Definition: stream.hpp:260
Definition: type_traits.hpp:44
Definition: type_traits.hpp:35
Definition: type_traits.hpp:41
Definition: connection.hpp:22
boost::beast::error_code error_code
Error code used to signify errors without throwing. Truthy means it holds an error.
Definition: error.hpp:9