3 #include <condition_variable>
8 #include <boost/asio/associated_executor.hpp>
9 #include <nexus/error_code.hpp>
10 #include <nexus/h3/fields.hpp>
11 #include <nexus/quic/detail/handler_ptr.hpp>
13 namespace nexus::quic::detail {
17 enum class completion_type { post, defer, dispatch, destroy };
20 template <
typename ...Args>
22 using operation_type = operation<Args...>;
23 using tuple_type = std::tuple<Args...>;
29 using complete_fn = void (*)(completion_type, operation_type*, tuple_type&&);
30 complete_fn complete_;
32 explicit operation(complete_fn complete) noexcept
33 : complete_(complete) {}
35 template <
typename ...UArgs>
36 void post(UArgs&& ...args) {
37 complete_(completion_type::post,
this,
38 tuple_type{std::forward<UArgs>(args)...});
40 template <
typename ...UArgs>
41 void defer(UArgs&& ...args) {
42 complete_(completion_type::defer,
this,
43 tuple_type{std::forward<UArgs>(args)...});
45 template <
typename ...UArgs>
46 void dispatch(UArgs&& ...args) {
47 complete_(completion_type::dispatch,
this,
48 tuple_type{std::forward<UArgs>(args)...});
50 template <
typename ...UArgs>
51 void destroy(UArgs&& ...args) {
52 complete_(completion_type::destroy,
this,
53 tuple_type{std::forward<UArgs>(args)...});
61 template <
typename Operation>
62 struct sync_operation : Operation {
64 std::condition_variable cond;
65 using operation_type =
typename Operation::operation_type;
66 using tuple_type =
typename Operation::tuple_type;
67 std::optional<tuple_type> result;
69 template <
typename ...Args>
70 explicit sync_operation(Args&& ...args)
71 : Operation(do_complete, std::forward<Args>(args)...) {}
73 static void do_complete(completion_type type, operation_type* op,
74 tuple_type&& result) {
75 auto self =
static_cast<sync_operation*
>(op);
76 if (type != completion_type::destroy) {
77 auto lock = std::scoped_lock{
self->mutex};
78 self->result = std::move(result);
79 self->cond.notify_one();
83 auto lock = std::unique_lock{mutex};
84 cond.wait(lock, [
this] {
return result.has_value(); });
92 template <
typename Operation,
typename Handler,
typename IoExecutor>
93 struct async_operation : Operation {
94 using operation_type =
typename Operation::operation_type;
95 using tuple_type =
typename Operation::tuple_type;
97 using Executor = boost::asio::associated_executor_t<Handler, IoExecutor>;
99 using Work =
typename boost::asio::prefer_result<Executor,
100 boost::asio::execution::outstanding_work_t::tracked_t>::type;
102 using IoWork =
typename boost::asio::prefer_result<IoExecutor,
103 boost::asio::execution::outstanding_work_t::tracked_t>::type;
105 std::pair<Work, IoWork> ex;
109 template <
typename ...Args>
110 async_operation(Handler&& handler,
const IoExecutor& io_ex, Args&& ...args)
111 : Operation(do_complete, std::forward<Args>(args)...),
112 handler(std::move(handler)),
113 ex(boost::asio::prefer(get_associated_executor(this->handler, io_ex),
114 boost::asio::execution::outstanding_work.tracked),
115 boost::asio::prefer(io_ex,
116 boost::asio::execution::outstanding_work.tracked))
119 static void do_complete(completion_type type, operation_type* op,
121 auto self =
static_cast<async_operation*
>(op);
122 auto p = handler_ptr<async_operation, Handler>{
self, &
self->handler};
124 auto handler = std::move(self->handler);
125 p.get_deleter().handler = &handler;
127 auto alloc = boost::asio::get_associated_allocator(handler);
130 auto f = [handler=std::move(handler), args=std::move(args)] ()
mutable {
131 std::apply(std::move(handler), std::move(args));
135 auto ex = std::move(self->ex.first);
140 case completion_type::post:
141 boost::asio::execution::execute(
142 boost::asio::require(
143 boost::asio::prefer(ex,
144 boost::asio::execution::relationship.fork,
145 boost::asio::execution::allocator(alloc)),
146 boost::asio::execution::blocking.never),
149 case completion_type::defer:
150 boost::asio::execution::execute(
151 boost::asio::require(
152 boost::asio::prefer(ex,
153 boost::asio::execution::relationship.continuation,
154 boost::asio::execution::allocator(alloc)),
155 boost::asio::execution::blocking.never),
158 case completion_type::dispatch:
159 boost::asio::execution::execute(
160 boost::asio::prefer(ex,
161 boost::asio::execution::blocking.possibly,
162 boost::asio::execution::allocator(alloc)),
165 case completion_type::destroy:
173 struct accept_operation : operation<error_code> {
174 explicit accept_operation(complete_fn complete) noexcept
175 : operation(complete) {}
177 using accept_sync = sync_operation<accept_operation>;
179 template <
typename Handler,
typename IoExecutor>
180 using accept_async = async_operation<accept_operation, Handler, IoExecutor>;
184 struct stream_connect_operation : operation<error_code> {
187 explicit stream_connect_operation(complete_fn complete,
188 stream_impl& stream) noexcept
189 : operation(complete), stream(stream)
192 using stream_connect_sync = sync_operation<stream_connect_operation>;
194 template <
typename Handler,
typename IoExecutor>
195 using stream_connect_async = async_operation<
196 stream_connect_operation, Handler, IoExecutor>;
200 struct stream_accept_operation : operation<error_code> {
203 explicit stream_accept_operation(complete_fn complete,
204 stream_impl& stream) noexcept
205 : operation(complete), stream(stream)
208 using stream_accept_sync = sync_operation<stream_accept_operation>;
210 template <
typename Handler,
typename IoExecutor>
211 using stream_accept_async = async_operation<
212 stream_accept_operation, Handler, IoExecutor>;
216 struct stream_data_operation : operation<error_code, size_t> {
217 static constexpr uint16_t max_iovs = 128;
218 iovec iovs[max_iovs];
219 uint16_t num_iovs = 0;
220 size_t bytes_transferred = 0;
222 explicit stream_data_operation(complete_fn complete) noexcept
223 : operation(complete) {}
225 using stream_data_sync = sync_operation<stream_data_operation>;
227 template <
typename Handler,
typename IoExecutor>
228 using stream_data_async = async_operation<
229 stream_data_operation, Handler, IoExecutor>;
233 struct stream_header_read_operation : operation<error_code> {
236 stream_header_read_operation(complete_fn complete,
237 h3::fields& fields) noexcept
238 : operation(complete), fields(fields)
241 using stream_header_read_sync = sync_operation<stream_header_read_operation>;
243 template <
typename Handler,
typename IoExecutor>
244 using stream_header_read_async = async_operation<
245 stream_header_read_operation, Handler, IoExecutor>;
249 struct stream_header_write_operation : operation<error_code> {
250 const h3::fields& fields;
252 stream_header_write_operation(complete_fn complete,
253 const h3::fields& fields) noexcept
254 : operation(complete), fields(fields)
258 using stream_header_write_sync = sync_operation<stream_header_write_operation>;
260 template <
typename Handler,
typename IoExecutor>
261 using stream_header_write_async = async_operation<
262 stream_header_write_operation, Handler, IoExecutor>;
266 struct stream_close_operation : operation<error_code> {
267 explicit stream_close_operation(complete_fn complete) noexcept
268 : operation(complete) {}
271 using stream_close_sync = sync_operation<stream_close_operation>;
273 template <
typename Handler,
typename IoExecutor>
274 using stream_close_async = async_operation<
275 stream_close_operation, Handler, IoExecutor>;