티스토리 뷰

LB/dpdk

ring 동작

bramach 2018. 3. 31. 22:47

0.

dpdk 16.11 기준 lib/lib_ring/rte_ring.h

  ring queue이다.

multi producer, multi consumer 에 대해서도 atomicity 보장을 이용해 lock 없이 동작한다.

일반 ring에서의 producer, consumer가 multi 성격을 인정하면서

producing/ consuming range 느낌으로.. ㅎㅎ head와 tail을 갖는다.


1번 consumer가 consuming에 뛰어 들어 cons.head를 움직이면

2번 consumer가 consuming에 또 뛰어 들 수 있다.

다만 1번 consumer가 먼저 종료할 때 까지 2번은 기다려야 한다.



1. multiwriter에 대해서 어떻게 safe하게 되는지는 아래 함수 하나만 보면 된다.

아래는 간략화된 코드임.


static inline int __attribute__((always_inline))

__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,

         unsigned n, enum rte_ring_queue_behavior behavior)

{

    /* 1.

       cons_head를 읽고 업데이트 한다.

       이 때 나 혼자 읽고 업데이트 할 수 있을 때까지 spinning

- single writer버전에선 굳이 spinning은 안한다.

     */

    /* move cons.head atomically */

    do {

            /* ... */

       

        cons_head = r->cons.head;


        /* ... */

       

        cons_next = cons_head + n;

        /* compare and swap! CAS가 사용됨.*/

        success = rte_atomic32_cmpset(&r->cons.head, cons_head,

                          cons_next);


    } while (unlikely(success == 0));


    /* 2.

       cons_head 값을 바탕으로 얘 혼자만의 구간을 잡아서 dequeue (copy) */

    /* copy in table */

    DEQUEUE_PTRS();

    rte_smp_rmb();


    /* 선행하는 다른 writer가 빠져나갈 때까지 기다린다.

- single writer버전에선 굳이 대기타진 안한다. */

    /*

     * If there are other dequeues in progress that preceded us,

     * we need to wait for them to complete

     */

    while (unlikely(r->cons.tail != cons_head)) {

        /* ... pause하다가 계속 반복되면 차라리 yield 하면서 기다림 */

    }


    /* 4. tail을 업데이트하고 끝남.

       consumer가 굳이 head, tail을 갖는 이유는

       가장 시간이 오래걸리는 DEQUEUE_PTRS()를 동시에 여럿이서 하게 해주려고지.

       하지만 결과적으론 이 함수가 불리고 나면 cons.head와 cons.tail은 같아야 해.

     */

    r->cons.tail = cons_next;


}


RCU랑 코드 구조가 거의 비슷함... 



2. enqueue/dequeue는 pipelined 처리

/* the actual copy of pointers on the ring to obj_table.

 * Placed here since identical code needed in both

 * single and multi consumer dequeue functions */

#define DEQUEUE_PTRS() do { \

    uint32_t idx = cons_head & mask; \

    const uint32_t size = r->cons.size; \

    if (likely(idx + n < size)) { \

        for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\

            obj_table[i] = r->ring[idx]; \

            obj_table[i+1] = r->ring[idx+1]; \

            obj_table[i+2] = r->ring[idx+2]; \

            obj_table[i+3] = r->ring[idx+3]; \

        } \

        switch (n & 0x3) { \

            case 3: obj_table[i++] = r->ring[idx++]; \

            case 2: obj_table[i++] = r->ring[idx++]; \

            case 1: obj_table[i++] = r->ring[idx++]; \

        } \

    } else { \

        for (i = 0; idx < size; i++, idx++) \

            obj_table[i] = r->ring[idx]; \

        for (idx = 0; i < n; i++, idx++) \

            obj_table[i] = r->ring[idx]; \

    } \

} while (0)







3. 이건 ring entry count시 참고 사항


 prod 는 이제 데이터를 넣을 위치

cons는 이제 데이터를 뺄 위치


*

static inline unsigned

rte_ring_count(const struct rte_ring *r)

{

    uint32_t prod_tail = r->prod.tail;

    uint32_t cons_tail = r->cons.tail;

    return (prod_tail - cons_tail) & r->prod.mask;

}

*

static inline unsigned

rte_ring_free_count(const struct rte_ring *r)

{

    uint32_t prod_tail = r->prod.tail;

    uint32_t cons_tail = r->cons.tail;

    return (cons_tail - prod_tail - 1) & r->prod.mask;

}

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함