Embedded Template Library 1.0
Loading...
Searching...
No Matches
bip_buffer_spsc_atomic.h
Go to the documentation of this file.
1
2
3/******************************************************************************
4The MIT License(MIT)
5
6Embedded Template Library.
7https://github.com/ETLCPP/etl
8https://www.etlcpp.com
9
10Copyright(c) 2021 Benedek Kupper, John Wellbelove
11
12Permission is hereby granted, free of charge, to any person obtaining a copy
13of this software and associated documentation files(the "Software"), to deal
14in the Software without restriction, including without limitation the rights
15to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
16copies of the Software, and to permit persons to whom the Software is
17furnished to do so, subject to the following conditions :
18
19The above copyright notice and this permission notice shall be included in all
20copies or substantial portions of the Software.
21
22THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
25AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28SOFTWARE.
29******************************************************************************/
30
37
38#ifndef ETL_BIP_BUFFER_SPSC_ATOMIC_INCLUDED
39#define ETL_BIP_BUFFER_SPSC_ATOMIC_INCLUDED
40
41#include "platform.h"
42#include "alignment.h"
43#include "atomic.h"
44#include "error_handler.h"
45#include "file_error_numbers.h"
46#include "integral_limits.h"
47#include "memory.h"
48#include "memory_model.h"
49#include "parameter_type.h"
50#include "span.h"
51#include "utility.h"
52
53#include <stddef.h>
54#include <stdint.h>
55
56#if ETL_HAS_ATOMIC
57
58namespace etl
59{
60 //***************************************************************************
62 //***************************************************************************
63 class bip_buffer_exception : public exception
64 {
65 public:
66
67 bip_buffer_exception(string_type reason_, string_type file_name_, numeric_type line_number_)
68 : exception(reason_, file_name_, line_number_)
69 {
70 }
71 };
72
73 //***************************************************************************
75 //***************************************************************************
76 class bip_buffer_reserve_invalid : public bip_buffer_exception
77 {
78 public:
79
80 bip_buffer_reserve_invalid(string_type file_name_, numeric_type line_number_)
81 : bip_buffer_exception(ETL_ERROR_TEXT("bip_buffer:reserve", ETL_BIP_BUFFER_SPSC_ATOMIC_FILE_ID"A"), file_name_, line_number_)
82 {
83 }
84 };
85
  //***************************************************************************
  /// The type-independent base of the bip buffer.
  /// Manages the read, write and last (wrap point) indexes with atomics,
  /// providing single-producer single-consumer thread safety: the writer
  /// thread owns 'write'/'last' and the reader thread owns 'read'.
  //***************************************************************************
  template <size_t Memory_Model = etl::memory_model::MEMORY_MODEL_LARGE>
  class bip_buffer_spsc_atomic_base
  {
  public:

    /// The index/size type, sized according to the chosen memory model.
    typedef typename etl::size_type_lookup<Memory_Model>::type size_type;

    //*************************************************************************
    /// Returns true if the buffer stores no elements.
    //*************************************************************************
    bool empty() const
    {
      return size() == 0;
    }

    //*************************************************************************
    /// Returns true if no more space can be reserved for writing.
    //*************************************************************************
    bool full() const
    {
      return available() == 0;
    }

    //*************************************************************************
    /// Returns the number of stored elements (the sum of both contiguous
    /// blocks when the writer has wrapped around).
    /// The value is a snapshot; it may be stale if the other thread is active.
    //*************************************************************************
    size_type size() const
    {
      size_type write_index = write.load(etl::memory_order_acquire);
      size_type read_index = read.load(etl::memory_order_acquire);

      // no wraparound
      if (write_index >= read_index)
      {
        // size is distance between read and write
        return write_index - read_index;
      }
      else
      {
        size_type last_index = last.load(etl::memory_order_acquire);

        // size is distance between beginning and write, plus read and last
        return (write_index - 0) + (last_index - read_index);
      }
    }

    //*************************************************************************
    /// Returns the largest contiguous block size currently reservable for
    /// writing. One slot is always kept unused so that a completely full
    /// buffer remains distinguishable from an empty one.
    //*************************************************************************
    size_type available() const
    {
      size_type write_index = write.load(etl::memory_order_acquire);
      size_type read_index = read.load(etl::memory_order_acquire);

      // no wraparound
      if (write_index >= read_index)
      {
        size_type forward_size = capacity() - write_index;

        // check if there's more space if wrapping around
        if (read_index > (forward_size + 1))
        {
          return read_index - 1;
        }
        else
        {
          return forward_size;
        }
      }
      else // read_index > write_index
      {
        return read_index - write_index - 1;
      }
    }

    //*************************************************************************
    /// Returns the total capacity of the buffer.
    //*************************************************************************
    size_type capacity() const
    {
      return Reserved;
    }

    //*************************************************************************
    /// Returns the maximum number of elements the buffer can hold.
    //*************************************************************************
    size_type max_size() const
    {
      return Reserved;
    }

  protected:

    //*************************************************************************
    /// Constructor. Stores the capacity and zeroes all indexes.
    //*************************************************************************
    bip_buffer_spsc_atomic_base(size_type reserved_)
      : read(0)
      , write(0)
      , last(0)
      , Reserved(reserved_)
    {
    }

    //*************************************************************************
    /// Resets all indexes to zero, emptying the buffer.
    /// NOTE(review): calling this requires external synchronization between
    /// the reader and writer threads (see the comment in clear()).
    //*************************************************************************
    void reset()
    {
      read.store(0, etl::memory_order_release);
      write.store(0, etl::memory_order_release);
      last.store(0, etl::memory_order_release);
    }

    //*************************************************************************
    /// Calculates the start index of a writable contiguous area (returned)
    /// and clamps *psize to the size that can be granted there.
    /// fallback_size controls the wrap decision: a linear block of at least
    /// fallback_size elements is accepted instead of wrapping around.
    /// Called by the writer thread only.
    //*************************************************************************
    size_type get_write_reserve(size_type* psize, size_type fallback_size = numeric_limits<size_type>::max())
    {
      size_type write_index = write.load(etl::memory_order_relaxed);
      size_type read_index = read.load(etl::memory_order_acquire);

      // No wraparound
      if (write_index >= read_index)
      {
        size_type forward_size = capacity() - write_index;

        // We still fit in linearly
        if (*psize <= forward_size)
        {
          return write_index;
        }
        // There isn't more space even when wrapping around,
        // or the linear size is good enough as fallback
        else if ((read_index <= (forward_size + 1)) || (fallback_size <= forward_size))
        {
          *psize = forward_size;
          return write_index;
        }
        // Better wrap around now
        else
        {
          // Check if size fits.
          // When wrapping, the write index cannot reach read index,
          // then we'd not be able to distinguish wrapped situation from linear.
          if (*psize >= read_index)
          {
            if (read_index > 0)
            {
              *psize = read_index - 1;
            }
            else
            {
              *psize = 0;
            }
          }

          return 0;
        }
      }
      else // read_index > write_index
      {
        // Doesn't fit
        if (*psize >= read_index - write_index)
        {
          *psize = read_index - write_index - 1;
        }

        return write_index;
      }
    }

    //*************************************************************************
    /// Commits a write reserve by advancing the write index (and updating
    /// the wrap point 'last' when needed).
    /// Asserts bip_buffer_reserve_invalid if the reserve does not match the
    /// current buffer state. Called by the writer thread only.
    //*************************************************************************
    void apply_write_reserve(size_type windex, size_type wsize)
    {
      if (wsize > 0)
      {
        size_type write_index = write.load(etl::memory_order_relaxed);
        size_type read_index = read.load(etl::memory_order_acquire);

        // Wrapped around already
        if (write_index < read_index)
        {
          ETL_ASSERT_OR_RETURN((windex == write_index) && ((wsize + 1) <= read_index), ETL_ERROR(bip_buffer_reserve_invalid));
        }
        // No wraparound so far, also not wrapping around with this block
        else if (windex == write_index)
        {
          ETL_ASSERT_OR_RETURN(wsize <= (capacity() - write_index), ETL_ERROR(bip_buffer_reserve_invalid));

          // Move both indexes forward
          last.store(windex + wsize, etl::memory_order_release);
        }
        // Wrapping around now
        else
        {
          ETL_ASSERT_OR_RETURN((windex == 0) && ((wsize + 1) <= read_index), ETL_ERROR(bip_buffer_reserve_invalid));

          // Correct wrapping point
          last.store(write_index, etl::memory_order_release);
        }

        // Always update write index
        write.store(windex + wsize, etl::memory_order_release);
      }
    }

    //*************************************************************************
    /// Calculates the start index of a readable contiguous area (returned)
    /// and clamps *psize to the number of elements readable from there.
    /// Steps over the wrap point when the reader has consumed the tail block.
    /// Called by the reader thread only.
    //*************************************************************************
    size_type get_read_reserve(size_type* psize)
    {
      size_type read_index = read.load(etl::memory_order_relaxed);
      size_type write_index = write.load(etl::memory_order_acquire);

      if (read_index > write_index)
      {
        // Writer has wrapped around
        size_type last_index = last.load(etl::memory_order_relaxed);

        if (read_index == last_index)
        {
          // Reader reached the end, start read from 0
          read_index = 0;
        }
        else // (read_index < last_index)
        {
          // Use the remaining buffer at the end
          write_index = last_index;
        }
      }
      else
      {
        // No wraparound, nothing to adjust
      }

      // Limit to max available size
      if ((write_index - read_index) < *psize)
      {
        *psize = write_index - read_index;
      }

      return read_index;
    }

    //*************************************************************************
    /// Commits a read reserve by advancing the read index.
    /// Asserts bip_buffer_reserve_invalid if the reserve does not match what
    /// get_read_reserve would currently grant. Called by the reader thread only.
    //*************************************************************************
    void apply_read_reserve(size_type rindex, size_type rsize)
    {
      if (rsize > 0)
      {
        size_type rsize_checker = rsize;
        ETL_ASSERT_OR_RETURN((rindex == get_read_reserve(&rsize_checker)) && (rsize == rsize_checker), ETL_ERROR(bip_buffer_reserve_invalid));

        read.store(rindex + rsize, etl::memory_order_release);
      }
    }

  private:

    etl::atomic<size_type> read;   ///< Index of the next element to read (advanced by the reader).
    etl::atomic<size_type> write;  ///< Index of the next free slot to write (advanced by the writer).
    etl::atomic<size_type> last;   ///< One past the last valid element before the wrap point.
    const size_type Reserved;      ///< The total capacity.

#if defined(ETL_POLYMORPHIC_SPSC_BIP_BUFFER_ATOMIC) || defined(ETL_POLYMORPHIC_CONTAINERS)

  public:

    /// Virtual destructor when polymorphic containers are enabled.
    virtual ~bip_buffer_spsc_atomic_base() {}
#else

  protected:

    /// Non-virtual destructor; protected to prevent deletion through a base pointer.
    ~bip_buffer_spsc_atomic_base() {}
#endif
  };
361
362 //***************************************************************************
364 //***************************************************************************
365 template <typename T, const size_t Memory_Model = etl::memory_model::MEMORY_MODEL_LARGE>
366 class ibip_buffer_spsc_atomic : public bip_buffer_spsc_atomic_base<Memory_Model>
367 {
368 private:
369
370 typedef typename etl::bip_buffer_spsc_atomic_base<Memory_Model> base_t;
371 using base_t::apply_read_reserve;
372 using base_t::apply_write_reserve;
373 using base_t::get_read_reserve;
374 using base_t::get_write_reserve;
375 using base_t::reset;
376
377 public:
378
379 typedef T value_type;
380 typedef T& reference;
381 typedef const T& const_reference;
382 #if ETL_USING_CPP11
383 typedef T&& rvalue_reference;
384 #endif
385 typedef typename base_t::size_type size_type;
386
387 using base_t::max_size;
388
389 //*************************************************************************
390 // Reserves a memory area for reading (up to the max_reserve_size).
391 //*************************************************************************
392 span<T> read_reserve(size_type max_reserve_size = numeric_limits<size_type>::max())
393 {
394 size_type reserve_size = max_reserve_size;
395 size_type rindex = get_read_reserve(&reserve_size);
396
397 return span<T>(p_buffer + rindex, reserve_size);
398 }
399
400 //*************************************************************************
401 // Commits the previously reserved read memory area
402 // the reserve can be trimmed at the end before committing.
403 // Throws bip_buffer_reserve_invalid
404 //*************************************************************************
405 void read_commit(const span<T>& reserve)
406 {
407 size_type rindex = etl::distance(p_buffer, reserve.data());
408 apply_read_reserve(rindex, reserve.size());
409 }
410
411 //*************************************************************************
412 // Reserves a memory area for writing up to the max_reserve_size.
413 //*************************************************************************
414 span<T> write_reserve(size_type max_reserve_size)
415 {
416 size_type reserve_size = max_reserve_size;
417 size_type windex = get_write_reserve(&reserve_size);
418
419 return span<T>(p_buffer + windex, reserve_size);
420 }
421
422 //*************************************************************************
423 // Reserves an optimal memory area for writing. The buffer will only wrap
424 // around if the available forward space is less than min_reserve_size.
425 //*************************************************************************
426 span<T> write_reserve_optimal(size_type min_reserve_size = 1U)
427 {
428 size_type reserve_size = numeric_limits<size_type>::max();
429 size_type windex = get_write_reserve(&reserve_size, min_reserve_size);
430
431 return span<T>(p_buffer + windex, reserve_size);
432 }
433
434 //*************************************************************************
435 // Commits the previously reserved write memory area
436 // the reserve can be trimmed at the end before committing.
437 // Throws bip_buffer_reserve_invalid
438 //*************************************************************************
439 void write_commit(const span<T>& reserve)
440 {
441 size_type windex = etl::distance(p_buffer, reserve.data());
442 apply_write_reserve(windex, reserve.size());
443 }
444
445 //*************************************************************************
447 //*************************************************************************
448 void clear()
449 {
450 // the buffer might be split into two contiguous blocks
451 for (span<T> reader = read_reserve(); reader.size() > 0; reader = read_reserve())
452 {
453 destroy(reader.begin(), reader.end());
454 read_commit(reader);
455 }
456 // now the buffer is already empty
457 // resetting the buffer here is beneficial to have
458 // the whole buffer available for a single block,
459 // but it requires synchronization between the writer and reader threads
460 reset();
461 }
462
463 protected:
464
465 //*************************************************************************
466 ibip_buffer_spsc_atomic(T* p_buffer_, size_type reserved_)
467 : base_t(reserved_)
468 , p_buffer(p_buffer_)
469 {
470 }
471
472 private:
473
474 // Disable copy construction and assignment.
475 ibip_buffer_spsc_atomic(const ibip_buffer_spsc_atomic&) ETL_DELETE;
476 ibip_buffer_spsc_atomic& operator=(const ibip_buffer_spsc_atomic&) ETL_DELETE;
477
478 #if ETL_USING_CPP11
479 ibip_buffer_spsc_atomic(ibip_buffer_spsc_atomic&&) = delete;
480 ibip_buffer_spsc_atomic& operator=(ibip_buffer_spsc_atomic&&) = delete;
481 #endif
482
483 T* const p_buffer;
484 };
485
486 //***************************************************************************
493 //***************************************************************************
494 template <typename T, const size_t Size, const size_t Memory_Model = etl::memory_model::MEMORY_MODEL_LARGE>
495 class bip_buffer_spsc_atomic : public ibip_buffer_spsc_atomic<T, Memory_Model>
496 {
497 private:
498
499 typedef typename etl::ibip_buffer_spsc_atomic<T, Memory_Model> base_t;
500
501 public:
502
503 typedef typename base_t::size_type size_type;
504
505 private:
506
507 static ETL_CONSTANT size_type Reserved_Size = size_type(Size);
508
509 public:
510
511 ETL_STATIC_ASSERT((Size <= (etl::integral_limits<size_type>::max)), "Size too large for memory model");
512
513 static ETL_CONSTANT size_type MAX_SIZE = size_type(Size);
514
515 //*************************************************************************
517 //*************************************************************************
518 bip_buffer_spsc_atomic()
519 : base_t(reinterpret_cast<T*>(buffer.raw), Reserved_Size)
520 {
521 }
522
523 //*************************************************************************
525 //*************************************************************************
526 ~bip_buffer_spsc_atomic()
527 {
528 base_t::clear();
529 }
530
531 private:
532
534 etl::uninitialized_buffer_of<T, Reserved_Size> buffer;
535 };
536
  // Out-of-class definition of the static constant. Needed when ETL_CONSTANT
  // is not an inline variable (pre-C++17 builds) and the constant is ODR-used.
  template <typename T, const size_t Size, const size_t Memory_Model>
  ETL_CONSTANT typename bip_buffer_spsc_atomic<T, Size, Memory_Model>::size_type bip_buffer_spsc_atomic<T, Size, Memory_Model>::Reserved_Size;
539} // namespace etl
540
#endif /* ETL_HAS_ATOMIC */
542
543#endif /* ETL_BIP_BUFFER_SPSC_ATOMIC_INCLUDED */
Definition exception.h:59
bitset_ext
Definition absolute.h:40
etl::optional< T > read(etl::bit_stream_reader &stream)
Read a checked type from a stream.
Definition bit_stream.h:1377
ETL_CONSTEXPR TContainer::size_type size(const TContainer &container)
Definition iterator.h:1192
bool write(etl::bit_stream_writer &stream, bool value)
Definition bit_stream.h:995