1#ifndef UVW_STREAM_INCLUDE_H
2#define UVW_STREAM_INCLUDE_H
34 explicit data_event(std::unique_ptr<
char[]> buf, std::size_t len)
noexcept;
36 std::unique_ptr<char[]>
data;
42class connect_req final:
public request<connect_req, uv_connect_t, connect_event> {
43 static void connect_callback(uv_connect_t *req,
int status);
46 using request::request;
48 template<
typename F,
typename... Args>
49 auto connect(F &&f, Args &&...args) -> std::enable_if_t<std::is_same_v<decltype(std::forward<F>(f)(
raw(), std::forward<Args>(args)..., &connect_callback)),
void>,
int> {
50 std::forward<F>(f)(
raw(), std::forward<Args>(args)..., &connect_callback);
51 return this->leak_if(0);
54 template<
typename F,
typename... Args>
55 auto connect(F &&f, Args &&...args) -> std::enable_if_t<!std::is_same_v<decltype(std::forward<F>(f)(
raw(), std::forward<Args>(args)..., &connect_callback)),
void>,
int> {
56 return this->leak_if(std::forward<F>(f)(
raw(), std::forward<Args>(args)..., &connect_callback));
60class shutdown_req final:
public request<shutdown_req, uv_shutdown_t, shutdown_event> {
61 static void shoutdown_callback(uv_shutdown_t *req,
int status);
64 using request::request;
66 int shutdown(uv_stream_t *hndl);
69template<
typename Deleter>
70class write_req final:
public request<write_req<Deleter>, uv_write_t, write_event> {
71 static void write_callback(uv_write_t *req,
int status) {
72 if(
auto ptr = request<write_req<Deleter>, uv_write_t, write_event>::reserve(req); status) {
73 ptr->publish(error_event{status});
75 ptr->publish(write_event{});
80 write_req(loop::token token, std::shared_ptr<loop>
parent, std::unique_ptr<
char[], Deleter> dt,
unsigned int len)
81 : request<write_req<Deleter>, uv_write_t, write_event>{token, std::move(
parent)},
83 buf{uv_buf_init(data.get(), len)} {}
85 int write(uv_stream_t *hndl) {
86 return this->leak_if(uv_write(this->
raw(), hndl, &buf, 1, &write_callback));
89 int write(uv_stream_t *hndl, uv_stream_t *send) {
90 return this->leak_if(uv_write2(this->
raw(), hndl, &buf, 1, send, &write_callback));
94 std::unique_ptr<char[], Deleter> data;
107template<
typename T,
typename U,
typename... E>
108class stream_handle:
public handle<T, U, listen_event, end_event, connect_event, shutdown_event, data_event, write_event, E...> {
111 template<
typename,
typename,
typename...>
112 friend class stream_handle;
114 static constexpr unsigned int DEFAULT_BACKLOG = 128;
116 static void read_callback(uv_stream_t *hndl, ssize_t nread,
const uv_buf_t *buf) {
117 T &ref = *(
static_cast<T *
>(hndl->data));
119 std::unique_ptr<char[]>
data{buf->base};
125 if(nread == UV_EOF) {
128 }
else if(nread > 0) {
130 ref.publish(
data_event{std::move(
data),
static_cast<std::size_t
>(nread)});
131 }
else if(nread < 0) {
137 static void listen_callback(uv_stream_t *hndl,
int status) {
138 if(T &ref = *(
static_cast<T *
>(hndl->data)); status) {
145 uv_stream_t *as_uv_stream() {
146 return reinterpret_cast<uv_stream_t *
>(this->
raw());
149 const uv_stream_t *as_uv_stream()
const {
150 return reinterpret_cast<const uv_stream_t *
>(this->
raw());
155 stream_handle(loop::token token, std::shared_ptr<loop> ref)
156 : base{token, std::move(ref)} {}
171 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
179 return shutdown->shutdown(as_uv_stream());
193 int listen(
int backlog = DEFAULT_BACKLOG) {
194 return uv_listen(as_uv_stream(), backlog, &listen_callback);
218 return uv_accept(as_uv_stream(), ref.as_uv_stream());
231 return uv_read_start(as_uv_stream(), &details::common_alloc_callback, &read_callback);
242 return uv_read_start(as_uv_stream(), &details::common_alloc_callback<T, Alloc>, &read_callback);
253 return uv_read_stop(as_uv_stream());
268 template<
typename Deleter>
269 int write(std::unique_ptr<
char[], Deleter>
data,
unsigned int len) {
271 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
278 return req->write(as_uv_stream());
294 auto req = this->
parent().template
resource<details::write_req<void (*)(
char *)>>(std::unique_ptr<
char[],
void (*)(
char *)>{
data, [](
char *) {}}, len);
295 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
302 return req->write(as_uv_stream());
324 template<
typename S,
typename Deleter>
325 int write(S &send, std::unique_ptr<
char[], Deleter>
data,
unsigned int len) {
327 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
334 return req->write(as_uv_stream(), send.as_uv_stream());
358 auto req = this->
parent().template
resource<details::write_req<void (*)(
char *)>>(std::unique_ptr<
char[],
void (*)(
char *)>{
data, [](
char *) {}}, len);
359 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
366 return req->write(as_uv_stream(), send.as_uv_stream());
380 uv_buf_t bufs[] = {uv_buf_init(
data.get(), len)};
381 return uv_try_write(as_uv_stream(), bufs, 1);
394 template<
typename V,
typename W>
395 int try_write(std::unique_ptr<
char[]>
data,
unsigned int len, stream_handle<V, W> &send) {
396 uv_buf_t bufs[] = {uv_buf_init(
data.get(), len)};
397 return uv_try_write2(as_uv_stream(), bufs, 1, send.
raw());
411 uv_buf_t bufs[] = {uv_buf_init(
data, len)};
412 return uv_try_write(as_uv_stream(), bufs, 1);
425 template<
typename V,
typename W>
427 uv_buf_t bufs[] = {uv_buf_init(
data, len)};
428 return uv_try_write2(as_uv_stream(), bufs, 1, send.
raw());
436 return (uv_is_readable(as_uv_stream()) == 1);
444 return (uv_is_writable(as_uv_stream()) == 1);
463 return (0 == uv_stream_set_blocking(as_uv_stream(), enable));
471 return uv_stream_get_write_queue_size(as_uv_stream());
478# include "stream.cpp"
void on(listener_t< Type > f)
Common class for almost all the resources available in uvw.
std::shared_ptr< R > data() const
int accept(S &ref)
Accepts incoming connections.
bool writable() const noexcept
Checks if the stream is writable.
int read()
Starts reading data from an incoming stream.
int read()
Starts reading data from an incoming stream.
int write(S &send, std::unique_ptr< char[], Deleter > data, unsigned int len)
Extended write function for sending handles over a pipe handle.
size_t write_queue_size() const noexcept
Gets the amount of queued bytes waiting to be sent.
bool readable() const noexcept
Checks if the stream is readable.
int try_write(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
int try_write(char *data, unsigned int len, stream_handle< V, W > &send)
Queues a write request if it can be completed immediately.
int shutdown()
Shuts down the outgoing (write) side of a duplex stream.
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
int try_write(std::unique_ptr< char[]> data, unsigned int len, stream_handle< V, W > &send)
Queues a write request if it can be completed immediately.
int write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
int write(std::unique_ptr< char[], Deleter > data, unsigned int len)
Writes data to the stream.
int stop()
Stops reading data from the stream.
int listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
int try_write(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
int write(char *data, unsigned int len)
Writes data to the stream.
std::unique_ptr< char[]> data
const uv_connect_t * raw() const noexcept
loop & parent() const noexcept