Nexus HTTP/3
A QUIC and HTTP/3 library
operation.hpp
1 #pragma once
2 
3 #include <condition_variable>
4 #include <memory>
5 #include <mutex>
6 #include <optional>
7 #include <sys/uio.h>
8 #include <boost/asio/associated_executor.hpp>
9 #include <nexus/error_code.hpp>
10 #include <nexus/h3/fields.hpp>
11 #include <nexus/quic/detail/handler_ptr.hpp>
12 
13 namespace nexus::quic::detail {
14 
15 struct stream_impl;
16 
/// Selects how an operation's completion function should deliver the result.
/// post, defer and dispatch each request a different submission style, while
/// destroy requests that the operation be discarded instead.
enum class completion_type {
  post,
  defer,
  dispatch,
  destroy,
};
18 
20 template <typename ...Args>
21 struct operation {
22  using operation_type = operation<Args...>;
23  using tuple_type = std::tuple<Args...>;
24 
29  using complete_fn = void (*)(completion_type, operation_type*, tuple_type&&);
30  complete_fn complete_;
31 
32  explicit operation(complete_fn complete) noexcept
33  : complete_(complete) {}
34 
35  template <typename ...UArgs>
36  void post(UArgs&& ...args) {
37  complete_(completion_type::post, this,
38  tuple_type{std::forward<UArgs>(args)...});
39  }
40  template <typename ...UArgs>
41  void defer(UArgs&& ...args) {
42  complete_(completion_type::defer, this,
43  tuple_type{std::forward<UArgs>(args)...});
44  }
45  template <typename ...UArgs>
46  void dispatch(UArgs&& ...args) {
47  complete_(completion_type::dispatch, this,
48  tuple_type{std::forward<UArgs>(args)...});
49  }
50  template <typename ...UArgs>
51  void destroy(UArgs&& ...args) { // the need for args here is unfortunate
52  complete_(completion_type::destroy, this,
53  tuple_type{std::forward<UArgs>(args)...});
54  }
55 };
56 
61 template <typename Operation>
62 struct sync_operation : Operation {
63  std::mutex mutex;
64  std::condition_variable cond;
65  using operation_type = typename Operation::operation_type;
66  using tuple_type = typename Operation::tuple_type;
67  std::optional<tuple_type> result;
68 
69  template <typename ...Args>
70  explicit sync_operation(Args&& ...args)
71  : Operation(do_complete, std::forward<Args>(args)...) {}
72 
73  static void do_complete(completion_type type, operation_type* op,
74  tuple_type&& result) {
75  auto self = static_cast<sync_operation*>(op);
76  if (type != completion_type::destroy) {
77  auto lock = std::scoped_lock{self->mutex};
78  self->result = std::move(result);
79  self->cond.notify_one();
80  }
81  }
82  void wait() {
83  auto lock = std::unique_lock{mutex};
84  cond.wait(lock, [this] { return result.has_value(); });
85  }
86 };
87 
92 template <typename Operation, typename Handler, typename IoExecutor>
93 struct async_operation : Operation {
94  using operation_type = typename Operation::operation_type;
95  using tuple_type = typename Operation::tuple_type;
96 
97  using Executor = boost::asio::associated_executor_t<Handler, IoExecutor>;
99  using Work = typename boost::asio::prefer_result<Executor,
100  boost::asio::execution::outstanding_work_t::tracked_t>::type;
102  using IoWork = typename boost::asio::prefer_result<IoExecutor,
103  boost::asio::execution::outstanding_work_t::tracked_t>::type;
104  Handler handler;
105  std::pair<Work, IoWork> ex;
106 
109  template <typename ...Args>
110  async_operation(Handler&& handler, const IoExecutor& io_ex, Args&& ...args)
111  : Operation(do_complete, std::forward<Args>(args)...),
112  handler(std::move(handler)),
113  ex(boost::asio::prefer(get_associated_executor(this->handler, io_ex),
114  boost::asio::execution::outstanding_work.tracked),
115  boost::asio::prefer(io_ex,
116  boost::asio::execution::outstanding_work.tracked))
117  {}
118 
119  static void do_complete(completion_type type, operation_type* op,
120  tuple_type&& args) {
121  auto self = static_cast<async_operation*>(op);
122  auto p = handler_ptr<async_operation, Handler>{self, &self->handler}; // take ownership
123  // we're destroying 'self' here, so move the handler and executors out
124  auto handler = std::move(self->handler); // may throw
125  p.get_deleter().handler = &handler; // update deleter
126 
127  auto alloc = boost::asio::get_associated_allocator(handler);
128  // move args into the lambda we'll submit for execution. do this before
129  // deleting 'self' in case any of these args reference that memory
130  auto f = [handler=std::move(handler), args=std::move(args)] () mutable {
131  std::apply(std::move(handler), std::move(args));
132  }; // may throw
133 
134  // save the associated executor for f's submission
135  auto ex = std::move(self->ex.first);
136  // the io executor's work in self->ex.second can be destroyed with 'self'
137  p.reset(); // delete 'self'
138 
139  switch (type) {
140  case completion_type::post:
141  boost::asio::execution::execute(
142  boost::asio::require(
143  boost::asio::prefer(ex,
144  boost::asio::execution::relationship.fork,
145  boost::asio::execution::allocator(alloc)),
146  boost::asio::execution::blocking.never),
147  std::move(f));
148  break;
149  case completion_type::defer:
150  boost::asio::execution::execute(
151  boost::asio::require(
152  boost::asio::prefer(ex,
153  boost::asio::execution::relationship.continuation,
154  boost::asio::execution::allocator(alloc)),
155  boost::asio::execution::blocking.never),
156  std::move(f));
157  break;
158  case completion_type::dispatch:
159  boost::asio::execution::execute(
160  boost::asio::prefer(ex,
161  boost::asio::execution::blocking.possibly,
162  boost::asio::execution::allocator(alloc)),
163  std::move(f));
164  break;
165  case completion_type::destroy: // handled above
166  break;
167  }
168  }
169 };
170 
171 
172 // connection accept
173 struct accept_operation : operation<error_code> {
174  explicit accept_operation(complete_fn complete) noexcept
175  : operation(complete) {}
176 };
177 using accept_sync = sync_operation<accept_operation>;
178 
179 template <typename Handler, typename IoExecutor>
180 using accept_async = async_operation<accept_operation, Handler, IoExecutor>;
181 
182 
183 // stream connection
184 struct stream_connect_operation : operation<error_code> {
185  stream_impl& stream;
186 
187  explicit stream_connect_operation(complete_fn complete,
188  stream_impl& stream) noexcept
189  : operation(complete), stream(stream)
190  {}
191 };
192 using stream_connect_sync = sync_operation<stream_connect_operation>;
193 
194 template <typename Handler, typename IoExecutor>
195 using stream_connect_async = async_operation<
196  stream_connect_operation, Handler, IoExecutor>;
197 
198 
199 // stream accept
200 struct stream_accept_operation : operation<error_code> {
201  stream_impl& stream;
202 
203  explicit stream_accept_operation(complete_fn complete,
204  stream_impl& stream) noexcept
205  : operation(complete), stream(stream)
206  {}
207 };
208 using stream_accept_sync = sync_operation<stream_accept_operation>;
209 
210 template <typename Handler, typename IoExecutor>
211 using stream_accept_async = async_operation<
212  stream_accept_operation, Handler, IoExecutor>;
213 
214 
215 // stream reads and writes
216 struct stream_data_operation : operation<error_code, size_t> {
217  static constexpr uint16_t max_iovs = 128;
218  iovec iovs[max_iovs];
219  uint16_t num_iovs = 0;
220  size_t bytes_transferred = 0;
221 
222  explicit stream_data_operation(complete_fn complete) noexcept
223  : operation(complete) {}
224 };
225 using stream_data_sync = sync_operation<stream_data_operation>;
226 
227 template <typename Handler, typename IoExecutor>
228 using stream_data_async = async_operation<
229  stream_data_operation, Handler, IoExecutor>;
230 
231 
232 // stream header reads
233 struct stream_header_read_operation : operation<error_code> {
234  h3::fields& fields;
235 
236  stream_header_read_operation(complete_fn complete,
237  h3::fields& fields) noexcept
238  : operation(complete), fields(fields)
239  {}
240 };
241 using stream_header_read_sync = sync_operation<stream_header_read_operation>;
242 
243 template <typename Handler, typename IoExecutor>
244 using stream_header_read_async = async_operation<
245  stream_header_read_operation, Handler, IoExecutor>;
246 
247 
248 // stream header writes
249 struct stream_header_write_operation : operation<error_code> {
250  const h3::fields& fields;
251 
252  stream_header_write_operation(complete_fn complete,
253  const h3::fields& fields) noexcept
254  : operation(complete), fields(fields)
255  {}
256 };
257 
258 using stream_header_write_sync = sync_operation<stream_header_write_operation>;
259 
260 template <typename Handler, typename IoExecutor>
261 using stream_header_write_async = async_operation<
262  stream_header_write_operation, Handler, IoExecutor>;
263 
264 
265 // stream close
266 struct stream_close_operation : operation<error_code> {
267  explicit stream_close_operation(complete_fn complete) noexcept
268  : operation(complete) {}
269 };
270 
271 using stream_close_sync = sync_operation<stream_close_operation>;
272 
273 template <typename Handler, typename IoExecutor>
274 using stream_close_async = async_operation<
275  stream_close_operation, Handler, IoExecutor>;
276 
277 } // namespace nexus::quic::detail