Nexus HTTP/3
A QUIC and HTTP/3 library
stream_impl.hpp
1 #pragma once
2 
3 #include <boost/asio/any_io_executor.hpp>
4 #include <boost/asio/buffers_iterator.hpp>
5 
6 #include <nexus/quic/detail/operation.hpp>
7 #include <nexus/quic/detail/service.hpp>
8 #include <nexus/quic/detail/stream_state.hpp>
9 #include <nexus/quic/error.hpp>
10 #include <nexus/h3/fields.hpp>
11 
12 struct lsquic_stream;
13 
14 namespace nexus {
15 namespace quic::detail {
16 
17 struct connection_impl;
18 struct engine_impl;
19 
20 struct stream_impl : public boost::intrusive::list_base_hook<>,
21  public service_list_base_hook {
22  using executor_type = boost::asio::any_io_executor;
23  engine_impl& engine;
24  service<stream_impl>& svc;
25  connection_impl& conn;
26  stream_state::variant state;
27 
28  template <typename BufferSequence>
29  static void init_op(const BufferSequence& buffers,
30  stream_data_operation& op) {
31  const auto end = boost::asio::buffer_sequence_end(buffers);
32  for (auto i = boost::asio::buffer_sequence_begin(buffers);
33  i != end && op.num_iovs < op.max_iovs;
34  ++i, ++op.num_iovs) {
35  op.iovs[op.num_iovs].iov_base = const_cast<void*>(i->data());
36  op.iovs[op.num_iovs].iov_len = i->size();
37  }
38  }
39 
40  explicit stream_impl(connection_impl& conn);
41  ~stream_impl();
42 
43  void service_shutdown();
44 
45  executor_type get_executor() const;
46 
47  bool is_open() const;
48  stream_id id(error_code& ec) const;
49 
50  void read_headers(stream_header_read_operation& op);
51 
52  template <typename CompletionToken>
53  decltype(auto) async_read_headers(h3::fields& fields,
54  CompletionToken&& token) {
55  return boost::asio::async_initiate<CompletionToken, void(error_code)>(
56  [this, &fields] (auto h) {
57  using Handler = std::decay_t<decltype(h)>;
58  using op_type = stream_header_read_async<Handler, executor_type>;
59  auto p = handler_allocate<op_type>(h, std::move(h),
60  get_executor(), fields);
61  auto op = handler_ptr<op_type, Handler>{p, &p->handler};
62  read_headers(*op);
63  op.release(); // release ownership
64  }, token);
65  }
66 
67  void read_some(stream_data_operation& op);
68  void on_read();
69 
70  template <typename MutableBufferSequence, typename CompletionToken>
71  decltype(auto) async_read_some(const MutableBufferSequence& buffers,
72  CompletionToken&& token) {
73  return boost::asio::async_initiate<CompletionToken, void(error_code, size_t)>(
74  [this, &buffers] (auto h) {
75  using Handler = std::decay_t<decltype(h)>;
76  using op_type = stream_data_async<Handler, executor_type>;
77  auto p = handler_allocate<op_type>(h, std::move(h), get_executor());
78  auto op = handler_ptr<op_type, Handler>{p, &p->handler};
79  init_op(buffers, *op);
80  read_some(*op);
81  op.release(); // release ownership
82  }, token);
83  }
84 
85  template <typename MutableBufferSequence>
86  std::enable_if_t<boost::asio::is_mutable_buffer_sequence<
87  MutableBufferSequence>::value, size_t>
88  read_some(const MutableBufferSequence& buffers, error_code& ec) {
89  stream_data_sync op;
90  init_op(buffers, op);
91  read_some(op);
92  op.wait();
93  ec = std::get<0>(*op.result);
94  return std::get<1>(*op.result);
95  }
96 
97  void write_headers(stream_header_write_operation& op);
98 
99  template <typename CompletionToken>
100  decltype(auto) async_write_headers(const h3::fields& fields,
101  CompletionToken&& token) {
102  return boost::asio::async_initiate<CompletionToken, void(error_code)>(
103  [this, &fields] (auto h) {
104  using Handler = std::decay_t<decltype(h)>;
105  using op_type = stream_header_write_async<Handler, executor_type>;
106  auto p = handler_allocate<op_type>(h, std::move(h),
107  get_executor(), fields);
108  auto op = handler_ptr<op_type, Handler>{p, &p->handler};
109  write_headers(*op);
110  op.release(); // release ownership
111  }, token);
112  }
113 
114  void write_some(stream_data_operation& op);
115  void on_write();
116 
117  template <typename ConstBufferSequence, typename CompletionToken>
118  decltype(auto) async_write_some(const ConstBufferSequence& buffers,
119  CompletionToken&& token) {
120  return boost::asio::async_initiate<CompletionToken, void(error_code, size_t)>(
121  [this, &buffers] (auto h) {
122  using Handler = std::decay_t<decltype(h)>;
123  using op_type = stream_data_async<Handler, executor_type>;
124  auto p = handler_allocate<op_type>(h, std::move(h), get_executor());
125  auto op = handler_ptr<op_type, Handler>{p, &p->handler};
126  init_op(buffers, *op);
127  write_some(*op);
128  op.release(); // release ownership
129  }, token);
130  }
131 
132  template <typename ConstBufferSequence>
133  std::enable_if_t<boost::asio::is_const_buffer_sequence<
134  ConstBufferSequence>::value, size_t>
135  write_some(const ConstBufferSequence& buffers, error_code& ec) {
136  stream_data_sync op;
137  init_op(buffers, op);
138  write_some(op);
139  op.wait();
140  ec = std::get<0>(*op.result);
141  return std::get<1>(*op.result);
142  }
143 
144  void flush(error_code& ec);
145  void shutdown(int how, error_code& ec);
146 
147  void close(stream_close_operation& op);
148  void on_close();
149 
150  template <typename CompletionToken>
151  decltype(auto) async_close(CompletionToken&& token) {
152  return boost::asio::async_initiate<CompletionToken, void(error_code)>(
153  [this] (auto h) {
154  using Handler = std::decay_t<decltype(h)>;
155  using op_type = stream_close_async<Handler, executor_type>;
156  auto p = handler_allocate<op_type>(h, std::move(h), get_executor());
157  auto op = handler_ptr<op_type, Handler>{p, &p->handler};
158  close(*op);
159  op.release(); // release ownership
160  }, token);
161  }
162 
163  void reset();
164 };
165 
166 } // namespace quic::detail
167 } // namespace nexus
uint64_t stream_id
stream identifier that is unique to a connection
Definition: stream_id.hpp:8
networking
Definition: error_code.hpp:8