티스토리 뷰
0.
dpdk 16.11 기준 lib/lib_ring/rte_ring.h
ring queue이다.
multi producer, multi consumer 에 대해서도 atomicity 보장을 이용해 lock 없이 동작한다.
일반 ring에서의 producer, consumer가 multi 성격을 인정하면서
producing/ consuming range 느낌으로.. ㅎㅎ head와 tail을 갖는다.
1번 consumer가 consuming에 뛰어 들어 cons.head를 움직이면
2번 consumer가 consuming에 또 뛰어 들 수 있다.
다만 1번 consumer가 먼저 종료할 때 까지 2번은 기다려야 한다.
1. multiwriter에 대해서 어떻게 safe하게 되는지는 아래 함수 하나만 보면 된다.
아래는 간략화된 코드임.
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
/* 1.
cons_head를 읽고 업데이트 한다.
이 때 나 혼자 읽고 업데이트 할 수 있을 때까지 spinning
- single writer버전에선 굳이 spinning은 안한다.
*/
/* move cons.head atomically */
do {
/* ... */
cons_head = r->cons.head;
/* ... */
cons_next = cons_head + n;
/* compare and swap! CAS가 사용됨.*/
success = rte_atomic32_cmpset(&r->cons.head, cons_head,
cons_next);
} while (unlikely(success == 0));
/* 2.
cons_head 값을 바탕으로 얘 혼자만의 구간을 잡아서 dequeue (copy) */
/* copy in table */
DEQUEUE_PTRS();
rte_smp_rmb();
/* 선행하는 다른 writer가 빠져나갈 때까지 기다린다.
- single writer버전에선 굳이 대기타진 안한다. */
/*
* If there are other dequeues in progress that preceded us,
* we need to wait for them to complete
*/
while (unlikely(r->cons.tail != cons_head)) {
/* ... pause하다가 계속 반복되면 차라리 yield 하면서 기다림 */
}
/* 4. tail을 업데이트하고 끝남.
consumer가 굳이 head, tail을 갖는 이유는
가장 시간이 오래걸리는 DEQUEUE_PTRS()를 동시에 여럿이서 하게 해주려고지.
하지만 결과적으론 이 함수가 불리고 나면 cons.head와 cons.tail은 같아야 해.
*/
r->cons.tail = cons_next;
}
RCU랑 코드 구조가 거의 비슷함...
2. enqueue/dequeue는 pipelined 처리
/* the actual copy of pointers on the ring to obj_table.
* Placed here since identical code needed in both
* single and multi consumer dequeue functions */
#define DEQUEUE_PTRS() do { \
uint32_t idx = cons_head & mask; \
const uint32_t size = r->cons.size; \
if (likely(idx + n < size)) { \
for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
obj_table[i] = r->ring[idx]; \
obj_table[i+1] = r->ring[idx+1]; \
obj_table[i+2] = r->ring[idx+2]; \
obj_table[i+3] = r->ring[idx+3]; \
} \
switch (n & 0x3) { \
case 3: obj_table[i++] = r->ring[idx++]; \
case 2: obj_table[i++] = r->ring[idx++]; \
case 1: obj_table[i++] = r->ring[idx++]; \
} \
} else { \
for (i = 0; idx < size; i++, idx++) \
obj_table[i] = r->ring[idx]; \
for (idx = 0; i < n; i++, idx++) \
obj_table[i] = r->ring[idx]; \
} \
} while (0)
3. 이건 ring entry count시 참고 사항
prod 는 이제 데이터를 넣을 위치
cons는 이제 데이터를 뺄 위치
*
static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
uint32_t prod_tail = r->prod.tail;
uint32_t cons_tail = r->cons.tail;
return (prod_tail - cons_tail) & r->prod.mask;
}
*
static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{
uint32_t prod_tail = r->prod.tail;
uint32_t cons_tail = r->cons.tail;
return (cons_tail - prod_tail - 1) & r->prod.mask;
}