DPDK  22.11.11
rte_ring_rts_elem_pvt.h
Go to the documentation of this file.
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_RTS_ELEM_PVT_H_
11 #define _RTE_RING_RTS_ELEM_PVT_H_
12 
24 static __rte_always_inline void
25 __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
26 {
27  union __rte_ring_rts_poscnt h, ot, nt;
28 
29  /*
30  * If there are other enqueues/dequeues in progress that
31  * might preceded us, then don't update tail with new value.
32  */
33 
34  /*
35  * A0 = {A0.a, A0.b}: Synchronizes with the CAS at R0.
36  * The CAS at R0 in same typed thread establishes a happens-before
37  * relationship with this load acquire. Ensures that this thread
38  * observes the same or later values for h.raw/h.val.cnt
39  * observed by the other thread when it updated ht->tail.raw.
40  * If not, ht->tail.raw may get updated out of sync (e.g. getting
41  * updated to the same value twice). A0.a makes sure this condition
42  * holds when CAS succeeds and A0.b when it fails.
43  */
44  /* A0.a */
45  ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);
46 
47  do {
48  /* on 32-bit systems we have to do atomic read here */
49  h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED);
50 
51  nt.raw = ot.raw;
52  if (++nt.val.cnt == h.val.cnt)
53  nt.val.pos = h.val.pos;
54  /*
55  * R0: Synchronizes with A2 of a different thread of the opposite type and A0.b
56  * of a different thread of the same type.
57  */
58  /* A0.b */
59  } while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw,
60  0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
61 }
62 
67 static __rte_always_inline union __rte_ring_rts_poscnt
68 __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
69  int memorder)
70 {
71  union __rte_ring_rts_poscnt h;
72  uint32_t max = ht->htd_max;
73 
74 
75  h.raw = __atomic_load_n(&ht->head.raw, memorder);
76 
77  while (h.val.pos - ht->tail.val.pos > max) {
78  rte_pause();
79  h.raw = __atomic_load_n(&ht->head.raw, memorder);
80  }
81 
82  return h;
83 }
84 
88 static __rte_always_inline uint32_t
89 __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
90  enum rte_ring_queue_behavior behavior, uint32_t *old_head,
91  uint32_t *free_entries)
92 {
93  uint32_t n, cons_tail;
94  union __rte_ring_rts_poscnt nh, oh;
95 
96  const uint32_t capacity = r->capacity;
97 
98  do {
99  /* Reset n to the initial burst count */
100  n = num;
101 
102  /*
103  * wait for prod head/tail distance,
104  * make sure that we read prod head *before*
105  * reading cons tail.
106  */
107  /*
108  * A1 Synchronizes with the CAS at R1.
109  * Establishes a happens-before relationship with a thread of the same
110  * type that released the ht.raw, ensuring this thread observes all of
111  * its memory effects needed to maintain a safe partial order.
112  */
113  oh = __rte_ring_rts_head_wait(&r->rts_prod, __ATOMIC_ACQUIRE);
114 
115  /*
116  * A2: Establish a synchronizes-with edge using a store-release at R0.
117  * This ensures that all memory effects from the preceding opposing
118  * thread are observed.
119  */
120  cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
121 
122  /*
123  * The subtraction is done between two unsigned 32bits value
124  * (the result is always modulo 32 bits even if we have
125  * *old_head > cons_tail). So 'free_entries' is always between 0
126  * and capacity (which is < size).
127  */
128  *free_entries = capacity + cons_tail - oh.val.pos;
129 
130  /* check that we have enough room in ring */
131  if (unlikely(n > *free_entries))
132  n = (behavior == RTE_RING_QUEUE_FIXED) ?
133  0 : *free_entries;
134 
135  if (n == 0)
136  break;
137 
138  nh.val.pos = oh.val.pos + n;
139  nh.val.cnt = oh.val.cnt + 1;
140 
141  /*
142  * R1: Establishes a synchronizes-with edge with the load-acquire
143  * of ht.raw at A1. Ensures that the store-release to the tail by
144  * this thread, if it was of the opposite type, becomes
145  * visible to another thread of the current type. That thread will
146  * then observe the updates in the same order, keeping a safe
147  * partial order.
148  */
149  } while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
150  &oh.raw, nh.raw,
151  0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
152 
153  *old_head = oh.val.pos;
154  return n;
155 }
156 
160 static __rte_always_inline unsigned int
161 __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
162  enum rte_ring_queue_behavior behavior, uint32_t *old_head,
163  uint32_t *entries)
164 {
165  uint32_t n, prod_tail;
166  union __rte_ring_rts_poscnt nh, oh;
167 
168  /* move cons.head atomically */
169  do {
170  /* Restore n as it may change every loop */
171  n = num;
172 
173  /*
174  * wait for cons head/tail distance,
175  * make sure that we read cons head *before*
176  * reading prod tail.
177  */
178  /*
179  * A3: Synchronizes with the CAS at R2.
180  * Establishes a happens-before relationship with a thread of the same
181  * type that released the ht.raw, ensuring this thread observes all of
182  * its memory effects needed to maintain a safe partial order.
183  */
184  oh = __rte_ring_rts_head_wait(&r->rts_cons, __ATOMIC_ACQUIRE);
185 
186  /*
187  * A4: Establish a synchronizes-with edge using a store-release at R0.
188  * This ensures that all memory effects from the preceding opposing
189  * thread are observed.
190  */
191  prod_tail = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);
192 
193  /* The subtraction is done between two unsigned 32bits value
194  * (the result is always modulo 32 bits even if we have
195  * cons_head > prod_tail). So 'entries' is always between 0
196  * and size(ring)-1.
197  */
198  *entries = prod_tail - oh.val.pos;
199 
200  /* Set the actual entries for dequeue */
201  if (n > *entries)
202  n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
203 
204  if (unlikely(n == 0))
205  break;
206 
207  nh.val.pos = oh.val.pos + n;
208  nh.val.cnt = oh.val.cnt + 1;
209 
210  /*
211  * R2: Establishes a synchronizes-with edge with the load-acquire
212  * of ht.raw at A3. Ensures that the store-release to the tail by
213  * this thread, if it was of the opposite type, becomes
214  * visible to another thread of the current type. That thread will
215  * then observe the updates in the same order, keeping a safe
216  * partial order.
217  */
218  } while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
219  &oh.raw, nh.raw,
220  0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);
221 
222  *old_head = oh.val.pos;
223  return n;
224 }
225 
248 static __rte_always_inline unsigned int
249 __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
250  uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
251  uint32_t *free_space)
252 {
253  uint32_t free, head;
254 
255  n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
256 
257  if (n != 0) {
258  __rte_ring_enqueue_elems(r, head, obj_table, esize, n);
259  __rte_ring_rts_update_tail(&r->rts_prod);
260  }
261 
262  if (free_space != NULL)
263  *free_space = free - n;
264  return n;
265 }
266 
289 static __rte_always_inline unsigned int
290 __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
291  uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
292  uint32_t *available)
293 {
294  uint32_t entries, head;
295 
296  n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
297 
298  if (n != 0) {
299  __rte_ring_dequeue_elems(r, head, obj_table, esize, n);
300  __rte_ring_rts_update_tail(&r->rts_cons);
301  }
302 
303  if (available != NULL)
304  *available = entries - n;
305  return n;
306 }
307 
308 #endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
#define __rte_always_inline
Definition: rte_common.h:267
rte_ring_queue_behavior
Definition: rte_ring_core.h:43
#define unlikely(x)
static void rte_pause(void)
uint32_t capacity
volatile uint32_t tail
Definition: rte_ring_core.h:70