SeqAn3 3.4.0-rc.3
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
algorithm_executor_blocking.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2006-2024 Knut Reinert & Freie Universität Berlin
2// SPDX-FileCopyrightText: 2016-2024 Knut Reinert & MPI für molekulare Genetik
3// SPDX-License-Identifier: BSD-3-Clause
4
10#pragma once
11
12#include <functional>
13#include <optional>
14#include <ranges>
15#include <type_traits>
16
19
20namespace seqan3::detail
21{
22
51template <std::ranges::viewable_range resource_t,
52 std::semiregular algorithm_t,
53 std::semiregular algorithm_result_t,
54 typename execution_handler_t = execution_handler_sequential>
55 requires std::ranges::forward_range<resource_t>
56 && std::invocable<algorithm_t,
57 std::ranges::range_reference_t<resource_t>,
58 std::function<void(algorithm_result_t)>>
59class algorithm_executor_blocking
60{
61private:
66 using resource_type = std::views::all_t<resource_t>;
68 using resource_iterator_type = std::ranges::iterator_t<resource_type>;
70 using resource_difference_type = std::iter_difference_t<resource_iterator_type>;
72
77 using bucket_type = std::vector<algorithm_result_t>;
79 using bucket_iterator_type = std::ranges::iterator_t<bucket_type>;
81 using buffer_type = std::vector<bucket_type>;
83 using buffer_iterator_type = std::ranges::iterator_t<buffer_type>;
85
87 enum fill_status
88 {
89 non_empty_buffer,
90 empty_buffer,
91 end_of_resource
92 };
93
94public:
100 algorithm_executor_blocking() = delete;
102 algorithm_executor_blocking(algorithm_executor_blocking const &) = delete;
103
121 algorithm_executor_blocking(algorithm_executor_blocking && other) noexcept :
122 algorithm_executor_blocking{std::move(other), other.resource_position()}
123 {}
124
126 algorithm_executor_blocking & operator=(algorithm_executor_blocking const &) = delete;
127
130 algorithm_executor_blocking & operator=(algorithm_executor_blocking && other)
131 {
132 auto old_resource_position = other.resource_position();
133
134 resource = std::move(other.resource);
135 move_initialise(std::move(other), old_resource_position);
136 return *this;
137 }
138
140 ~algorithm_executor_blocking() = default;
141
156 algorithm_executor_blocking(resource_t resource,
157 algorithm_t algorithm,
158 algorithm_result_t const SEQAN3_DOXYGEN_ONLY(result) = algorithm_result_t{},
159 execution_handler_t && exec_handler = execution_handler_t{}) :
160 exec_handler{std::move(exec_handler)},
161 resource{std::forward<resource_t>(resource)},
162 resource_it{std::ranges::begin(this->resource)},
163 algorithm{std::move(algorithm)}
164 {
165 if constexpr (std::same_as<execution_handler_t, execution_handler_parallel>)
166 buffer_size = static_cast<size_t>(std::ranges::distance(this->resource));
167
168 buffer.resize(buffer_size);
169 buffer_it = buffer.end();
170 buffer_end_it = buffer_it;
171 }
173
190 {
191 fill_status status;
192 // Each invocation of the algorithm might produce zero results (e.g. a search might not find a query)
193 // this repeats the algorithm until it produces the first result or the input resource was consumed.
194 do
195 {
196 status = fill_buffer();
197 }
198 while (status == fill_status::empty_buffer);
199
200 std::optional<algorithm_result_t> result{std::nullopt};
201 if (status == fill_status::end_of_resource)
202 return result;
203
204 assert(status == fill_status::non_empty_buffer);
205 assert(bucket_it != buffer_it->end());
206
207 result = std::ranges::iter_move(bucket_it);
208 go_to_next_result(); // Go to next buffered result.
209 return result;
210 }
211
213 bool is_eof() noexcept
214 {
215 return resource_it == std::ranges::end(resource);
216 }
217
218private:
224 algorithm_executor_blocking(algorithm_executor_blocking && other,
225 resource_difference_type old_resource_position) noexcept :
226 resource{std::move(other.resource)}
227 {
228 move_initialise(std::move(other), old_resource_position);
229 }
230
237 resource_difference_type resource_position()
238 {
239 // Get the old resource position.
240 auto position = std::ranges::distance(std::ranges::begin(resource), resource_it);
241 assert(position >= 0);
242 return position;
243 }
244
246 fill_status fill_buffer()
247 {
248 if (!is_buffer_empty()) // Not everything consumed yet.
249 return fill_status::non_empty_buffer;
250
251 if (is_eof()) // Case: reached end of resource.
252 return fill_status::end_of_resource;
253
254 // Reset the buckets and the buffer iterator.
255 reset_buffer();
256
257 // Execute the algorithm (possibly asynchronous) and fill the buckets in this pre-assigned order.
258 for (buffer_end_it = buffer_it; buffer_end_it != buffer.end() && !is_eof(); ++buffer_end_it, ++resource_it)
259 {
260 exec_handler.execute(algorithm,
261 *resource_it,
262 [target_buffer_it = buffer_end_it](auto && algorithm_result)
263 {
264 target_buffer_it->push_back(std::move(algorithm_result));
265 });
266 }
267
268 exec_handler.wait();
269
270 // Move the results iterator to the next available result. (This skips empty results of the algorithm)
271 find_next_non_empty_bucket();
272
273 if (is_buffer_empty())
274 return fill_status::empty_buffer;
275
276 return fill_status::non_empty_buffer;
277 }
278
282 bool is_buffer_empty() const
283 {
284 return buffer_it == buffer_end_it;
285 }
286
295 void reset_buffer()
296 {
297 // Clear all buckets
298 for (auto & bucket : buffer)
299 bucket.clear();
300
301 // Reset the iterator over the buckets.
302 buffer_it = buffer.begin();
303 }
304
313 void find_next_non_empty_bucket()
314 {
315 assert(buffer_it <= buffer_end_it);
316 // find first buffered bucket that contains at least one element
317 buffer_it = std::find_if(buffer_it,
318 buffer_end_it,
319 [](auto const & buffer)
320 {
321 return !buffer.empty();
322 });
323
324 if (buffer_it != buffer_end_it)
325 bucket_it = buffer_it->begin();
326 }
327
335 void go_to_next_result()
336 {
337 if (++bucket_it == buffer_it->end())
338 {
339 ++buffer_it;
340 find_next_non_empty_bucket();
341 }
342 }
343
345 void move_initialise(algorithm_executor_blocking && other, resource_difference_type old_resource_position) noexcept
346 {
347 algorithm = std::move(other.algorithm);
348 buffer_size = std::move(other.buffer_size);
349 exec_handler = std::move(other.exec_handler);
350 // Move the resource and set the iterator state accordingly.
351 resource_it = std::ranges::next(std::ranges::begin(resource), old_resource_position);
352
353 // Get the old buffer and bucket iterator positions.
354 auto buffer_it_position = other.buffer_it - other.buffer.begin();
355 auto buffer_end_it_position = other.buffer_end_it - other.buffer.begin();
356
357 std::ptrdiff_t bucket_it_position = 0;
358 if (buffer_it_position != buffer_end_it_position)
359 bucket_it_position = other.bucket_it - other.buffer_it->begin();
360
361 // Move the buffer and set the buffer and bucket iterator accordingly.
362 buffer = std::move(other.buffer);
363 buffer_it = buffer.begin() + buffer_it_position;
364 buffer_end_it = buffer.begin() + buffer_end_it_position;
365
366 if (buffer_it_position != buffer_end_it_position)
367 bucket_it = buffer_it->begin() + bucket_it_position;
368 }
369
371 execution_handler_t exec_handler{};
372
374 resource_type resource; // a std::ranges::view
376 resource_iterator_type resource_it{};
378 algorithm_t algorithm{};
379
381 buffer_type buffer{};
383 buffer_iterator_type buffer_it{};
385 buffer_iterator_type buffer_end_it{};
387 bucket_iterator_type bucket_it{};
389 size_t buffer_size{1};
390};
391
398template <typename resource_rng_t, std::semiregular algorithm_t, std::semiregular algorithm_result_t>
399algorithm_executor_blocking(resource_rng_t &&, algorithm_t, algorithm_result_t const &)
400 -> algorithm_executor_blocking<resource_rng_t, algorithm_t, algorithm_result_t, execution_handler_sequential>;
402} // namespace seqan3::detail
T begin(T... args)
Provides seqan3::detail::execution_handler_parallel.
Provides seqan3::detail::execution_handler_sequential.
T find_if(T... args)
T forward(T... args)
constexpr auto is_eof
Checks whether a given letter is equal to the EOF constant defined in <cstdio>.
Definition predicate.hpp:72
SeqAn specific customisations in the standard namespace.
Hide me