[09/12] ring: introduce lockless ring buffer

Message ID 20180604095520.8563-10-xiaoguangrong@tencent.com
State New
Series migration: improve multithreads for compression and decompression

Commit Message

Xiao Guangrong June 4, 2018, 9:55 a.m. UTC
From: Xiao Guangrong <xiaoguangrong@tencent.com>

This is a simple lockless ring buffer implementation that supports both
single producer vs. single consumer and multiple producers vs. single
consumer.

Many lessons were learned from the Linux kernel's kfifo (1) and DPDK's
rte_ring (2) before I wrote this implementation. It corrects some
memory-barrier bugs in kfifo, and it is a simpler lockless version of
rte_ring, as multiple access is currently only allowed for producers.

With a single producer and a single consumer, it is a traditional FIFO.
With multiple producers, it uses the following algorithm:

For the producer, it uses two steps to update the ring:
   - first step, occupy the entry in the ring:

retry:
      in = ring->in;
      if (cmpxchg(&ring->in, in, in + 1) != in)
            goto retry;

     After that, the entry at ring->data[in] is owned by the producer.

     assert(ring->data[in] == NULL);

     Note: no other producer can touch this entry, so it should always
     be in the initialized state.

   - second step, write the data to the entry:

     ring->data[in] = data;

For the consumer, it first checks whether there is an available entry in
the ring and fetches the entry from the ring:

     if (!ring_is_empty(ring))
          entry = &ring->data[ring->out];

     Note: ring->out has not been updated yet, so the entry pointed to
     by ring->out is completely owned by the consumer.

Then it checks whether the data is ready:

retry:
     if (*entry == NULL)
            goto retry;

A NULL entry means that the producer has updated the index but has not
yet written any data to the entry.

Finally, it fetches the valid data out, sets the entry back to the
initialized state, and updates ring->out to make the entry usable by the
producers:

      data = *entry;
      *entry = NULL;
      ring->out++;

Memory barriers are omitted here; please refer to the comments in the code.

(1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
(2) http://dpdk.org/doc/api/rte__ring_8h.html

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 265 insertions(+)
 create mode 100644 migration/ring.h
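
The multi-producer algorithm described in the commit message can be sketched
in portable C11 as follows. This is a minimal illustration under stated
assumptions, not the code from migration/ring.h: it uses <stdatomic.h> with
the default sequentially consistent ordering instead of QEMU's atomic helpers
and hand-placed barriers, and SketchRing, sketch_ring_init(), sketch_mp_put()
and sketch_mp_get() are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define SKETCH_RING_SIZE 4u                 /* must be a power of two */
#define SKETCH_RING_MASK (SKETCH_RING_SIZE - 1u)

/* Hypothetical type for illustration; not the patch's struct Ring. */
typedef struct {
    atomic_uint in;                         /* next slot to claim (producers) */
    atomic_uint out;                        /* next slot to read (consumer)   */
    _Atomic(void *) data[SKETCH_RING_SIZE]; /* NULL means "slot is empty"     */
} SketchRing;

static void sketch_ring_init(SketchRing *r)
{
    atomic_init(&r->in, 0u);
    atomic_init(&r->out, 0u);
    for (size_t i = 0; i < SKETCH_RING_SIZE; i++) {
        atomic_init(&r->data[i], NULL);
    }
}

/* Returns 0 on success, -1 if the ring looks full. Safe for many producers. */
static int sketch_mp_put(SketchRing *r, void *item)
{
    unsigned int in, out;

    for (;;) {
        /* Read 'out' before 'in' so that 'in - out' never underflows. */
        out = atomic_load(&r->out);
        in = atomic_load(&r->in);

        if (in - out > SKETCH_RING_MASK) {
            return -1;                      /* full (possibly stale view) */
        }
        /* Step 1: claim slot 'in'; retry if another producer won. */
        if (atomic_compare_exchange_weak(&r->in, &in, in + 1u)) {
            break;
        }
    }

    /* The claimed slot must still be in its initialized (NULL) state. */
    assert(atomic_load(&r->data[in & SKETCH_RING_MASK]) == NULL);

    /* Step 2: write the data into the slot this producer now owns. */
    atomic_store(&r->data[in & SKETCH_RING_MASK], item);
    return 0;
}

/* Returns the oldest item, or NULL if the ring is empty. One consumer only. */
static void *sketch_mp_get(SketchRing *r)
{
    unsigned int out = atomic_load(&r->out);
    void *item;

    if (atomic_load(&r->in) == out) {
        return NULL;                        /* empty */
    }

    /* A producer may have claimed the slot but not yet written it. */
    do {
        item = atomic_load(&r->data[out & SKETCH_RING_MASK]);
    } while (item == NULL);

    /* Reset the slot, then hand it back to producers by advancing 'out'. */
    atomic_store(&r->data[out & SKETCH_RING_MASK], NULL);
    atomic_store(&r->out, out + 1u);
    return item;
}
```

A caller would initialize the ring once, let any number of threads call
sketch_mp_put(), and have exactly one thread call sketch_mp_get(). Because
the full check works on a snapshot, a put may report a full ring while the
consumer is freeing a slot; the caller can simply retry.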

Comments

Peter Xu June 20, 2018, 4:52 a.m. UTC | #1
On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> It's the simple lockless ring buffer implement which supports both
> single producer vs. single consumer and multiple producers vs.
> single consumer.
> 
> Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
> rte_ring (2) before i wrote this implement. It corrects some bugs of
> memory barriers in kfifo and it is the simpler lockless version of
> rte_ring as currently multiple access is only allowed for producer.

Could you provide some more information about the kfifo bug?  Any
pointer would be appreciated.

> 
> If has single producer vs. single consumer, it is the traditional fifo,
> If has multiple producers, it uses the algorithm as followings:
> 
> For the producer, it uses two steps to update the ring:
>    - first step, occupy the entry in the ring:
> 
> retry:
>       in = ring->in
>       if (cmpxhg(&ring->in, in, in +1) != in)
>             goto retry;
> 
>      after that the entry pointed by ring->data[in] has been owned by
>      the producer.
> 
>      assert(ring->data[in] == NULL);
> 
>      Note, no other producer can touch this entry so that this entry
>      should always be the initialized state.
> 
>    - second step, write the data to the entry:
> 
>      ring->data[in] = data;
> 
> For the consumer, it first checks if there is available entry in the
> ring and fetches the entry from the ring:
> 
>      if (!ring_is_empty(ring))
>           entry = &ring[ring->out];
> 
>      Note: the ring->out has not been updated so that the entry pointed
>      by ring->out is completely owned by the consumer.
> 
> Then it checks if the data is ready:
> 
> retry:
>      if (*entry == NULL)
>             goto retry;
> That means, the producer has updated the index but haven't written any
> data to it.
> 
> Finally, it fetches the valid data out, set the entry to the initialized
> state and update ring->out to make the entry be usable to the producer:
> 
>       data = *entry;
>       *entry = NULL;
>       ring->out++;
> 
> Memory barrier is omitted here, please refer to the comment in the code.
> 
> (1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
> (2) http://dpdk.org/doc/api/rte__ring_8h.html
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++

If this is a very general implementation, could we move it to the
util/ directory so that it can be used outside the migration code as
well?

>  1 file changed, 265 insertions(+)
>  create mode 100644 migration/ring.h
> 
> diff --git a/migration/ring.h b/migration/ring.h
> new file mode 100644
> index 0000000000..da9b8bdcbb
> --- /dev/null
> +++ b/migration/ring.h
> @@ -0,0 +1,265 @@
> +/*
> + * Ring Buffer
> + *
> + * Multiple producers and single consumer are supported with lock free.
> + *
> + * Copyright (c) 2018 Tencent Inc
> + *
> + * Authors:
> + *  Xiao Guangrong <xiaoguangrong@tencent.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef _RING__
> +#define _RING__
> +
> +#define CACHE_LINE  64

Is this for x86_64?  Is the cache line size the same for all arch?
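
For reference, the intent of the alignment can be sketched in standard C11
with a line size that is overridable at build time. This is only an
illustration: 64 bytes is an assumption that holds for common x86_64 parts
but not for every architecture, and SKETCH_CACHE_LINE, struct SketchIndices
and sketch_indices_are_separated() are hypothetical names, not QEMU APIs.

```c
#include <assert.h>
#include <stdalign.h>
#include <stddef.h>

/* Assumption: 64 bytes; a build could override this per target. */
#ifndef SKETCH_CACHE_LINE
#define SKETCH_CACHE_LINE 64
#endif

/* Same idea as the patch: keep each index on its own cache line. */
struct SketchIndices {
    unsigned int flags;
    alignas(SKETCH_CACHE_LINE) unsigned int in;   /* written by producers */
    alignas(SKETCH_CACHE_LINE) unsigned int out;  /* written by consumer  */
};

/* Returns 1 if 'in' and 'out' cannot share a line of the assumed size. */
static int sketch_indices_are_separated(void)
{
    size_t line = (size_t)SKETCH_CACHE_LINE;
    size_t in_off = offsetof(struct SketchIndices, in);
    size_t out_off = offsetof(struct SketchIndices, out);

    return in_off % line == 0 &&
           out_off % line == 0 &&
           out_off - in_off >= line;
}
```

If the real line size is larger than the assumed one, the two indices can
still end up sharing a line, which costs performance but not correctness.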

> +#define cache_aligned __attribute__((__aligned__(CACHE_LINE)))
> +
> +#define RING_MULTI_PRODUCER 0x1
> +
> +struct Ring {
> +    unsigned int flags;
> +    unsigned int size;
> +    unsigned int mask;
> +
> +    unsigned int in cache_aligned;
> +
> +    unsigned int out cache_aligned;
> +
> +    void *data[0] cache_aligned;
> +};
> +typedef struct Ring Ring;
> +
> +/*
> + * allocate and initialize the ring
> + *
> + * @size: the number of element, it should be power of 2
> + * @flags: set to RING_MULTI_PRODUCER if the ring has multiple producer,
> + *         otherwise set it to 0, i,e. single producer and single consumer.
> + *
> + * return the ring.
> + */
> +static inline Ring *ring_alloc(unsigned int size, unsigned int flags)
> +{
> +    Ring *ring;
> +
> +    assert(is_power_of_2(size));
> +
> +    ring = g_malloc0(sizeof(*ring) + size * sizeof(void *));
> +    ring->size = size;
> +    ring->mask = ring->size - 1;
> +    ring->flags = flags;
> +    return ring;
> +}
> +
> +static inline void ring_free(Ring *ring)
> +{
> +    g_free(ring);
> +}
> +
> +static inline bool __ring_is_empty(unsigned int in, unsigned int out)
> +{
> +    return in == out;
> +}

(some of the helpers are a bit confusing to me like this one; I would
 prefer some of the helpers be directly squashed into code, but it's a
 personal preference only)

> +
> +static inline bool ring_is_empty(Ring *ring)
> +{
> +    return ring->in == ring->out;
> +}
> +
> +static inline unsigned int ring_len(unsigned int in, unsigned int out)
> +{
> +    return in - out;
> +}

(this too)
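
For readers following along, the index helpers in this patch rely on
free-running unsigned counters: "in" and "out" are never reduced modulo the
ring size, only masked when a slot is addressed. A small sketch with
hypothetical names (sketch_len(), sketch_is_full(), sketch_slot()) shows why
the arithmetic stays correct even after "in" wraps past UINT_MAX, assuming
the size is a power of two:

```c
#include <assert.h>
#include <limits.h>

/* Number of queued entries; unsigned subtraction is modulo 2^N. */
static unsigned int sketch_len(unsigned int in, unsigned int out)
{
    return in - out;
}

/* Full when the number of queued entries exceeds mask (== size - 1). */
static int sketch_is_full(unsigned int in, unsigned int out, unsigned int mask)
{
    return sketch_len(in, out) > mask;
}

/* Slot addressed by a free-running position. */
static unsigned int sketch_slot(unsigned int pos, unsigned int mask)
{
    return pos & mask;
}
```

With a ring of 4 entries (mask 3), out == UINT_MAX - 1 and in == 1 describe
three queued entries, because in has wrapped around while out has not.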

> +
> +static inline bool
> +__ring_is_full(Ring *ring, unsigned int in, unsigned int out)
> +{
> +    return ring_len(in, out) > ring->mask;
> +}
> +
> +static inline bool ring_is_full(Ring *ring)
> +{
> +    return __ring_is_full(ring, ring->in, ring->out);
> +}
> +
> +static inline unsigned int ring_index(Ring *ring, unsigned int pos)
> +{
> +    return pos & ring->mask;
> +}
> +
> +static inline int __ring_put(Ring *ring, void *data)
> +{
> +    unsigned int index, out;
> +
> +    out = atomic_load_acquire(&ring->out);
> +    /*
> +     * smp_mb()
> +     *
> +     * should read ring->out before updating the entry, see the comments in
> +     * __ring_get().

Nit: here I think it means the comment in [1] below.  Maybe:

  "see the comments in __ring_get() when calling
   atomic_store_release()"

?

> +     */
> +
> +    if (__ring_is_full(ring, ring->in, out)) {
> +        return -ENOBUFS;
> +    }
> +
> +    index = ring_index(ring, ring->in);
> +
> +    atomic_set(&ring->data[index], data);
> +
> +    /*
> +     * should make sure the entry is updated before increasing ring->in
> +     * otherwise the consumer will get a entry but its content is useless.
> +     */
> +    smp_wmb();
> +    atomic_set(&ring->in, ring->in + 1);

Pure question: could we use store_release() instead of a mixture of
store/release and raw memory barriers in the function?  Or is there
any performance consideration behind it?

It'd be nice to mention the performance considerations if there are any.
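
To make the question concrete, here is roughly what a pure acquire/release
version of the SP-SC path could look like. It is a sketch in C11
<stdatomic.h>, not in QEMU's atomic.h macros, and SketchSpRing,
sketch_sp_init(), sketch_sp_put() and sketch_sp_get() are hypothetical
names. Whether it is faster or slower than smp_wmb() plus atomic_set() on a
given host would need measuring.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define SKETCH_SP_SIZE 2u                    /* must be a power of two */
#define SKETCH_SP_MASK (SKETCH_SP_SIZE - 1u)

typedef struct {
    atomic_uint in;                          /* written by the producer only */
    atomic_uint out;                         /* written by the consumer only */
    void *data[SKETCH_SP_SIZE];              /* plain slots, ordered via in/out */
} SketchSpRing;

static void sketch_sp_init(SketchSpRing *r)
{
    atomic_init(&r->in, 0u);
    atomic_init(&r->out, 0u);
    for (size_t i = 0; i < SKETCH_SP_SIZE; i++) {
        r->data[i] = NULL;
    }
}

/* Single producer. Returns 0 on success, -1 if the ring is full. */
static int sketch_sp_put(SketchSpRing *r, void *item)
{
    unsigned int in = atomic_load_explicit(&r->in, memory_order_relaxed);
    /* Acquire pairs with the consumer's release store to 'out'. */
    unsigned int out = atomic_load_explicit(&r->out, memory_order_acquire);

    if (in - out > SKETCH_SP_MASK) {
        return -1;
    }

    r->data[in & SKETCH_SP_MASK] = item;
    /* Release orders the slot write before the index update. */
    atomic_store_explicit(&r->in, in + 1u, memory_order_release);
    return 0;
}

/* Single consumer. Returns the oldest item, or NULL if the ring is empty. */
static void *sketch_sp_get(SketchSpRing *r)
{
    unsigned int out = atomic_load_explicit(&r->out, memory_order_relaxed);
    /* Acquire pairs with the producer's release store to 'in'. */
    unsigned int in = atomic_load_explicit(&r->in, memory_order_acquire);
    void *item;

    if (in == out) {
        return NULL;
    }

    item = r->data[out & SKETCH_SP_MASK];
    /* Release orders the slot read before the slot is handed back. */
    atomic_store_explicit(&r->out, out + 1u, memory_order_release);
    return item;
}
```

Each index is stored with release semantics and loaded with acquire
semantics by the other side, so no standalone barrier is needed in this
sketch.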

> +    return 0;
> +}
> +
> +static inline void *__ring_get(Ring *ring)
> +{
> +    unsigned int index, in;
> +    void *data;
> +
> +    in = atomic_read(&ring->in);
> +
> +    /*
> +     * should read ring->in first to make sure the entry pointed by this
> +     * index is available, see the comments in __ring_put().
> +     */

Nit: similar to above, it would be a bit nicer to mention which comment
is meant.

> +    smp_rmb();
> +    if (__ring_is_empty(in, ring->out)) {
> +        return NULL;
> +    }
> +
> +    index = ring_index(ring, ring->out);
> +
> +    data = atomic_read(&ring->data[index]);
> +
> +    /*
> +     * smp_mb()
> +     *
> +     * once the ring->out is updated the entry originally indicated by the
> +     * the index is visible and usable to the producer so that we should
> +     * make sure reading the entry out before updating ring->out to avoid
> +     * the entry being overwritten by the producer.
> +     */
> +    atomic_store_release(&ring->out, ring->out + 1);

[1]

> +
> +    return data;
> +}

Regards,

Peter Xu June 20, 2018, 5:55 a.m. UTC | #2
On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:

[...]

(Some more comments/questions for the MP implementation...)

> +static inline int ring_mp_put(Ring *ring, void *data)
> +{
> +    unsigned int index, in, in_next, out;
> +
> +    do {
> +        in = atomic_read(&ring->in);
> +        out = atomic_read(&ring->out);

[0]

Do we need to fetch "out" with load_acquire()?  Otherwise what's the
pairing of below store_release() at [1]?

This barrier exists in SP-SC case which makes sense to me, I assume
that's also needed for MP-SC case, am I right?

> +
> +        if (__ring_is_full(ring, in, out)) {
> +            if (atomic_read(&ring->in) == in &&
> +                atomic_read(&ring->out) == out) {

Why read again?  After all, the ring API seems to be designed as
non-blocking.  E.g., the poll at [2] below makes more sense to me:
reaching [2] means that there must be a producer that is _doing_ the
queuing, so the poll is very likely to complete fast.  Here, however,
it seems to be a pure busy poll without any hint.  So I am not sure
whether we should just let the caller decide whether it wants to call
ring_put() again.

> +                return -ENOBUFS;
> +            }
> +
> +            /* a entry has been fetched out, retry. */
> +            continue;
> +        }
> +
> +        in_next = in + 1;
> +    } while (atomic_cmpxchg(&ring->in, in, in_next) != in);
> +
> +    index = ring_index(ring, in);
> +
> +    /*
> +     * smp_rmb() paired with the memory barrier of (A) in ring_mp_get()
> +     * is implied in atomic_cmpxchg() as we should read ring->out first
> +     * before fetching the entry, otherwise this assert will fail.

Thanks for all these comments!  These are really helpful for
reviewers.

However I'm not sure whether I understand it correctly here on the MB of
(A) for ring_mp_get() - AFAIU that should correspond to a smp_rmb()
at [0] above when reading the "out" variable rather than to this
assertion, and that's why I thought that at [0] we should have something
like a load_acquire() there (which contains a rmb()).

Content-wise, I think the code here is correct, since atomic_cmpxchg()
should have one implicit smp_mb() after all, so we don't need any
further barriers here.

> +     */
> +    assert(!atomic_read(&ring->data[index]));
> +
> +    /*
> +     * smp_mb() paired with the memory barrier of (B) in ring_mp_get() is
> +     * implied in atomic_cmpxchg(), that is needed here as  we should read
> +     * ring->out before updating the entry, it is the same as we did in
> +     * __ring_put().
> +     *
> +     * smp_wmb() paired with the memory barrier of (C) in ring_mp_get()
> +     * is implied in atomic_cmpxchg(), that is needed as we should increase
> +     * ring->in before updating the entry.
> +     */
> +    atomic_set(&ring->data[index], data);
> +
> +    return 0;
> +}
> +
> +static inline void *ring_mp_get(Ring *ring)
> +{
> +    unsigned int index, in;
> +    void *data;
> +
> +    do {
> +        in = atomic_read(&ring->in);
> +
> +        /*
> +         * (C) should read ring->in first to make sure the entry pointed by this
> +         * index is available
> +         */
> +        smp_rmb();
> +
> +        if (!__ring_is_empty(in, ring->out)) {
> +            break;
> +        }
> +
> +        if (atomic_read(&ring->in) == in) {
> +            return NULL;
> +        }
> +        /* new entry has been added in, retry. */
> +    } while (1);
> +
> +    index = ring_index(ring, ring->out);
> +
> +    do {
> +        data = atomic_read(&ring->data[index]);
> +        if (data) {
> +            break;
> +        }
> +        /* the producer is updating the entry, retry */
> +        cpu_relax();

[2]

> +    } while (1);
> +
> +    atomic_set(&ring->data[index], NULL);
> +
> +    /*
> +     * (B) smp_mb() is needed as we should read the entry out before
> +     * updating ring->out as we did in __ring_get().
> +     *
> +     * (A) smp_wmb() is needed as we should make the entry be NULL before
> +     * updating ring->out (which will make the entry be visible and usable).
> +     */
> +    atomic_store_release(&ring->out, ring->out + 1);

[1]

> +
> +    return data;
> +}
> +
> +static inline int ring_put(Ring *ring, void *data)
> +{
> +    if (ring->flags & RING_MULTI_PRODUCER) {
> +        return ring_mp_put(ring, data);
> +    }
> +    return __ring_put(ring, data);
> +}
> +
> +static inline void *ring_get(Ring *ring)
> +{
> +    if (ring->flags & RING_MULTI_PRODUCER) {
> +        return ring_mp_get(ring);
> +    }
> +    return __ring_get(ring);
> +}
> +#endif
> -- 
> 2.14.4
> 

Thanks,

Michael S. Tsirkin June 20, 2018, 12:38 p.m. UTC | #3
On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> It's the simple lockless ring buffer implement which supports both
> single producer vs. single consumer and multiple producers vs.
> single consumer.
> 
> Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
> rte_ring (2) before i wrote this implement. It corrects some bugs of
> memory barriers in kfifo and it is the simpler lockless version of
> rte_ring as currently multiple access is only allowed for producer.
> 
> If has single producer vs. single consumer, it is the traditional fifo,
> If has multiple producers, it uses the algorithm as followings:
> 
> For the producer, it uses two steps to update the ring:
>    - first step, occupy the entry in the ring:
> 
> retry:
>       in = ring->in
>       if (cmpxhg(&ring->in, in, in +1) != in)
>             goto retry;
> 
>      after that the entry pointed by ring->data[in] has been owned by
>      the producer.
> 
>      assert(ring->data[in] == NULL);
> 
>      Note, no other producer can touch this entry so that this entry
>      should always be the initialized state.
> 
>    - second step, write the data to the entry:
> 
>      ring->data[in] = data;
> 
> For the consumer, it first checks if there is available entry in the
> ring and fetches the entry from the ring:
> 
>      if (!ring_is_empty(ring))
>           entry = &ring[ring->out];
> 
>      Note: the ring->out has not been updated so that the entry pointed
>      by ring->out is completely owned by the consumer.
> 
> Then it checks if the data is ready:
> 
> retry:
>      if (*entry == NULL)
>             goto retry;
> That means, the producer has updated the index but haven't written any
> data to it.
> 
> Finally, it fetches the valid data out, set the entry to the initialized
> state and update ring->out to make the entry be usable to the producer:
> 
>       data = *entry;
>       *entry = NULL;
>       ring->out++;
> 
> Memory barrier is omitted here, please refer to the comment in the code.
>
>
>
> (1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
> (2) http://dpdk.org/doc/api/rte__ring_8h.html
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>

So instead of all this super-optimized trickiness, how about
a simple port of ptr_ring from Linux?

That one isn't lockless, but it's known to outperform
most others for a single producer/single consumer case.
And with a ton of networking going on,
who said it's such a hot spot? OTOH this implementation
has more barriers, which slow down each individual thread.
It's also a source of bugs.

Further, the atomic tricks this one uses are not fair, so some threads can get
completely starved while others make progress. There's also no
chance to mix aggressive polling and sleeping with this
kind of scheme, so the starved thread will consume lots of
CPU.

So I'd like to see a simple ring used, and then a patch on top
switching to this tricky one with performance comparison
along with that.

> ---
>  migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 265 insertions(+)
>  create mode 100644 migration/ring.h
> 
> diff --git a/migration/ring.h b/migration/ring.h
> new file mode 100644
> index 0000000000..da9b8bdcbb
> --- /dev/null
> +++ b/migration/ring.h
> @@ -0,0 +1,265 @@
> +/*
> + * Ring Buffer
> + *
> + * Multiple producers and single consumer are supported with lock free.
> + *
> + * Copyright (c) 2018 Tencent Inc
> + *
> + * Authors:
> + *  Xiao Guangrong <xiaoguangrong@tencent.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef _RING__
> +#define _RING__

Prefix Ring is too short.


> +
> +#define CACHE_LINE  64
> +#define cache_aligned __attribute__((__aligned__(CACHE_LINE)))
> +
> +#define RING_MULTI_PRODUCER 0x1
> +
> +struct Ring {
> +    unsigned int flags;
> +    unsigned int size;
> +    unsigned int mask;
> +
> +    unsigned int in cache_aligned;
> +
> +    unsigned int out cache_aligned;
> +
> +    void *data[0] cache_aligned;
> +};
> +typedef struct Ring Ring;
> +
> +/*
> + * allocate and initialize the ring
> + *
> + * @size: the number of element, it should be power of 2
> + * @flags: set to RING_MULTI_PRODUCER if the ring has multiple producer,
> + *         otherwise set it to 0, i,e. single producer and single consumer.
> + *
> + * return the ring.
> + */
> +static inline Ring *ring_alloc(unsigned int size, unsigned int flags)
> +{
> +    Ring *ring;
> +
> +    assert(is_power_of_2(size));
> +
> +    ring = g_malloc0(sizeof(*ring) + size * sizeof(void *));
> +    ring->size = size;
> +    ring->mask = ring->size - 1;
> +    ring->flags = flags;
> +    return ring;
> +}
> +
> +static inline void ring_free(Ring *ring)
> +{
> +    g_free(ring);
> +}
> +
> +static inline bool __ring_is_empty(unsigned int in, unsigned int out)
> +{
> +    return in == out;
> +}
> +
> +static inline bool ring_is_empty(Ring *ring)
> +{
> +    return ring->in == ring->out;
> +}
> +
> +static inline unsigned int ring_len(unsigned int in, unsigned int out)
> +{
> +    return in - out;
> +}
> +
> +static inline bool
> +__ring_is_full(Ring *ring, unsigned int in, unsigned int out)
> +{
> +    return ring_len(in, out) > ring->mask;
> +}
> +
> +static inline bool ring_is_full(Ring *ring)
> +{
> +    return __ring_is_full(ring, ring->in, ring->out);
> +}
> +
> +static inline unsigned int ring_index(Ring *ring, unsigned int pos)
> +{
> +    return pos & ring->mask;
> +}
> +
> +static inline int __ring_put(Ring *ring, void *data)
> +{
> +    unsigned int index, out;
> +
> +    out = atomic_load_acquire(&ring->out);
> +    /*
> +     * smp_mb()
> +     *
> +     * should read ring->out before updating the entry, see the comments in
> +     * __ring_get().
> +     */
> +
> +    if (__ring_is_full(ring, ring->in, out)) {
> +        return -ENOBUFS;
> +    }
> +
> +    index = ring_index(ring, ring->in);
> +
> +    atomic_set(&ring->data[index], data);
> +
> +    /*
> +     * should make sure the entry is updated before increasing ring->in
> +     * otherwise the consumer will get a entry but its content is useless.
> +     */
> +    smp_wmb();
> +    atomic_set(&ring->in, ring->in + 1);
> +    return 0;
> +}
> +
> +static inline void *__ring_get(Ring *ring)
> +{
> +    unsigned int index, in;
> +    void *data;
> +
> +    in = atomic_read(&ring->in);
> +
> +    /*
> +     * should read ring->in first to make sure the entry pointed by this
> +     * index is available, see the comments in __ring_put().
> +     */
> +    smp_rmb();
> +    if (__ring_is_empty(in, ring->out)) {
> +        return NULL;
> +    }
> +
> +    index = ring_index(ring, ring->out);
> +
> +    data = atomic_read(&ring->data[index]);
> +
> +    /*
> +     * smp_mb()
> +     *
> +     * once the ring->out is updated the entry originally indicated by the
> +     * the index is visible and usable to the producer so that we should
> +     * make sure reading the entry out before updating ring->out to avoid
> +     * the entry being overwritten by the producer.
> +     */
> +    atomic_store_release(&ring->out, ring->out + 1);
> +
> +    return data;
> +}
> +
> +static inline int ring_mp_put(Ring *ring, void *data)
> +{
> +    unsigned int index, in, in_next, out;
> +
> +    do {
> +        in = atomic_read(&ring->in);
> +        out = atomic_read(&ring->out);
> +
> +        if (__ring_is_full(ring, in, out)) {
> +            if (atomic_read(&ring->in) == in &&
> +                atomic_read(&ring->out) == out) {
> +                return -ENOBUFS;
> +            }
> +
> +            /* a entry has been fetched out, retry. */
> +            continue;
> +        }
> +
> +        in_next = in + 1;
> +    } while (atomic_cmpxchg(&ring->in, in, in_next) != in);
> +
> +    index = ring_index(ring, in);
> +
> +    /*
> +     * smp_rmb() paired with the memory barrier of (A) in ring_mp_get()
> +     * is implied in atomic_cmpxchg() as we should read ring->out first
> +     * before fetching the entry, otherwise this assert will fail.
> +     */
> +    assert(!atomic_read(&ring->data[index]));
> +
> +    /*
> +     * smp_mb() paired with the memory barrier of (B) in ring_mp_get() is
> +     * implied in atomic_cmpxchg(), that is needed here as  we should read
> +     * ring->out before updating the entry, it is the same as we did in
> +     * __ring_put().
> +     *
> +     * smp_wmb() paired with the memory barrier of (C) in ring_mp_get()
> +     * is implied in atomic_cmpxchg(), that is needed as we should increase
> +     * ring->in before updating the entry.
> +     */
> +    atomic_set(&ring->data[index], data);
> +
> +    return 0;
> +}
> +
> +static inline void *ring_mp_get(Ring *ring)
> +{
> +    unsigned int index, in;
> +    void *data;
> +
> +    do {
> +        in = atomic_read(&ring->in);
> +
> +        /*
> +         * (C) should read ring->in first to make sure the entry pointed by this
> +         * index is available
> +         */
> +        smp_rmb();
> +
> +        if (!__ring_is_empty(in, ring->out)) {
> +            break;
> +        }
> +
> +        if (atomic_read(&ring->in) == in) {
> +            return NULL;
> +        }
> +        /* new entry has been added in, retry. */
> +    } while (1);
> +
> +    index = ring_index(ring, ring->out);
> +
> +    do {
> +        data = atomic_read(&ring->data[index]);
> +        if (data) {
> +            break;
> +        }
> +        /* the producer is updating the entry, retry */
> +        cpu_relax();
> +    } while (1);
> +
> +    atomic_set(&ring->data[index], NULL);
> +
> +    /*
> +     * (B) smp_mb() is needed as we should read the entry out before
> +     * updating ring->out as we did in __ring_get().
> +     *
> +     * (A) smp_wmb() is needed as we should make the entry be NULL before
> +     * updating ring->out (which will make the entry be visible and usable).
> +     */

I can't say I understand this all.
And the interaction of acquire/release semantics with smp_*
barriers is even scarier.

> +    atomic_store_release(&ring->out, ring->out + 1);
> +
> +    return data;
> +}
> +
> +static inline int ring_put(Ring *ring, void *data)
> +{
> +    if (ring->flags & RING_MULTI_PRODUCER) {
> +        return ring_mp_put(ring, data);
> +    }
> +    return __ring_put(ring, data);
> +}
> +
> +static inline void *ring_get(Ring *ring)
> +{
> +    if (ring->flags & RING_MULTI_PRODUCER) {
> +        return ring_mp_get(ring);
> +    }
> +    return __ring_get(ring);
> +}
> +#endif


A bunch of tricky barriers, retries, etc. all over the place.  This sorely
needs *a lot of* unit tests. Where are they?



> -- 
> 2.14.4

Xiao Guangrong June 28, 2018, 10:02 a.m. UTC | #4
CC: Paul, Peter Zijlstra, Stefani, Lai, who are all good at memory barriers.


On 06/20/2018 12:52 PM, Peter Xu wrote:
> On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> It's the simple lockless ring buffer implement which supports both
>> single producer vs. single consumer and multiple producers vs.
>> single consumer.
>>
>> Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
>> rte_ring (2) before i wrote this implement. It corrects some bugs of
>> memory barriers in kfifo and it is the simpler lockless version of
>> rte_ring as currently multiple access is only allowed for producer.
> 
> Could you provide some more information about the kfifo bug?  Any
> pointer would be appreciated.
> 

Sure, I reported one of the memory barrier issues to the Linux kernel:
    https://lkml.org/lkml/2018/5/11/58

Actually, besides that, there is another memory barrier issue in kfifo.
Please consider this case:

    at the beginning
    ring->size = 4
    ring->out = 0
    ring->in = 4

      Consumer                            Producer
  ---------------                     --------------
    index = ring->out; /* index == 0 */
    ring->out++; /* ring->out == 1 */
    < Re-Order >
                                     out = ring->out;
                                     if (ring->in - out > ring->mask)
                                         return -EFULL;
                                     /* see the ring is not full */
                                     index = ring->in & ring->mask; /* index == 0 */
                                     ring->data[index] = new_data;
                                     ring->in++;

    data = ring->data[index];
    !!!!!! the old data is lost !!!!!!

So we need to make sure that:
1) the consumer reads ring->data[] out before updating ring->out
2) the producer reads ring->out before updating ring->data[]

as follows:
       Producer                                       Consumer
   ------------------------------------         ------------------------
       Reading ring->out                            Reading ring->data[index]
       smp_mb()                                     smp_mb()
       Setting ring->data[index] = data             ring->out++

[ I used atomic_store_release() and atomic_load_acquire() instead of smp_mb() in the
   patch. ]
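
The lost-update interleaving above can be replayed deterministically in a
single thread, which makes the required ordering easy to check. This is a
sequential simulation with hypothetical names (struct ReplayRing, replay()),
not a reproduction of the real race: it simply executes the steps in the
reordered order or in the required order.

```c
#include <assert.h>
#include <stddef.h>

#define REPLAY_SIZE 4u
#define REPLAY_MASK (REPLAY_SIZE - 1u)

struct ReplayRing {
    unsigned int in;
    unsigned int out;
    const char *data[REPLAY_SIZE];
};

/*
 * Replays one consumer step and one producer step on a full ring and
 * returns what the consumer ends up reading.  'consumer_reads_first'
 * selects the required order (read the slot, then advance out); zero
 * selects the reordered, buggy order.
 */
static const char *replay(int consumer_reads_first)
{
    struct ReplayRing r = { 4u, 0u, { "old0", "old1", "old2", "old3" } };
    unsigned int index = r.out & REPLAY_MASK;    /* index == 0 */
    const char *seen = NULL;

    if (consumer_reads_first) {
        seen = r.data[index];    /* consumer: read the entry ...        */
    }
    r.out++;                     /* consumer: ... then release the slot */

    /* producer: sees that the ring is not full and reuses index 0 */
    if (r.in - r.out <= REPLAY_MASK) {
        r.data[r.in & REPLAY_MASK] = "new";
        r.in++;
    }

    if (!consumer_reads_first) {
        seen = r.data[index];    /* reordered read happens too late */
    }
    return seen;
}
```

In the reordered run the consumer reads the producer's new value and the
old entry is lost; in the ordered run it reads the old entry as intended.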

But I am not sure whether we can use smp_acquire__after_ctrl_dep() in the producer.

>>
>> If has single producer vs. single consumer, it is the traditional fifo,
>> If has multiple producers, it uses the algorithm as followings:
>>
>> For the producer, it uses two steps to update the ring:
>>     - first step, occupy the entry in the ring:
>>
>> retry:
>>        in = ring->in
>>        if (cmpxhg(&ring->in, in, in +1) != in)
>>              goto retry;
>>
>>       after that the entry pointed by ring->data[in] has been owned by
>>       the producer.
>>
>>       assert(ring->data[in] == NULL);
>>
>>       Note, no other producer can touch this entry so that this entry
>>       should always be the initialized state.
>>
>>     - second step, write the data to the entry:
>>
>>       ring->data[in] = data;
>>
>> For the consumer, it first checks if there is available entry in the
>> ring and fetches the entry from the ring:
>>
>>       if (!ring_is_empty(ring))
>>            entry = &ring[ring->out];
>>
>>       Note: the ring->out has not been updated so that the entry pointed
>>       by ring->out is completely owned by the consumer.
>>
>> Then it checks if the data is ready:
>>
>> retry:
>>       if (*entry == NULL)
>>              goto retry;
>> That means, the producer has updated the index but haven't written any
>> data to it.
>>
>> Finally, it fetches the valid data out, set the entry to the initialized
>> state and update ring->out to make the entry be usable to the producer:
>>
>>        data = *entry;
>>        *entry = NULL;
>>        ring->out++;
>>
>> Memory barrier is omitted here, please refer to the comment in the code.
>>
>> (1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
>> (2) http://dpdk.org/doc/api/rte__ring_8h.html
>>
>> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
>> ---
>>   migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
> 
> If this is a very general implementation, not sure whether we can move
> this to util/ directory so that it can be used even outside migration
> codes.

I thought so too. Currently migration is the only user of it, so I put
it near the migration code. I am fine with moving it to util/ if you
prefer.

> 
>>   1 file changed, 265 insertions(+)
>>   create mode 100644 migration/ring.h
>>
>> diff --git a/migration/ring.h b/migration/ring.h
>> new file mode 100644
>> index 0000000000..da9b8bdcbb
>> --- /dev/null
>> +++ b/migration/ring.h
>> @@ -0,0 +1,265 @@
>> +/*
>> + * Ring Buffer
>> + *
>> + * Multiple producers and single consumer are supported with lock free.
>> + *
>> + * Copyright (c) 2018 Tencent Inc
>> + *
>> + * Authors:
>> + *  Xiao Guangrong <xiaoguangrong@tencent.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef _RING__
>> +#define _RING__
>> +
>> +#define CACHE_LINE  64
> 
> Is this for x86_64?  Is the cache line size the same for all arch?

64 bytes is just a common size. :)

Does QEMU support pre-configured CACHE_SIZE?

> 
>> +#define cache_aligned __attribute__((__aligned__(CACHE_LINE)))
>> +
>> +#define RING_MULTI_PRODUCER 0x1
>> +
>> +struct Ring {
>> +    unsigned int flags;
>> +    unsigned int size;
>> +    unsigned int mask;
>> +
>> +    unsigned int in cache_aligned;
>> +
>> +    unsigned int out cache_aligned;
>> +
>> +    void *data[0] cache_aligned;
>> +};
>> +typedef struct Ring Ring;
>> +
>> +/*
>> + * allocate and initialize the ring
>> + *
>> + * @size: the number of element, it should be power of 2
>> + * @flags: set to RING_MULTI_PRODUCER if the ring has multiple producer,
>> + *         otherwise set it to 0, i,e. single producer and single consumer.
>> + *
>> + * return the ring.
>> + */
>> +static inline Ring *ring_alloc(unsigned int size, unsigned int flags)
>> +{
>> +    Ring *ring;
>> +
>> +    assert(is_power_of_2(size));
>> +
>> +    ring = g_malloc0(sizeof(*ring) + size * sizeof(void *));
>> +    ring->size = size;
>> +    ring->mask = ring->size - 1;
>> +    ring->flags = flags;
>> +    return ring;
>> +}
>> +
>> +static inline void ring_free(Ring *ring)
>> +{
>> +    g_free(ring);
>> +}
>> +
>> +static inline bool __ring_is_empty(unsigned int in, unsigned int out)
>> +{
>> +    return in == out;
>> +}
> 
> (some of the helpers are a bit confusing to me like this one; I would
>   prefer some of the helpers be directly squashed into code, but it's a
>   personal preference only)
> 

I will carefully consider it in the next version...

>> +
>> +static inline bool ring_is_empty(Ring *ring)
>> +{
>> +    return ring->in == ring->out;
>> +}
>> +
>> +static inline unsigned int ring_len(unsigned int in, unsigned int out)
>> +{
>> +    return in - out;
>> +}
> 
> (this too)
> 
>> +
>> +static inline bool
>> +__ring_is_full(Ring *ring, unsigned int in, unsigned int out)
>> +{
>> +    return ring_len(in, out) > ring->mask;
>> +}
>> +
>> +static inline bool ring_is_full(Ring *ring)
>> +{
>> +    return __ring_is_full(ring, ring->in, ring->out);
>> +}
>> +
>> +static inline unsigned int ring_index(Ring *ring, unsigned int pos)
>> +{
>> +    return pos & ring->mask;
>> +}
>> +
>> +static inline int __ring_put(Ring *ring, void *data)
>> +{
>> +    unsigned int index, out;
>> +
>> +    out = atomic_load_acquire(&ring->out);
>> +    /*
>> +     * smp_mb()
>> +     *
>> +     * should read ring->out before updating the entry, see the comments in
>> +     * __ring_get().
> 
> Nit: here I think it means the comment in [1] below.  Maybe:
> 
>    "see the comments in __ring_get() when calling
>     atomic_store_release()"
> 
> ?

Yes, you are right, I will apply your suggestion.

> 
>> +     */
>> +
>> +    if (__ring_is_full(ring, ring->in, out)) {
>> +        return -ENOBUFS;
>> +    }
>> +
>> +    index = ring_index(ring, ring->in);
>> +
>> +    atomic_set(&ring->data[index], data);
>> +
>> +    /*
>> +     * should make sure the entry is updated before increasing ring->in
>> +     * otherwise the consumer will get a entry but its content is useless.
>> +     */
>> +    smp_wmb();
>> +    atomic_set(&ring->in, ring->in + 1);
> 
> Pure question: could we use store_release() instead of a mixture of
> store/release and raw memory barriers in the function?  Or is there
> any performance consideration behind?
> 
> It'll be nice to mention the performance considerations if there is.

I think atomic_mb_read() and atomic_mb_set() are what you are
talking about. These operations speed up read accesses but
slow down write accesses, which is not suitable for our case.
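
For reference, the two publishing styles under discussion can be sketched
with C11 atomics. This is an assumption for illustration: the patch uses
QEMU's own atomic helpers, and a C11 release fence is only the nearest
stand-in for smp_wmb(). The names ring_slot, ring_in and the three functions
are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

static void *ring_slot;                 /* the entry being published */
static _Atomic unsigned int ring_in;    /* the producer index */

/* Variant 1: explicit barrier followed by a plain (relaxed) index store,
 * which is the shape of smp_wmb() + atomic_set() in __ring_put(). */
static void publish_with_fence(void *data)
{
    unsigned int in = atomic_load_explicit(&ring_in, memory_order_relaxed);

    ring_slot = data;
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&ring_in, in + 1, memory_order_relaxed);
}

/* Variant 2: a single store-release on the index. */
static void publish_with_release(void *data)
{
    unsigned int in = atomic_load_explicit(&ring_in, memory_order_relaxed);

    ring_slot = data;
    atomic_store_explicit(&ring_in, in + 1, memory_order_release);
}

/* Consumer side: the acquire load pairs with either variant above, so the
 * entry is read only after the index update has been observed. */
static void *consume(unsigned int seen_in)
{
    if (atomic_load_explicit(&ring_in, memory_order_acquire) == seen_in) {
        return NULL;    /* nothing new has been published */
    }
    return ring_slot;
}
```

Both variants order the entry write before the index update; which one is
cheaper depends on the host, so it would need to be measured.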

> 
>> +    return 0;
>> +}
>> +
>> +static inline void *__ring_get(Ring *ring)
>> +{
>> +    unsigned int index, in;
>> +    void *data;
>> +
>> +    in = atomic_read(&ring->in);
>> +
>> +    /*
>> +     * should read ring->in first to make sure the entry pointed by this
>> +     * index is available, see the comments in __ring_put().
>> +     */
> 
> Nit: similar to above, maybe mention about which comment would be a
> bit nicer.

Yes, will improve it.
Wang, Wei W June 28, 2018, 11:55 a.m. UTC | #5
On 06/28/2018 06:02 PM, Xiao Guangrong wrote:
>
> CC: Paul, Peter Zijlstra, Stefani, Lai who are all good at memory 
> barrier.
>
>
> On 06/20/2018 12:52 PM, Peter Xu wrote:
>> On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com 
>> wrote:
>>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>>
>>> It's the simple lockless ring buffer implement which supports both
>>> single producer vs. single consumer and multiple producers vs.
>>> single consumer.
>>>
>>> Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
>>> rte_ring (2) before i wrote this implement. It corrects some bugs of
>>> memory barriers in kfifo and it is the simpler lockless version of
>>> rte_ring as currently multiple access is only allowed for producer.
>>
>> Could you provide some more information about the kfifo bug? Any
>> pointer would be appreciated.
>>
>
> Sure, i reported one of the memory barrier issue to linux kernel:
>    https://lkml.org/lkml/2018/5/11/58
>
> Actually, beside that, there is another memory barrier issue in kfifo,
> please consider this case:
>
>    at the beginning
>    ring->size = 4
>    ring->out = 0
>    ring->in = 4
>
>      Consumer                            Producer
>  ---------------                     --------------
>    index = ring->out; /* index == 0 */
>    ring->out++; /* ring->out == 1 */
>    < Re-Order >
>                                     out = ring->out;
>                                     if (ring->in - out > ring->mask)
>                                         return -EFULL;
>                                     /* see the ring is not full */
>                                     index = ring->in & ring->mask; /* index == 0 */
>                                     ring->data[index] = new_data;
>                                     ring->in++;
>
>    data = ring->data[index];
>    !!!!!! the old data is lost !!!!!!
>
> So we need to make sure:
> 1) for the consumer, we should read the ring->data[] out before 
> updating ring->out
> 2) for the producer, we should read ring->out before updating 
> ring->data[]
>
> as followings:
>       Producer                              Consumer
>   ------------------------------------     ------------------------
>       Reading ring->out                     Reading ring->data[index]
>       smp_mb()                              smp_mb()
>       Setting ring->data[index] = data      ring->out++
>
> [ i used atomic_store_release() and atomic_load_acquire() instead of 
> smp_mb() in the
>   patch. ]
>
> But i am not sure if we can use smp_acquire__after_ctrl_dep() in the 
> producer?


I wonder if this could be solved by simply tweaking the above consumer 
implementation:

[1] index = ring->out;
[2] data = ring->data[index];
[3] index++;
[4] ring->out = index;

Now [2] and [3] form a WAR dependency, which avoids the reordering.


Best,
Wei
Jason Wang June 28, 2018, 1:36 p.m. UTC | #6
On 2018年06月04日 17:55, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong<xiaoguangrong@tencent.com>
>
> It's the simple lockless ring buffer implement which supports both
> single producer vs. single consumer and multiple producers vs.
> single consumer.
>
> Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
> rte_ring (2) before i wrote this implement. It corrects some bugs of
> memory barriers in kfifo and it is the simpler lockless version of
> rte_ring as currently multiple access is only allowed for producer.
>
> If has single producer vs. single consumer, it is the traditional fifo,
> If has multiple producers, it uses the algorithm as followings:
>
> For the producer, it uses two steps to update the ring:
>     - first step, occupy the entry in the ring:
>
> retry:
>        in = ring->in
>        if (cmpxhg(&ring->in, in, in +1) != in)
>              goto retry;
>
>       after that the entry pointed by ring->data[in] has been owned by
>       the producer.
>
>       assert(ring->data[in] == NULL);
>
>       Note, no other producer can touch this entry so that this entry
>       should always be the initialized state.
>
>     - second step, write the data to the entry:
>
>       ring->data[in] = data;
>
> For the consumer, it first checks if there is available entry in the
> ring and fetches the entry from the ring:
>
>       if (!ring_is_empty(ring))
>            entry = &ring[ring->out];
>
>       Note: the ring->out has not been updated so that the entry pointed
>       by ring->out is completely owned by the consumer.
>
> Then it checks if the data is ready:
>
> retry:
>       if (*entry == NULL)
>              goto retry;
> That means, the producer has updated the index but haven't written any
> data to it.
>
> Finally, it fetches the valid data out, set the entry to the initialized
> state and update ring->out to make the entry be usable to the producer:
>
>        data = *entry;
>        *entry = NULL;
>        ring->out++;
>
> Memory barrier is omitted here, please refer to the comment in the code.
>
> (1)https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
> (2)http://dpdk.org/doc/api/rte__ring_8h.html
>
> Signed-off-by: Xiao Guangrong<xiaoguangrong@tencent.com>
> ---

May I ask why you need an MPSC ring here? Can we just use N SPSC rings for
submitting pages and another N SPSC rings for passing back results?

Thanks
Xiao Guangrong June 28, 2018, 2 p.m. UTC | #7
On 06/20/2018 01:55 PM, Peter Xu wrote:
> On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
> 
> [...]
> 
> (Some more comments/questions for the MP implementation...)
> 
>> +static inline int ring_mp_put(Ring *ring, void *data)
>> +{
>> +    unsigned int index, in, in_next, out;
>> +
>> +    do {
>> +        in = atomic_read(&ring->in);
>> +        out = atomic_read(&ring->out);
> 
> [0]
> 
> Do we need to fetch "out" with load_acquire()?  Otherwise what's the
> pairing of below store_release() at [1]?
> 

The barrier paired with [1] is the full barrier implied by atomic_cmpxchg().

> This barrier exists in SP-SC case which makes sense to me, I assume
> that's also needed for MP-SC case, am I right?

We needn't put a memory barrier here, as we do not care about the order
between these two indexes (in and out). Instead, the memory barrier (for
SP-SC as well) orders the read of ring->out against the update of
ring->data[], as we explained in the previous mail.

> 
>> +
>> +        if (__ring_is_full(ring, in, out)) {
>> +            if (atomic_read(&ring->in) == in &&
>> +                atomic_read(&ring->out) == out) {
> 
> Why read again?  After all the ring API seems to be designed as
> non-blocking.  E.g., I see the poll at [2] below makes more sense
> since when reaches [2] it means that there must be a producer that is
> _doing_ the queuing, so polling is very possible to complete fast.
> However here it seems to be a pure busy poll without any hint.  Then
> not sure whether we should just let the caller decide whether it wants
> to call ring_put() again.
> 

Without it we can easily observe a "strange" behavior: the thread fails to
put the result into the global ring even though we allocated enough room
for the global ring (its capacity >= total requests). That is because
these two indexes can be updated at any time. Consider the case where multiple
get and put operations finish between reading ring->in and reading ring->out,
so that ring->in may well pass the value read from ring->out.

With this code, the negative case only happens if these two indexes (32 bits)
overflow to the same value, which can help us catch potential bugs in the
code.
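
To make the re-check concrete, here is a minimal sketch of the multi-producer
put and single-consumer get, written with C11 atomics and their default
sequentially consistent ordering. That is an assumption made to keep the
sketch short; it is not the barrier scheme used in the patch, and the names
mp_ring, mp_put and mp_get are hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>

#define MP_SIZE 4u                    /* hypothetical; must be a power of 2 */
#define MP_MASK (MP_SIZE - 1)

struct mp_ring {
    _Atomic unsigned int in;          /* advanced by producers with cmpxchg */
    _Atomic unsigned int out;         /* advanced by the single consumer */
    _Atomic(void *) data[MP_SIZE];    /* NULL means "not filled yet" */
};

static int mp_put(struct mp_ring *r, void *data)
{
    unsigned int in, out;

    for (;;) {
        in = atomic_load(&r->in);
        out = atomic_load(&r->out);

        if (in - out > MP_MASK) {
            /*
             * The two loads are not one snapshot: other threads may have
             * moved the indexes in between. Report "full" only if both
             * indexes are unchanged, otherwise take a fresh snapshot.
             */
            if (atomic_load(&r->in) == in && atomic_load(&r->out) == out) {
                return -ENOBUFS;
            }
            continue;
        }

        /* step 1: occupy the entry by advancing ring->in */
        if (atomic_compare_exchange_weak(&r->in, &in, in + 1)) {
            break;
        }
    }

    /* step 2: the entry is ours now, and the consumer left it as NULL */
    assert(atomic_load(&r->data[in & MP_MASK]) == NULL);
    atomic_store(&r->data[in & MP_MASK], data);
    return 0;
}

static void *mp_get(struct mp_ring *r)
{
    unsigned int out = atomic_load(&r->out);
    void *data;

    if (atomic_load(&r->in) == out) {
        return NULL;                  /* ring is empty */
    }

    /* a producer has reserved the entry but may not have written it yet */
    while ((data = atomic_load(&r->data[out & MP_MASK])) == NULL) {
        /* busy wait */
    }

    atomic_store(&r->data[out & MP_MASK], NULL);
    atomic_store(&r->out, out + 1);
    return data;
}
```

With a stale snapshot, in - out can wrap to a huge unsigned value and look
like "full"; the second pair of loads is what filters that case out.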

>> +                return -ENOBUFS;
>> +            }
>> +
>> +            /* a entry has been fetched out, retry. */
>> +            continue;
>> +        }
>> +
>> +        in_next = in + 1;
>> +    } while (atomic_cmpxchg(&ring->in, in, in_next) != in);
>> +
>> +    index = ring_index(ring, in);
>> +
>> +    /*
>> +     * smp_rmb() paired with the memory barrier of (A) in ring_mp_get()
>> +     * is implied in atomic_cmpxchg() as we should read ring->out first
>> +     * before fetching the entry, otherwise this assert will fail.
> 
> Thanks for all these comments!  These are really helpful for
> reviewers.
> 
> However I'm not sure whether I understand it correctly here on MB of
> (A) for ring_mp_get() - AFAIU that should corresponds to a smp_rmb()
> at [0] above when reading the "out" variable rather than this
> assertion, and that's why I thought at [0] we should have something
> like a load_acquire() there (which contains a rmb()).

Memory barrier (A) in ring_mp_get() makes sure the order between:
    ring->data[index] = NULL;
    smp_wmb();
    ring->out = out + 1;

And the memory barrier at [0] makes sure the order between:
    out = ring->out;
    /* smp_rmb() */
    cmpxchg();
    value = ring->data[index];
    assert(value);

[ note: the assertion and reading ring->out are across cmpxchg(). ]

Did I understand your question correctly?

> 
>  From content-wise, I think the code here is correct, since
> atomic_cmpxchg() should have one implicit smp_mb() after all so we
> don't need anything further barriers here.

Yes, it is.
Xiao Guangrong June 29, 2018, 3:55 a.m. UTC | #8
On 06/28/2018 07:55 PM, Wei Wang wrote:
> On 06/28/2018 06:02 PM, Xiao Guangrong wrote:
>>
>> CC: Paul, Peter Zijlstra, Stefani, Lai who are all good at memory barrier.
>>
>>
>> On 06/20/2018 12:52 PM, Peter Xu wrote:
>>> On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
>>>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>>>
>>>> It's the simple lockless ring buffer implement which supports both
>>>> single producer vs. single consumer and multiple producers vs.
>>>> single consumer.
>>>>
>>>> Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
>>>> rte_ring (2) before i wrote this implement. It corrects some bugs of
>>>> memory barriers in kfifo and it is the simpler lockless version of
>>>> rte_ring as currently multiple access is only allowed for producer.
>>>
>>> Could you provide some more information about the kfifo bug? Any
>>> pointer would be appreciated.
>>>
>>
>> Sure, i reported one of the memory barrier issue to linux kernel:
>>    https://lkml.org/lkml/2018/5/11/58
>>
>> Actually, beside that, there is another memory barrier issue in kfifo,
>> please consider this case:
>>
>>    at the beginning
>>    ring->size = 4
>>    ring->out = 0
>>    ring->in = 4
>>
>>      Consumer                            Producer
>>  ---------------                     --------------
>>    index = ring->out; /* index == 0 */
>>    ring->out++; /* ring->out == 1 */
>>    < Re-Order >
>>                                     out = ring->out;
>>                                     if (ring->in - out > ring->mask)
>>                                         return -EFULL;
>>                                     /* see the ring is not full */
>>                                     index = ring->in & ring->mask; /* index == 0 */
>>                                     ring->data[index] = new_data;
>>                                     ring->in++;
>>
>>    data = ring->data[index];
>>    !!!!!! the old data is lost !!!!!!
>>
>> So we need to make sure:
>> 1) for the consumer, we should read the ring->data[] out before updating ring->out
>> 2) for the producer, we should read ring->out before updating ring->data[]
>>
>> as followings:
>>       Producer                              Consumer
>>   ------------------------------------     ------------------------
>>       Reading ring->out                     Reading ring->data[index]
>>       smp_mb()                              smp_mb()
>>       Setting ring->data[index] = data      ring->out++
>>
>> [ i used atomic_store_release() and atomic_load_acquire() instead of smp_mb() in the
>>   patch. ]
>>
>> But i am not sure if we can use smp_acquire__after_ctrl_dep() in the producer?
> 
> 
> I wonder if this could be solved by simply tweaking the above consumer implementation:
> 
> [1] index = ring->out;
> [2] data = ring->data[index];
> [3] index++;
> [4] ring->out = index;
> 
> Now [2] and [3] forms a WAR dependency, which avoids the reordering.

It cannot. [2] and [4] still do not have any dependency; the CPU and the
compiler can optimize away 'index'.
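
A minimal sketch of the single-producer/single-consumer case with the two
orderings made explicit, assuming C11 acquire/release atomics rather than the
QEMU helpers. The names spsc_ring, spsc_put and spsc_get are hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>

#define SPSC_SIZE 4u                  /* hypothetical; must be a power of 2 */
#define SPSC_MASK (SPSC_SIZE - 1)

struct spsc_ring {
    _Atomic unsigned int in;          /* written only by the producer */
    _Atomic unsigned int out;         /* written only by the consumer */
    void *data[SPSC_SIZE];
};

static int spsc_put(struct spsc_ring *r, void *data)
{
    unsigned int in = atomic_load_explicit(&r->in, memory_order_relaxed);
    /* acquire: ring->out is read before the entry is overwritten below */
    unsigned int out = atomic_load_explicit(&r->out, memory_order_acquire);

    if (in - out > SPSC_MASK) {
        return -ENOBUFS;
    }

    r->data[in & SPSC_MASK] = data;
    /* release: the entry is written before the new ring->in is visible */
    atomic_store_explicit(&r->in, in + 1, memory_order_release);
    return 0;
}

static void *spsc_get(struct spsc_ring *r)
{
    unsigned int out = atomic_load_explicit(&r->out, memory_order_relaxed);
    /* acquire: pairs with the release store of ring->in in spsc_put() */
    unsigned int in = atomic_load_explicit(&r->in, memory_order_acquire);
    void *data;

    if (in == out) {
        return NULL;
    }

    data = r->data[out & SPSC_MASK];
    /* release: the entry is read out before the slot is handed back */
    atomic_store_explicit(&r->out, out + 1, memory_order_release);
    return data;
}
```

The release store of out is what keeps the read of data[] from sinking below
the index update; a plain store, as in the [1]..[4] sequence, gives neither
the compiler nor the CPU any such constraint.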
Xiao Guangrong June 29, 2018, 3:59 a.m. UTC | #9
On 06/28/2018 09:36 PM, Jason Wang wrote:
> 
> 
> On 2018年06月04日 17:55, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong<xiaoguangrong@tencent.com>
>>
>> It's the simple lockless ring buffer implement which supports both
>> single producer vs. single consumer and multiple producers vs.
>> single consumer.
>>

>> Finally, it fetches the valid data out, set the entry to the initialized
>> state and update ring->out to make the entry be usable to the producer:
>>
>>        data = *entry;
>>        *entry = NULL;
>>        ring->out++;
>>
>> Memory barrier is omitted here, please refer to the comment in the code.
>>
>> (1)https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
>> (2)http://dpdk.org/doc/api/rte__ring_8h.html
>>
>> Signed-off-by: Xiao Guangrong<xiaoguangrong@tencent.com>
>> ---
> 
> May I ask why you need a MPSC ring here? Can we just use N SPSC ring for submitting pages and another N SPSC ring for passing back results?

Sure.

We had this option in mind; however, it is not scalable and will slow
the main thread down. Instead, we would rather speed up the main thread and
move a reasonable workload to the threads.
Michael S. Tsirkin June 29, 2018, 4:23 a.m. UTC | #10
On Thu, Jun 28, 2018 at 09:36:00PM +0800, Jason Wang wrote:
> 
> 
> On 2018年06月04日 17:55, guangrong.xiao@gmail.com wrote:
> > From: Xiao Guangrong<xiaoguangrong@tencent.com>
> > 
> > It's the simple lockless ring buffer implement which supports both
> > single producer vs. single consumer and multiple producers vs.
> > single consumer.
> > 
> > Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
> > rte_ring (2) before i wrote this implement. It corrects some bugs of
> > memory barriers in kfifo and it is the simpler lockless version of
> > rte_ring as currently multiple access is only allowed for producer.
> > 
> > If has single producer vs. single consumer, it is the traditional fifo,
> > If has multiple producers, it uses the algorithm as followings:
> > 
> > For the producer, it uses two steps to update the ring:
> >     - first step, occupy the entry in the ring:
> > 
> > retry:
> >        in = ring->in
> >        if (cmpxhg(&ring->in, in, in +1) != in)
> >              goto retry;
> > 
> >       after that the entry pointed by ring->data[in] has been owned by
> >       the producer.
> > 
> >       assert(ring->data[in] == NULL);
> > 
> >       Note, no other producer can touch this entry so that this entry
> >       should always be the initialized state.
> > 
> >     - second step, write the data to the entry:
> > 
> >       ring->data[in] = data;
> > 
> > For the consumer, it first checks if there is available entry in the
> > ring and fetches the entry from the ring:
> > 
> >       if (!ring_is_empty(ring))
> >            entry = &ring[ring->out];
> > 
> >       Note: the ring->out has not been updated so that the entry pointed
> >       by ring->out is completely owned by the consumer.
> > 
> > Then it checks if the data is ready:
> > 
> > retry:
> >       if (*entry == NULL)
> >              goto retry;
> > That means, the producer has updated the index but haven't written any
> > data to it.
> > 
> > Finally, it fetches the valid data out, set the entry to the initialized
> > state and update ring->out to make the entry be usable to the producer:
> > 
> >        data = *entry;
> >        *entry = NULL;
> >        ring->out++;
> > 
> > Memory barrier is omitted here, please refer to the comment in the code.
> > 
> > (1)https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
> > (2)http://dpdk.org/doc/api/rte__ring_8h.html
> > 
> > Signed-off-by: Xiao Guangrong<xiaoguangrong@tencent.com>
> > ---
> 
> May I ask why you need a MPSC ring here? Can we just use N SPSC ring for
> submitting pages and another N SPSC ring for passing back results?
> 
> Thanks

Or just an SPSC ring + a lock.
How big of a gain is lockless access to a trivial structure
like the ring?
Jason Wang June 29, 2018, 6:15 a.m. UTC | #11
On 2018年06月29日 11:59, Xiao Guangrong wrote:
>
>
> On 06/28/2018 09:36 PM, Jason Wang wrote:
>>
>>
>> On 2018年06月04日 17:55, guangrong.xiao@gmail.com wrote:
>>> From: Xiao Guangrong<xiaoguangrong@tencent.com>
>>>
>>> It's the simple lockless ring buffer implement which supports both
>>> single producer vs. single consumer and multiple producers vs.
>>> single consumer.
>>>
>
>>> Finally, it fetches the valid data out, set the entry to the 
>>> initialized
>>> state and update ring->out to make the entry be usable to the producer:
>>>
>>>        data = *entry;
>>>        *entry = NULL;
>>>        ring->out++;
>>>
>>> Memory barrier is omitted here, please refer to the comment in the 
>>> code.
>>>
>>> (1)https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h 
>>>
>>> (2)http://dpdk.org/doc/api/rte__ring_8h.html
>>>
>>> Signed-off-by: Xiao Guangrong<xiaoguangrong@tencent.com>
>>> ---
>>
>> May I ask why you need a MPSC ring here? Can we just use N SPSC ring 
>> for submitting pages and another N SPSC ring for passing back results?
>
> Sure.
>
> We had this option in our mind, however, it is not scalable which will 
> slow
> the main thread down, instead, we'd rather to speed up main thread and 
> move
> reasonable workload to the threads.

I don't quite understand the scalability issue here. Is it because the
main thread needs to go through all N rings (which I think it does not)?

Thanks
Xiao Guangrong June 29, 2018, 7:30 a.m. UTC | #12
Hi Michael,

On 06/20/2018 08:38 PM, Michael S. Tsirkin wrote:
> On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>

>>
>>
>> (1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
>> (2) http://dpdk.org/doc/api/rte__ring_8h.html
>>
>> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> So instead of all this super-optimized trickiness, how about
> a simple port of ptr_ring from linux?
> 
> That one isn't lockless but it's known to outperform
> most others for a single producer/single consumer case.
> And with a ton of networking going on,
> who said it's such a hot spot? OTOH this implementation
> has more barriers which slows down each individual thread.
> It's also a source of bugs.
> 

Thank you for pointing it out.

I just quickly went through the code of ptr_ring, which is very nice and
really impressive. I will consider porting it to QEMU.

> Further, atomic tricks this one uses are not fair so some threads can get
> completely starved while others make progress. There's also no
> chance to mix aggressive polling and sleeping with this
> kind of scheme, so the starved thread will consume lots of
> CPU.
> 
> So I'd like to see a simple ring used, and then a patch on top
> switching to this tricky one with performance comparison
> along with that.
> 

I agree with you. I will make a version that uses a lock for multiple
producers and do incremental optimizations on top of it.
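
As a rough idea of what such a version could look like, here is a minimal
sketch in which producers are serialized by a lock and a NULL slot means
"free", loosely in the spirit of ptr_ring. It is not a port of ptr_ring: the
spinlock built on atomic_flag, the fixed size and all names (locked_ring,
lr_put, lr_get) are assumptions for this example.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>

#define LR_SIZE 4u                       /* hypothetical capacity */

struct locked_ring {
    atomic_flag producer_lock;           /* serializes producers only */
    unsigned int producer;               /* protected by producer_lock */
    unsigned int consumer;               /* private to the single consumer */
    _Atomic(void *) queue[LR_SIZE];      /* NULL means the slot is free */
};

#define LOCKED_RING_INIT { .producer_lock = ATOMIC_FLAG_INIT }

static int lr_put(struct locked_ring *r, void *ptr)
{
    int ret = -ENOBUFS;

    assert(ptr != NULL);                 /* NULL is reserved for "free" */

    while (atomic_flag_test_and_set_explicit(&r->producer_lock,
                                             memory_order_acquire)) {
        /* spin until the other producer leaves */
    }

    /* a non-NULL slot has not been consumed yet, so the ring is full */
    if (atomic_load_explicit(&r->queue[r->producer],
                             memory_order_acquire) == NULL) {
        atomic_store_explicit(&r->queue[r->producer], ptr,
                              memory_order_release);
        r->producer = (r->producer + 1) % LR_SIZE;
        ret = 0;
    }

    atomic_flag_clear_explicit(&r->producer_lock, memory_order_release);
    return ret;
}

static void *lr_get(struct locked_ring *r)
{
    void *ptr = atomic_load_explicit(&r->queue[r->consumer],
                                     memory_order_acquire);

    if (ptr != NULL) {
        /* hand the slot back to the producers */
        atomic_store_explicit(&r->queue[r->consumer], NULL,
                              memory_order_release);
        r->consumer = (r->consumer + 1) % LR_SIZE;
    }
    return ptr;
}
```

Because the slot itself says whether it is free, the producers and the
consumer never read each other's index, which removes the in/out ordering
questions discussed earlier in the thread.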

>> ---
>>   migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   1 file changed, 265 insertions(+)
>>   create mode 100644 migration/ring.h
>>
>> diff --git a/migration/ring.h b/migration/ring.h
>> new file mode 100644
>> index 0000000000..da9b8bdcbb
>> --- /dev/null
>> +++ b/migration/ring.h
>> @@ -0,0 +1,265 @@
>> +/*
>> + * Ring Buffer
>> + *
>> + * Multiple producers and single consumer are supported with lock free.
>> + *
>> + * Copyright (c) 2018 Tencent Inc
>> + *
>> + * Authors:
>> + *  Xiao Guangrong <xiaoguangrong@tencent.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef _RING__
>> +#define _RING__
> 
> Prefix Ring is too short.
> 

Okay, will improve it.

>> +    atomic_set(&ring->data[index], NULL);
>> +
>> +    /*
>> +     * (B) smp_mb() is needed as we should read the entry out before
>> +     * updating ring->out as we did in __ring_get().
>> +     *
>> +     * (A) smp_wmb() is needed as we should make the entry be NULL before
>> +     * updating ring->out (which will make the entry be visible and usable).
>> +     */
> 
> I can't say I understand this all.
> And the interaction of acquire/release semantics with smp_*
> barriers is even scarier.
> 

Hmm... the parallel accesses for these two indexes and the data stored
in the ring are subtle indeed. :(

>> +    atomic_store_release(&ring->out, ring->out + 1);
>> +
>> +    return data;
>> +}
>> +
>> +static inline int ring_put(Ring *ring, void *data)
>> +{
>> +    if (ring->flags & RING_MULTI_PRODUCER) {
>> +        return ring_mp_put(ring, data);
>> +    }
>> +    return __ring_put(ring, data);
>> +}
>> +
>> +static inline void *ring_get(Ring *ring)
>> +{
>> +    if (ring->flags & RING_MULTI_PRODUCER) {
>> +        return ring_mp_get(ring);
>> +    }
>> +    return __ring_get(ring);
>> +}
>> +#endif
> 
> 
> A bunch of tricky barriers retries etc all over the place.  This sorely
> needs *a lot of* unit tests. Where are they?

I used the code attached to this mail to test and benchmark the patches
during development. It is not dedicated to Ring; instead it is based on
the compression framework.

Yes, test cases are useful and really needed, I will do it... :)
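
One test case that needs no threads at all is the index arithmetic across
unsigned overflow. A minimal sketch, assuming simplified helpers that take
the mask directly instead of a Ring pointer (idx_len, idx_is_full and the
test function are hypothetical names, not the patch's API):

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>

/* number of used entries; unsigned subtraction is well defined on wrap */
static unsigned int idx_len(unsigned int in, unsigned int out)
{
    return in - out;
}

static bool idx_is_full(unsigned int mask, unsigned int in, unsigned int out)
{
    return idx_len(in, out) > mask;
}

static void test_ring_index_wraparound(void)
{
    const unsigned int mask = 3;         /* ring of 4 entries */
    unsigned int out = UINT_MAX - 1;
    unsigned int in = out;

    assert(idx_len(in, out) == 0);       /* empty just before the wrap */

    in += 4;                             /* four puts, in wraps around to 2 */
    assert(in == 2);
    assert(idx_len(in, out) == 4);
    assert(idx_is_full(mask, in, out));

    out += 1;                            /* one get, out is now UINT_MAX */
    assert(idx_len(in, out) == 3);
    assert(!idx_is_full(mask, in, out));
}
```

This only covers the arithmetic; the barrier pairing still needs a
multi-threaded stress test on a weakly ordered host.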
#include "qemu/osdep.h"

#include "libqtest.h"
#include <zlib.h>

#include "qemu/cutils.h"
#include "qemu/bitops.h"
#include "qemu/bitmap.h"
#include "qemu/main-loop.h"
#include "migration/ram.h"
#include "migration/migration.h"
#include "migration/register.h"
#include "migration/misc.h"
#include "migration/page_cache.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "qapi/qapi-events-migration.h"
#include "qapi/qmp/qerror.h"
#include "trace.h"
//#include "exec/ram_addr.h"
#include "exec/target_page.h"
#include "qemu/rcu_queue.h"
#include "migration/colo.h"
#include "migration/block.h"
#include "migration/threads.h"

#include "migration/qemu-file.h"

CompressionStats compression_counters;

#define PAGE_SIZE 4096
#define PAGE_MASK (~(PAGE_SIZE - 1))

static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
                                   int64_t pos)
{
    int i, size = 0;

    for (i = 0; i < iovcnt; i++) {
        size += iov[i].iov_len;
    }
    return size;
}

static int test_fclose(void *opaque)
{
    return 0;
}

static const QEMUFileOps test_write_ops = {
    .writev_buffer  = test_writev_buffer,
    .close          = test_fclose
};

QEMUFile *dest_file;

static const QEMUFileOps empty_ops = { };

static int do_compress_ram_page(QEMUFile *f, z_stream *stream, uint8_t *ram_addr,
                                ram_addr_t offset, uint8_t *source_buf)
{
    int bytes_sent = 0, blen;
    uint8_t *p = ram_addr;

    /*
     * Copy the page to an internal buffer so that the VM cannot modify
     * it, which lets us catch errors during compression and
     * decompression.
     */
    memcpy(source_buf, p, PAGE_SIZE);
    blen = qemu_put_compression_data(f, stream, source_buf, PAGE_SIZE);
    if (blen < 0) {
        bytes_sent = 0;
        qemu_file_set_error(dest_file, blen);
        error_report("compressed data failed!");
    } else {
        printf("Compressed size %d.\n", blen);
        bytes_sent += blen;
    }

    return bytes_sent;
}

struct CompressData {
    /* filled by migration thread.*/
    uint8_t *ram_addr;
    ram_addr_t offset;

    /* filled by compress thread. */
    QEMUFile *file;
    z_stream stream;
    uint8_t *originbuf;

    ThreadRequest data;
};
typedef struct CompressData CompressData;

static ThreadRequest *compress_thread_data_init(void)
{
    CompressData *cd = g_new0(CompressData, 1);

    cd->originbuf = g_try_malloc(PAGE_SIZE);
    if (!cd->originbuf) {
        goto exit;
    }

    if (deflateInit(&cd->stream, 1) != Z_OK) {
        g_free(cd->originbuf);
        goto exit;
    }

    cd->file = qemu_fopen_ops(NULL, &empty_ops);
    return &cd->data;

exit:
    g_free(cd);
    return NULL;
}

static void compress_thread_data_fini(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);

    qemu_fclose(cd->file);
    deflateEnd(&cd->stream);
    g_free(cd->originbuf);
    g_free(cd);
}

static void compress_thread_data_handler(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);

    /*
     * If compression fails, do_compress_ram_page() records the error
     * on dest_file.
     */
    do_compress_ram_page(cd->file, &cd->stream, cd->ram_addr, cd->offset,
                         cd->originbuf);
}

static void compress_thread_data_done(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);
    int bytes_xmit;

    bytes_xmit = qemu_put_qemu_file(dest_file, cd->file);
    /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
    compression_counters.reduced_size += PAGE_SIZE - bytes_xmit + 8;
    compression_counters.pages++;
}

static Threads *compress_threads;

static void flush_compressed_data(void)
{
    threads_wait_done(compress_threads);
}

static void compress_threads_save_cleanup(void)
{
    if (!compress_threads) {
        return;
    }

    threads_destroy(compress_threads);
    compress_threads = NULL;
    qemu_fclose(dest_file);
    dest_file = NULL;
}

static int compress_threads_save_setup(void)
{
    dest_file = qemu_fopen_ops(NULL, &test_write_ops);
    compress_threads = threads_create(16,
                                      "compress",
                                      compress_thread_data_init,
                                      compress_thread_data_fini,
                                      compress_thread_data_handler,
                                      compress_thread_data_done);
    assert(compress_threads);
    return 0;
}

static int compress_page_with_multi_thread(uint8_t *addr)
{
    CompressData *cd;
    ThreadRequest *thread_data;
    thread_data = threads_submit_request_prepare(compress_threads);
    if (!thread_data) {
        compression_counters.busy++;
        return -1;
    }

    cd = container_of(thread_data, CompressData, data);
    cd->ram_addr = addr;
    threads_submit_request_commit(compress_threads, thread_data);
    return 1;
}

#define MEM_SIZE (30ULL << 30)
#define COUNT    5 

static void run(void)
{
    /* uint8_t * avoids arithmetic on void *, which is a GNU extension */
    uint8_t *mem = qemu_memalign(PAGE_SIZE, MEM_SIZE);
    uint8_t *ptr, *end = mem + MEM_SIZE;
    uint64_t start_time, total_time = 0, spend, total_busy = 0;
    int i;

    memset(mem, 0, MEM_SIZE);

    for (i = 0; i < COUNT; i++) {
        ptr = mem;
        start_time = g_get_monotonic_time();
        while (ptr < end) {
            /* dirty one byte so that the page is not a zero page */
            *ptr = 0x10;
            compress_page_with_multi_thread(ptr);
            ptr += PAGE_SIZE;
        }
        flush_compressed_data();
        spend = g_get_monotonic_time() - start_time;
        total_time += spend;
        printf("RUN %d: BUSY %ld Time Cost %ld.\n", i,
               compression_counters.busy, spend);
        total_busy += compression_counters.busy;
        compression_counters.busy = 0;
    }

    printf("AVG: BUSY %ld Time Cost %ld.\n", total_busy / COUNT,
           total_time / COUNT);
    qemu_vfree(mem);
}

static void compare_zero_and_compression(void)
{
    ThreadRequest *data = compress_thread_data_init();
    CompressData *cd;
    uint64_t start_time, zero_time, compress_time;
    char page[PAGE_SIZE];

    if (!data) {
        printf("Init compression failed.\n");
        return;
    }

    cd = container_of(data, CompressData, data);
    cd->ram_addr = (uint8_t *)page;

    memset(page, 0, sizeof(page));
    dest_file = qemu_fopen_ops(NULL, &test_write_ops);

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    buffer_is_zero(page, PAGE_SIZE);
    zero_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    compress_thread_data_handler(data);
    compress_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;

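    printf("Zero %ld ns Compression: %ld ns.\n", zero_time, compress_time);
    compress_thread_data_fini(data);
    /* close the per-call output file so that repeated calls do not leak it */
    qemu_fclose(dest_file);
    dest_file = NULL;
}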

static void migration_threads(void)
{
    int i;

    printf("Zero Test vs. compression.\n");
    for (i = 0; i < 10; i++) {
        compare_zero_and_compression();
    }

    printf("test migration threads.\n");
    compress_threads_save_setup();
    run();
    compress_threads_save_cleanup();
}

int main(int argc, char **argv)
{
    QTestState *s = NULL;
    int ret;

    g_test_init(&argc, &argv, NULL);

    qtest_add_func("/migration/threads", migration_threads);
    ret = g_test_run();

    if (s) {
        qtest_quit(s);
    }

    return ret;
}
Xiao Guangrong June 29, 2018, 7:44 a.m. UTC | #13
On 06/29/2018 12:23 PM, Michael S. Tsirkin wrote:
> On Thu, Jun 28, 2018 at 09:36:00PM +0800, Jason Wang wrote:
>>
>>
>> On 2018年06月04日 17:55, guangrong.xiao@gmail.com wrote:
>>> From: Xiao Guangrong<xiaoguangrong@tencent.com>
>>>

>>> Memory barrier is omitted here, please refer to the comment in the code.
>>>
>>> (1)https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
>>> (2)http://dpdk.org/doc/api/rte__ring_8h.html
>>>
>>> Signed-off-by: Xiao Guangrong<xiaoguangrong@tencent.com>
>>> ---
>>
>> May I ask why you need an MPSC ring here? Can we just use N SPSC rings for
>> submitting pages and another N SPSC rings for passing back results?
>>
>> Thanks
> 
> Or just an SPSC ring + a lock.
> How big of a gain is lockless access to a trivial structure
> like the ring?
> 

Okay, I will give it a try.

BTW, we compared a global ring + lock for input against a lockless ring for
input, and the former did not show better performance. But we haven't tried a
global ring + lock for output yet.
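
For readers following the thread, the "ring + lock" alternative raised above can be sketched as below. This is only an illustration under stated assumptions: the names `locked_ring`, `locked_ring_put()` and `locked_ring_get()` are hypothetical, the spinlock is a plain C11 `atomic_flag` rather than QEMU's own lock types, and the capacity is fixed at compile time. It is not code from the posted series.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define LOCKED_RING_SIZE 8u   /* hypothetical capacity, must be a power of 2 */

/* Hypothetical type: a plain ring where one lock guards everything. */
struct locked_ring {
    atomic_flag lock;         /* guards both indices and the slots */
    unsigned int in;          /* next position to fill */
    unsigned int out;         /* next position to drain */
    void *data[LOCKED_RING_SIZE];
};

static void locked_ring_init(struct locked_ring *r)
{
    atomic_flag_clear(&r->lock);
    r->in = r->out = 0;
}

static void locked_ring_lock(struct locked_ring *r)
{
    while (atomic_flag_test_and_set_explicit(&r->lock, memory_order_acquire)) {
        /* spin: the critical section is only a few instructions long */
    }
}

static void locked_ring_unlock(struct locked_ring *r)
{
    atomic_flag_clear_explicit(&r->lock, memory_order_release);
}

/* Any number of producers may call this; the lock serializes them. */
static bool locked_ring_put(struct locked_ring *r, void *item)
{
    bool ok = false;

    assert(item != NULL);     /* NULL is reserved for "ring is empty" */
    locked_ring_lock(r);
    if (r->in - r->out < LOCKED_RING_SIZE) {
        r->data[r->in++ & (LOCKED_RING_SIZE - 1)] = item;
        ok = true;
    }
    locked_ring_unlock(r);
    return ok;
}

/* Returns NULL when the ring is empty. */
static void *locked_ring_get(struct locked_ring *r)
{
    void *item = NULL;

    locked_ring_lock(r);
    if (r->in != r->out) {
        item = r->data[r->out++ & (LOCKED_RING_SIZE - 1)];
    }
    locked_ring_unlock(r);
    return item;
}
```

Because every access happens under the lock, no explicit memory barriers are needed here; the price is that producers and the consumer contend on the same cache line.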
Xiao Guangrong June 29, 2018, 7:47 a.m. UTC | #14
On 06/29/2018 02:15 PM, Jason Wang wrote:
> 
> 
> On 2018年06月29日 11:59, Xiao Guangrong wrote:
>>
>>
>> On 06/28/2018 09:36 PM, Jason Wang wrote:
>>>
>>>
>>> On 2018年06月04日 17:55, guangrong.xiao@gmail.com wrote:
>>>> From: Xiao Guangrong<xiaoguangrong@tencent.com>
>>>>
>>>> It's the simple lockless ring buffer implement which supports both
>>>> single producer vs. single consumer and multiple producers vs.
>>>> single consumer.
>>>>
>>
>>>> Finally, it fetches the valid data out, set the entry to the initialized
>>>> state and update ring->out to make the entry be usable to the producer:
>>>>
>>>>        data = *entry;
>>>>        *entry = NULL;
>>>>        ring->out++;
>>>>
>>>> Memory barrier is omitted here, please refer to the comment in the code.
>>>>
>>>> (1)https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
>>>> (2)http://dpdk.org/doc/api/rte__ring_8h.html
>>>>
>>>> Signed-off-by: Xiao Guangrong<xiaoguangrong@tencent.com>
>>>> ---
>>>
>>> May I ask why you need an MPSC ring here? Can we just use N SPSC rings for submitting pages and another N SPSC rings for passing back results?
>>
>> Sure.
>>
>> We had this option in mind; however, it is not scalable and will slow
>> the main thread down. Instead, we'd rather speed up the main thread and move
>> a reasonable workload to the threads.
> 
> I don't quite understand the scalability issue here. Is it because the main thread needs to go through all N rings (which I think not)?

Yes, it is.

The main thread needs to check every thread and wait for each one
to finish, one by one...
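
The cost described here can be illustrated with a small sketch. It assumes one result slot per worker standing in for a per-thread SPSC ring; `worker_slot`, `collect_results()` and `NR_WORKERS` are hypothetical names, not identifiers from the series. The point is only that the scan visits every worker even when few of them have anything ready.

```c
#include <assert.h>
#include <stddef.h>

#define NR_WORKERS 4   /* hypothetical thread count */

/* Hypothetical per-worker result slot standing in for an SPSC ring. */
struct worker_slot {
    void *result;      /* NULL while the worker has nothing ready */
};

/*
 * Scan every worker once and collect whatever is ready.
 * Returns the number of results collected; *visited counts the slots
 * inspected, which is always NR_WORKERS no matter how many were ready.
 */
static int collect_results(struct worker_slot *slots, void **out, int *visited)
{
    int n = 0;

    assert(slots && out && visited);
    *visited = 0;
    for (int i = 0; i < NR_WORKERS; i++) {
        (*visited)++;
        if (slots[i].result) {
            out[n++] = slots[i].result;
            slots[i].result = NULL;
        }
    }
    return n;
}
```

With a single shared result ring, the main thread would instead touch one structure per poll, which is the scalability argument made above.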
Michael S. Tsirkin June 29, 2018, 1:08 p.m. UTC | #15
On Fri, Jun 29, 2018 at 03:30:44PM +0800, Xiao Guangrong wrote:
> 
> Hi Michael,
> 
> On 06/20/2018 08:38 PM, Michael S. Tsirkin wrote:
> > On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
> > > From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> > > 
> > > 
> > > (1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
> > > (2) http://dpdk.org/doc/api/rte__ring_8h.html
> > > 
> > > Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> > 
> > So instead of all this super-optimized trickiness, how about
> > a simple port of ptr_ring from linux?
> > 
> > That one isn't lockless but it's known to outperform
> > most others for a single producer/single consumer case.
> > And with a ton of networking going on,
> > who said it's such a hot spot? OTOH this implementation
> > has more barriers which slows down each individual thread.
> > It's also a source of bugs.
> > 
> 
> Thank you for pointing it out.
> 
> I just quickly went through the code of ptr_ring that is very nice and
> really impressive. I will consider to port it to QEMU.

The port is pretty trivial. See below. It's an SPSC structure though.  So
you need to use it with a lock.  Given the critical section is small, I
put in QemuSpin, not a mutex.  To reduce the cost of locks, it helps if you
can use the batched API to consume. I assume producers can't batch,
but if they can, we should add an API for that; it will help too.
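
As a rough illustration of why batched consumption reduces lock cost, here is a much-simplified sketch of the idea: a NULL slot means "free", and the consumer takes its lock once per batch instead of once per entry. The names `mini_ring`, `mini_ring_produce()` and `mini_ring_consume_batched()` are hypothetical, the lock is a C11 `atomic_flag` rather than QemuSpin, and unlike the real ptr_ring this sketch clears each slot immediately instead of deferring the invalidation.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define MINI_RING_SIZE 8   /* hypothetical capacity */

/* Hypothetical miniature of the ptr_ring idea: a NULL slot means "free". */
struct mini_ring {
    atomic_flag consumer_lock;
    int producer;                        /* next slot the producer fills */
    int consumer;                        /* next slot the consumer drains */
    void *_Atomic queue[MINI_RING_SIZE];
};

static void mini_ring_init(struct mini_ring *r)
{
    atomic_flag_clear(&r->consumer_lock);
    r->producer = r->consumer = 0;
    for (int i = 0; i < MINI_RING_SIZE; i++) {
        atomic_init(&r->queue[i], NULL);
    }
}

/* Single producer assumed; returns -1 when the next slot is still occupied. */
static int mini_ring_produce(struct mini_ring *r, void *ptr)
{
    assert(ptr != NULL);                 /* NULL is the "free slot" marker */
    if (atomic_load_explicit(&r->queue[r->producer], memory_order_acquire)) {
        return -1;
    }
    /* release: the pointed-to data becomes visible before the pointer */
    atomic_store_explicit(&r->queue[r->producer], ptr, memory_order_release);
    if (++r->producer >= MINI_RING_SIZE) {
        r->producer = 0;
    }
    return 0;
}

/* Take up to n entries under ONE lock acquisition; returns the count taken. */
static int mini_ring_consume_batched(struct mini_ring *r, void **array, int n)
{
    int i;

    while (atomic_flag_test_and_set_explicit(&r->consumer_lock,
                                             memory_order_acquire)) {
        /* spin */
    }
    for (i = 0; i < n; i++) {
        void *ptr = atomic_load_explicit(&r->queue[r->consumer],
                                         memory_order_acquire);
        if (!ptr) {
            break;                       /* empty slot: nothing more to take */
        }
        array[i] = ptr;
        atomic_store_explicit(&r->queue[r->consumer], NULL,
                              memory_order_release);
        if (++r->consumer >= MINI_RING_SIZE) {
            r->consumer = 0;
        }
    }
    atomic_flag_clear_explicit(&r->consumer_lock, memory_order_release);
    return i;
}
```

Note that the producer and consumer never read each other's index; full and empty are both detected from the slot contents alone.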


---

qemu/ptr_ring.h: straight port from Linux 4.17

Port done by author.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>

diff --git a/include/qemu/ptr_ring.h b/include/qemu/ptr_ring.h
new file mode 100644
index 0000000000..f7446678de
--- /dev/null
+++ b/include/qemu/ptr_ring.h
@@ -0,0 +1,464 @@
+/*
+ *	Definitions for the 'struct ptr_ring' datastructure.
+ *
+ *	Author:
+ *		Michael S. Tsirkin <mst@redhat.com>
+ *
+ *	Copyright (C) 2016 Red Hat, Inc.
+ *
+ *	This program is free software; you can redistribute it and/or modify it
+ *	under the terms of the GNU General Public License as published by the
+ *	Free Software Foundation; either version 2 of the License, or (at your
+ *	option) any later version.
+ *
+ *	This is a limited-size FIFO maintaining pointers in FIFO order, with
+ *	one CPU producing entries and another consuming entries from a FIFO.
+ *
+ *	This implementation tries to minimize cache-contention when there is a
+ *	single producer and a single consumer CPU.
+ */
+
+#ifndef QEMU_PTR_RING_H
+#define QEMU_PTR_RING_H 1
+
+#include "qemu/thread.h"
+
+#define PTR_RING_CACHE_BYTES 64
+#define PTR_RING_CACHE_ALIGNED __attribute__((__aligned__(PTR_RING_CACHE_BYTES)))
+#define PTR_RING_WRITE_ONCE(p, v) (*(volatile typeof(&(p)))(&(p)) = (v))
+#define PTR_RING_READ_ONCE(p) (*(volatile typeof(&(p)))(&(p)))
+
+struct ptr_ring {
+	int producer PTR_RING_CACHE_ALIGNED;
+	QemuSpin producer_lock;
+	int consumer_head PTR_RING_CACHE_ALIGNED; /* next valid entry */
+	int consumer_tail; /* next entry to invalidate */
+	QemuSpin consumer_lock;
+	/* Shared consumer/producer data */
+	/* Read-only by both the producer and the consumer */
+	int size PTR_RING_CACHE_ALIGNED; /* max entries in queue */
+	int batch; /* number of entries to consume in a batch */
+	void **queue;
+};
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ *
+ * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
+ * see e.g. ptr_ring_full.
+ */
+static inline bool __ptr_ring_full(struct ptr_ring *r)
+{
+	return r->queue[r->producer];
+}
+
+static inline bool ptr_ring_full(struct ptr_ring *r)
+{
+	bool ret;
+
+	qemu_spin_lock(&r->producer_lock);
+	ret = __ptr_ring_full(r);
+	qemu_spin_unlock(&r->producer_lock);
+
+	return ret;
+}
+
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax(). Callers must hold producer_lock.
+ * Callers are responsible for making sure pointer that is being queued
+ * points to a valid data.
+ */
+static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
+{
+	if (unlikely(!r->size) || r->queue[r->producer])
+		return -ENOSPC;
+
+	/* Make sure the pointer we are storing points to a valid data. */
+	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
+	smp_wmb();
+
+	PTR_RING_WRITE_ONCE(r->queue[r->producer++], ptr);
+	if (unlikely(r->producer >= r->size))
+		r->producer = 0;
+	return 0;
+}
+
+/*
+ * Note: resize (below) nests producer lock within consumer lock, so if you
+ * consume in interrupt or BH context, you must disable interrupts/BH when
+ * calling this.
+ */
+static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
+{
+	int ret;
+
+	qemu_spin_lock(&r->producer_lock);
+	ret = __ptr_ring_produce(r, ptr);
+	qemu_spin_unlock(&r->producer_lock);
+
+	return ret;
+}
+
+static inline void *__ptr_ring_peek(struct ptr_ring *r)
+{
+	if (likely(r->size))
+		return PTR_RING_READ_ONCE(r->queue[r->consumer_head]);
+	return NULL;
+}
+
+/*
+ * Test ring empty status without taking any locks.
+ *
+ * NB: This is only safe to call if ring is never resized.
+ *
+ * However, if some other CPU consumes ring entries at the same time, the value
+ * returned is not guaranteed to be correct.
+ *
+ * In this case - to avoid incorrectly detecting the ring
+ * as empty - the CPU consuming the ring entries is responsible
+ * for either consuming all ring entries until the ring is empty,
+ * or synchronizing with some other CPU and causing it to
+ * re-test __ptr_ring_empty and/or consume the ring entries
+ * after the synchronization point.
+ *
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
+static inline bool __ptr_ring_empty(struct ptr_ring *r)
+{
+	if (likely(r->size))
+		return !r->queue[PTR_RING_READ_ONCE(r->consumer_head)];
+	return true;
+}
+
+static inline bool ptr_ring_empty(struct ptr_ring *r)
+{
+	bool ret;
+
+	qemu_spin_lock(&r->consumer_lock);
+	ret = __ptr_ring_empty(r);
+	qemu_spin_unlock(&r->consumer_lock);
+
+	return ret;
+}
+
+/* Must only be called after __ptr_ring_peek returned !NULL */
+static inline void __ptr_ring_discard_one(struct ptr_ring *r)
+{
+	/* Fundamentally, what we want to do is update consumer
+	 * index and zero out the entry so producer can reuse it.
+	 * Doing it naively at each consume would be as simple as:
+	 *       consumer = r->consumer;
+	 *       r->queue[consumer++] = NULL;
+	 *       if (unlikely(consumer >= r->size))
+	 *               consumer = 0;
+	 *       r->consumer = consumer;
+	 * but that is suboptimal when the ring is full as producer is writing
+	 * out new entries in the same cache line.  Defer these updates until a
+	 * batch of entries has been consumed.
+	 */
+	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+	 * to work correctly.
+	 */
+	int consumer_head = r->consumer_head;
+	int head = consumer_head++;
+
+	/* Once we have processed enough entries invalidate them in
+	 * the ring all at once so producer can reuse their space in the ring.
+	 * We also do this when we reach end of the ring - not mandatory
+	 * but helps keep the implementation simple.
+	 */
+	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+		     consumer_head >= r->size)) {
+		/* Zero out entries in the reverse order: this way we touch the
+		 * cache line that producer might currently be reading the last;
+		 * producer won't make progress and touch other cache lines
+		 * besides the first one until we write out all entries.
+		 */
+		while (likely(head >= r->consumer_tail))
+			r->queue[head--] = NULL;
+		r->consumer_tail = consumer_head;
+	}
+	if (unlikely(consumer_head >= r->size)) {
+		consumer_head = 0;
+		r->consumer_tail = 0;
+	}
+	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+	PTR_RING_WRITE_ONCE(r->consumer_head, consumer_head);
+}
+
+static inline void *__ptr_ring_consume(struct ptr_ring *r)
+{
+	void *ptr;
+
+	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
+	 * accessing data through the pointer is up to date. Pairs
+	 * with smp_wmb in __ptr_ring_produce.
+	 */
+	ptr = __ptr_ring_peek(r);
+	if (ptr)
+		__ptr_ring_discard_one(r);
+
+	return ptr;
+}
+
+static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
+					     void **array, int n)
+{
+	void *ptr;
+	int i;
+
+	for (i = 0; i < n; i++) {
+		ptr = __ptr_ring_consume(r);
+		if (!ptr)
+			break;
+		array[i] = ptr;
+	}
+
+	return i;
+}
+
+/*
+ * Note: resize (below) nests producer lock within consumer lock, so if you
+ * call this in interrupt or BH context, you must disable interrupts/BH when
+ * producing.
+ */
+static inline void *ptr_ring_consume(struct ptr_ring *r)
+{
+	void *ptr;
+
+	qemu_spin_lock(&r->consumer_lock);
+	ptr = __ptr_ring_consume(r);
+	qemu_spin_unlock(&r->consumer_lock);
+
+	return ptr;
+}
+
+static inline int ptr_ring_consume_batched(struct ptr_ring *r,
+					   void **array, int n)
+{
+	int ret;
+
+	qemu_spin_lock(&r->consumer_lock);
+	ret = __ptr_ring_consume_batched(r, array, n);
+	qemu_spin_unlock(&r->consumer_lock);
+
+	return ret;
+}
+
+/* Cast to structure type and call a function without discarding from FIFO.
+ * Function must return a value.
+ * Callers must take consumer_lock.
+ */
+#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
+
+#define PTR_RING_PEEK_CALL(r, f) ({ \
+	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
+	\
+	qemu_spin_lock(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
+	qemu_spin_unlock(&(r)->consumer_lock); \
+	__PTR_RING_PEEK_CALL_v; \
+})
+
+static inline void **__ptr_ring_init_queue_alloc(unsigned int size)
+{
+	return g_try_new(void *, size);
+}
+
+static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
+{
+	r->size = size;
+	r->batch = PTR_RING_CACHE_BYTES * 2 / sizeof(*(r->queue));
+	/* We need to set batch at least to 1 to make logic
+	 * in __ptr_ring_discard_one work correctly.
+	 * Batching too much (because ring is small) would cause a lot of
+	 * burstiness. Needs tuning, for now disable batching.
+	 */
+	if (r->batch > r->size / 2 || !r->batch)
+		r->batch = 1;
+}
+
+static inline int ptr_ring_init(struct ptr_ring *r, int size)
+{
+	r->queue = __ptr_ring_init_queue_alloc(size);
+	if (!r->queue)
+		return -ENOMEM;
+
+	__ptr_ring_set_size(r, size);
+	r->producer = r->consumer_head = r->consumer_tail = 0;
+	qemu_spin_init(&r->producer_lock);
+	qemu_spin_init(&r->consumer_lock);
+
+	return 0;
+}
+
+/*
+ * Return entries into ring. Destroy entries that don't fit.
+ *
+ * Note: this is expected to be a rare slow path operation.
+ *
+ * Note: producer lock is nested within consumer lock, so if you
+ * resize you must make sure all uses nest correctly.
+ * In particular if you consume ring in interrupt or BH context, you must
+ * disable interrupts/BH when doing so.
+ */
+static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
+				      void (*destroy)(void *))
+{
+	int head;
+
+	qemu_spin_lock(&r->consumer_lock);
+	qemu_spin_lock(&r->producer_lock);
+
+	if (!r->size)
+		goto done;
+
+	/*
+	 * Clean out buffered entries (for simplicity). This way following code
+	 * can test entries for NULL and if not assume they are valid.
+	 */
+	head = r->consumer_head - 1;
+	while (likely(head >= r->consumer_tail))
+		r->queue[head--] = NULL;
+	r->consumer_tail = r->consumer_head;
+
+	/*
+	 * Go over entries in batch, start moving head back and copy entries.
+	 * Stop when we run into previously unconsumed entries.
+	 */
+	while (n) {
+		head = r->consumer_head - 1;
+		if (head < 0)
+			head = r->size - 1;
+		if (r->queue[head]) {
+			/* This batch entry will have to be destroyed. */
+			goto done;
+		}
+		r->queue[head] = batch[--n];
+		r->consumer_tail = head;
+		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+		PTR_RING_WRITE_ONCE(r->consumer_head, head);
+	}
+
+done:
+	/* Destroy all entries left in the batch. */
+	while (n)
+		destroy(batch[--n]);
+	qemu_spin_unlock(&r->producer_lock);
+	qemu_spin_unlock(&r->consumer_lock);
+}
+
+static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
+						    int size,
+						    void (*destroy)(void *))
+{
+	int producer = 0;
+	void **old;
+	void *ptr;
+
+	while ((ptr = __ptr_ring_consume(r)))
+		if (producer < size)
+			queue[producer++] = ptr;
+		else if (destroy)
+			destroy(ptr);
+
+	__ptr_ring_set_size(r, size);
+	r->producer = producer;
+	r->consumer_head = 0;
+	r->consumer_tail = 0;
+	old = r->queue;
+	r->queue = queue;
+
+	return old;
+}
+
+/*
+ * Note: producer lock is nested within consumer lock, so if you
+ * resize you must make sure all uses nest correctly.
+ * In particular if you consume ring in interrupt or BH context, you must
+ * disable interrupts/BH when doing so.
+ */
+static inline int ptr_ring_resize(struct ptr_ring *r, int size,
+				  void (*destroy)(void *))
+{
+	void **queue = __ptr_ring_init_queue_alloc(size);
+	void **old;
+
+	if (!queue)
+		return -ENOMEM;
+
+	qemu_spin_lock(&(r)->consumer_lock);
+	qemu_spin_lock(&(r)->producer_lock);
+
+	old = __ptr_ring_swap_queue(r, queue, size, destroy);
+
+	qemu_spin_unlock(&(r)->producer_lock);
+	qemu_spin_unlock(&(r)->consumer_lock);
+
+	g_free(old);
+
+	return 0;
+}
+
+/*
+ * Note: producer lock is nested within consumer lock, so if you
+ * resize you must make sure all uses nest correctly.
+ * In particular if you consume ring in interrupt or BH context, you must
+ * disable interrupts/BH when doing so.
+ */
+static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
+					   unsigned int nrings,
+					   int size,
+					   void (*destroy)(void *))
+{
+	void ***queues;
+	int i;
+
+	queues = g_try_new(void **, nrings);
+	if (!queues)
+		goto noqueues;
+
+	for (i = 0; i < nrings; ++i) {
+		queues[i] = __ptr_ring_init_queue_alloc(size);
+		if (!queues[i])
+			goto nomem;
+	}
+
+	for (i = 0; i < nrings; ++i) {
+		qemu_spin_lock(&(rings[i])->consumer_lock);
+		qemu_spin_lock(&(rings[i])->producer_lock);
+		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
+						  size, destroy);
+		qemu_spin_unlock(&(rings[i])->producer_lock);
+		qemu_spin_unlock(&(rings[i])->consumer_lock);
+	}
+
+	for (i = 0; i < nrings; ++i)
+		g_free(queues[i]);
+
+	g_free(queues);
+
+	return 0;
+
+nomem:
+	while (--i >= 0)
+		g_free(queues[i]);
+
+	g_free(queues);
+
+noqueues:
+	return -ENOMEM;
+}
+
+static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
+{
+	void *ptr;
+
+	if (destroy)
+		while ((ptr = ptr_ring_consume(r)))
+			destroy(ptr);
+	g_free(r->queue);
+}
+
+#endif /* QEMU_PTR_RING_H */
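
The batch size chosen by __ptr_ring_set_size() in the port above can be worked through in isolation. The helper below repeats the same arithmetic and returns the result; `sketch_batch_for_size()` and `SKETCH_CACHE_BYTES` are hypothetical names used only for this illustration. On a platform with 8-byte pointers the default is 128 / 8 = 16 entries, i.e. two cache lines' worth of pointers, and batching is switched off (batch of 1) whenever that would exceed half the ring.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_CACHE_BYTES 64   /* mirrors PTR_RING_CACHE_BYTES in the port */

/* Same arithmetic as __ptr_ring_set_size(), returning the batch directly. */
static int sketch_batch_for_size(int size)
{
    int batch = SKETCH_CACHE_BYTES * 2 / sizeof(void *);

    assert(size >= 0);
    /* small rings: batching would be too bursty, so consume one at a time */
    if (batch > size / 2 || !batch) {
        batch = 1;
    }
    return batch;
}
```

For example, a ring of 4 entries gets a batch of 1, while a ring of 1024 entries keeps the two-cache-line default.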
Xiao Guangrong July 3, 2018, 7:31 a.m. UTC | #16
On 06/29/2018 09:08 PM, Michael S. Tsirkin wrote:
> On Fri, Jun 29, 2018 at 03:30:44PM +0800, Xiao Guangrong wrote:
>>
>> Hi Michael,
>>
>> On 06/20/2018 08:38 PM, Michael S. Tsirkin wrote:
>>> On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
>>>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>>>>
>>>>
>>>> (1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
>>>> (2) http://dpdk.org/doc/api/rte__ring_8h.html
>>>>
>>>> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
>>>
>>> So instead of all this super-optimized trickiness, how about
>>> a simple port of ptr_ring from linux?
>>>
>>> That one isn't lockless but it's known to outperform
>>> most others for a single producer/single consumer case.
>>> And with a ton of networking going on,
>>> who said it's such a hot spot? OTOH this implementation
>>> has more barriers which slows down each individual thread.
>>> It's also a source of bugs.
>>>
>>
>> Thank you for pointing it out.
>>
>> I just quickly went through the code of ptr_ring that is very nice and
>> really impressive. I will consider to port it to QEMU.
> 
> The port is pretty trivial. See below. It's a SPSC structure though.  So
> you need to use it with lock.  Given the critical section is small, I

Why put these locks into this common struct? In our case, each thread
has its own ring, which is SPSC, so no lock is needed at all. Atomic operations
still slow things down; see [PATCH 07/12] migration: hold the lock only if
it is really needed. I'd move the inner locks to the user instead.
Paul E. McKenney July 3, 2018, 3:55 p.m. UTC | #17
On Fri, Jun 29, 2018 at 11:55:08AM +0800, Xiao Guangrong wrote:
> 
> 
> On 06/28/2018 07:55 PM, Wei Wang wrote:
> >On 06/28/2018 06:02 PM, Xiao Guangrong wrote:
> >>
> >>CC: Paul, Peter Zijlstra, Stefani, Lai who are all good at memory barrier.
> >>
> >>
> >>On 06/20/2018 12:52 PM, Peter Xu wrote:
> >>>On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@gmail.com wrote:
> >>>>From: Xiao Guangrong <xiaoguangrong@tencent.com>
> >>>>
> >>>>It's the simple lockless ring buffer implement which supports both
> >>>>single producer vs. single consumer and multiple producers vs.
> >>>>single consumer.
> >>>>
> >>>>Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
> >>>>rte_ring (2) before i wrote this implement. It corrects some bugs of
> >>>>memory barriers in kfifo and it is the simpler lockless version of
> >>>>rte_ring as currently multiple access is only allowed for producer.
> >>>
> >>>Could you provide some more information about the kfifo bug? Any
> >>>pointer would be appreciated.
> >>>
> >>
> >>Sure, i reported one of the memory barrier issue to linux kernel:
> >>   https://lkml.org/lkml/2018/5/11/58
> >>
> >>Actually, beside that, there is another memory barrier issue in kfifo,
> >>please consider this case:
> >>
> >>   at the beginning
> >>   ring->size = 4
> >>   ring->out = 0
> >>   ring->in = 4
> >>
> >>     Consumer                            Producer
> >> ---------------                     --------------
> >>   index = ring->out; /* index == 0 */
> >>   ring->out++; /* ring->out == 1 */
> >>   < Re-Order >
> >>                                    out = ring->out;
> >>                                    if (ring->in - out >= ring->mask)
> >>                                        return -EFULL;
> >>                                    /* see the ring is not full */
> >>                                    index = ring->in & ring->mask; /* index == 0 */
> >>                                    ring->data[index] = new_data;
> >>                                    ring->in++;
> >>
> >>   data = ring->data[index];
> >>   !!!!!! the old data is lost !!!!!!
> >>
> >>So we need to make sure:
> >>1) for the consumer, we should read the ring->data[] out before updating ring->out
> >>2) for the producer, we should read ring->out before updating ring->data[]
> >>
> >>as followings:
> >>      Producer                                Consumer
> >>  ------------------------------------    ------------------------
> >>      Reading ring->out                       Reading ring->data[index]
> >>      smp_mb()                                smp_mb()
> >>      Setting ring->data[index] = data        ring->out++
> >>
> >>[ i used atomic_store_release() and atomic_load_acquire() instead of smp_mb() in the
> >>  patch. ]
> >>
> >>But i am not sure if we can use smp_acquire__after_ctrl_dep() in the producer?
> >
> >
> >I wonder if this could be solved by simply tweaking the above consumer implementation:
> >
> >[1] index = ring->out;
> >[2] data = ring->data[index];
> >[3] index++;
> >[4] ring->out = index;
> >
> >Now [2] and [3] forms a WAR dependency, which avoids the reordering.
> 
> It cannot. [2] and [4] still do not have any dependency; the CPU and the
> compiler can omit the 'index'.

One thing to try would be the Linux-kernel memory model tools in
tools/memory-model in current mainline.  There is a README file describing
how to install and set it up, with a number of files in Documentation
and litmus-tests that can help guide you.

							Thanx, Paul
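
For reference, the two ordering requirements discussed in this sub-thread can be expressed with C11 acquire/release atomics as sketched below. This is a single-producer/single-consumer illustration only; `spsc_ring`, `spsc_put()` and `spsc_get()` are hypothetical names, and the sketch uses `<stdatomic.h>` rather than the QEMU or Linux kernel primitives mentioned in the thread, so it says nothing about what those primitives guarantee.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define SPSC_SIZE 4u                  /* hypothetical; must be a power of 2 */
#define SPSC_MASK (SPSC_SIZE - 1)

struct spsc_ring {
    atomic_uint in;                   /* written only by the producer */
    atomic_uint out;                  /* written only by the consumer */
    void *data[SPSC_SIZE];
};

static void spsc_init(struct spsc_ring *r, unsigned int start)
{
    /* any common start value works because in - out wraps as unsigned */
    atomic_init(&r->in, start);
    atomic_init(&r->out, start);
}

static int spsc_put(struct spsc_ring *r, void *item)
{
    unsigned int in = atomic_load_explicit(&r->in, memory_order_relaxed);
    /* rule 2: read ring->out (acquire) before writing ring->data[] */
    unsigned int out = atomic_load_explicit(&r->out, memory_order_acquire);

    assert(item != NULL);
    if (in - out >= SPSC_SIZE) {
        return -1;                    /* full */
    }
    r->data[in & SPSC_MASK] = item;
    /* publish the entry before the consumer can observe the new index */
    atomic_store_explicit(&r->in, in + 1, memory_order_release);
    return 0;
}

static void *spsc_get(struct spsc_ring *r)
{
    unsigned int out = atomic_load_explicit(&r->out, memory_order_relaxed);
    unsigned int in = atomic_load_explicit(&r->in, memory_order_acquire);
    void *item;

    if (in == out) {
        return NULL;                  /* empty */
    }
    item = r->data[out & SPSC_MASK];
    /* rule 1: the read of ring->data[] completes before ring->out advances */
    atomic_store_explicit(&r->out, out + 1, memory_order_release);
    return item;
}
```

The release store to `out` pairs with the acquire load of `out` in the producer, so the consumer's read of the slot happens before the producer overwrites it; this is the case the "old data is lost" diagram above describes.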
Patch

diff --git a/migration/ring.h b/migration/ring.h
new file mode 100644
index 0000000000..da9b8bdcbb
--- /dev/null
+++ b/migration/ring.h
@@ -0,0 +1,265 @@ 
+/*
+ * Ring Buffer
+ *
+ * Lock-free; supports multiple producers and a single consumer.
+ *
+ * Copyright (c) 2018 Tencent Inc
+ *
+ * Authors:
+ *  Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#ifndef _RING__
+#define _RING__
+
+#define CACHE_LINE  64
+#define cache_aligned __attribute__((__aligned__(CACHE_LINE)))
+
+#define RING_MULTI_PRODUCER 0x1
+
+struct Ring {
+    unsigned int flags;
+    unsigned int size;
+    unsigned int mask;
+
+    unsigned int in cache_aligned;
+
+    unsigned int out cache_aligned;
+
+    void *data[0] cache_aligned;
+};
+typedef struct Ring Ring;
+
+/*
+ * allocate and initialize the ring
+ *
+ * @size: the number of elements; it must be a power of 2
+ * @flags: set to RING_MULTI_PRODUCER if the ring has multiple producers,
+ *         otherwise set it to 0, i.e. single producer and single consumer.
+ *
+ * return the ring.
+ */
+static inline Ring *ring_alloc(unsigned int size, unsigned int flags)
+{
+    Ring *ring;
+
+    assert(is_power_of_2(size));
+
+    ring = g_malloc0(sizeof(*ring) + size * sizeof(void *));
+    ring->size = size;
+    ring->mask = ring->size - 1;
+    ring->flags = flags;
+    return ring;
+}
+
+static inline void ring_free(Ring *ring)
+{
+    g_free(ring);
+}
+
+static inline bool __ring_is_empty(unsigned int in, unsigned int out)
+{
+    return in == out;
+}
+
+static inline bool ring_is_empty(Ring *ring)
+{
+    return ring->in == ring->out;
+}
+
+static inline unsigned int ring_len(unsigned int in, unsigned int out)
+{
+    return in - out;
+}
+
+static inline bool
+__ring_is_full(Ring *ring, unsigned int in, unsigned int out)
+{
+    return ring_len(in, out) > ring->mask;
+}
+
+static inline bool ring_is_full(Ring *ring)
+{
+    return __ring_is_full(ring, ring->in, ring->out);
+}
+
+static inline unsigned int ring_index(Ring *ring, unsigned int pos)
+{
+    return pos & ring->mask;
+}
+
+static inline int __ring_put(Ring *ring, void *data)
+{
+    unsigned int index, out;
+
+    out = atomic_load_acquire(&ring->out);
+    /*
+     * smp_mb()
+     *
+     * should read ring->out before updating the entry, see the comments in
+     * __ring_get().
+     */
+
+    if (__ring_is_full(ring, ring->in, out)) {
+        return -ENOBUFS;
+    }
+
+    index = ring_index(ring, ring->in);
+
+    atomic_set(&ring->data[index], data);
+
+    /*
+     * should make sure the entry is updated before increasing ring->in,
+     * otherwise the consumer will get an entry whose content is useless.
+     */
+    smp_wmb();
+    atomic_set(&ring->in, ring->in + 1);
+    return 0;
+}
+
+static inline void *__ring_get(Ring *ring)
+{
+    unsigned int index, in;
+    void *data;
+
+    in = atomic_read(&ring->in);
+
+    /*
+     * should read ring->in first to make sure the entry pointed by this
+     * index is available, see the comments in __ring_put().
+     */
+    smp_rmb();
+    if (__ring_is_empty(in, ring->out)) {
+        return NULL;
+    }
+
+    index = ring_index(ring, ring->out);
+
+    data = atomic_read(&ring->data[index]);
+
+    /*
+     * smp_mb()
+     *
+     * once ring->out is updated, the entry originally indicated by the
+     * index is visible and usable to the producer, so we should make
+     * sure the entry is read out before updating ring->out to avoid
+     * the entry being overwritten by the producer.
+     */
+    atomic_store_release(&ring->out, ring->out + 1);
+
+    return data;
+}
+
+static inline int ring_mp_put(Ring *ring, void *data)
+{
+    unsigned int index, in, in_next, out;
+
+    do {
+        in = atomic_read(&ring->in);
+        out = atomic_read(&ring->out);
+
+        if (__ring_is_full(ring, in, out)) {
+            if (atomic_read(&ring->in) == in &&
+                atomic_read(&ring->out) == out) {
+                return -ENOBUFS;
+            }
+
+            /* an entry has been fetched out, retry. */
+            continue;
+        }
+
+        in_next = in + 1;
+    } while (atomic_cmpxchg(&ring->in, in, in_next) != in);
+
+    index = ring_index(ring, in);
+
+    /*
+     * smp_rmb() paired with the memory barrier of (A) in ring_mp_get()
+     * is implied in atomic_cmpxchg() as we should read ring->out first
+     * before fetching the entry, otherwise this assert will fail.
+     */
+    assert(!atomic_read(&ring->data[index]));
+
+    /*
+     * smp_mb() paired with the memory barrier of (B) in ring_mp_get() is
+     * implied in atomic_cmpxchg(), that is needed here as we should read
+     * ring->out before updating the entry, it is the same as we did in
+     * __ring_put().
+     *
+     * smp_wmb() paired with the memory barrier of (C) in ring_mp_get()
+     * is implied in atomic_cmpxchg(), that is needed as we should increase
+     * ring->in before updating the entry.
+     */
+    atomic_set(&ring->data[index], data);
+
+    return 0;
+}
+
+static inline void *ring_mp_get(Ring *ring)
+{
+    unsigned int index, in;
+    void *data;
+
+    do {
+        in = atomic_read(&ring->in);
+
+        /*
+         * (C) should read ring->in first to make sure the entry pointed by this
+         * index is available
+         */
+        smp_rmb();
+
+        if (!__ring_is_empty(in, ring->out)) {
+            break;
+        }
+
+        if (atomic_read(&ring->in) == in) {
+            return NULL;
+        }
+        /* new entry has been added in, retry. */
+    } while (1);
+
+    index = ring_index(ring, ring->out);
+
+    do {
+        data = atomic_read(&ring->data[index]);
+        if (data) {
+            break;
+        }
+        /* the producer is updating the entry, retry */
+        cpu_relax();
+    } while (1);
+
+    atomic_set(&ring->data[index], NULL);
+
+    /*
+     * (B) smp_mb() is needed as we should read the entry out before
+     * updating ring->out as we did in __ring_get().
+     *
+     * (A) smp_wmb() is needed as we should make the entry be NULL before
+     * updating ring->out (which will make the entry be visible and usable).
+     */
+    atomic_store_release(&ring->out, ring->out + 1);
+
+    return data;
+}
+
+static inline int ring_put(Ring *ring, void *data)
+{
+    if (ring->flags & RING_MULTI_PRODUCER) {
+        return ring_mp_put(ring, data);
+    }
+    return __ring_put(ring, data);
+}
+
+static inline void *ring_get(Ring *ring)
+{
+    if (ring->flags & RING_MULTI_PRODUCER) {
+        return ring_mp_get(ring);
+    }
+    return __ring_get(ring);
+}
+#endif
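
The two-step multi-producer protocol from the commit message (claim a slot with a compare-and-swap on `in`, then write the data; the consumer waits until the claimed slot is non-NULL) can be sketched in portable C11 as follows. This is an illustration, not the patch itself: `mp_ring`, `mp_put()` and `mp_get()` are hypothetical names, the capacity is fixed, and all atomics use the default sequentially consistent ordering for simplicity rather than the weaker barriers the patch argues for.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define MP_SIZE 4u                    /* hypothetical; must be a power of 2 */
#define MP_MASK (MP_SIZE - 1)

struct mp_ring {
    atomic_uint in;                   /* advanced by producers via CAS */
    atomic_uint out;                  /* advanced by the single consumer */
    void *_Atomic data[MP_SIZE];      /* NULL means "not written yet" */
};

static void mp_init(struct mp_ring *r)
{
    atomic_init(&r->in, 0);
    atomic_init(&r->out, 0);
    for (unsigned int i = 0; i < MP_SIZE; i++) {
        atomic_init(&r->data[i], NULL);
    }
}

static int mp_put(struct mp_ring *r, void *item)
{
    unsigned int in = atomic_load(&r->in);

    assert(item != NULL);             /* NULL is the "initialized" state */
    do {
        if (in - atomic_load(&r->out) >= MP_SIZE) {
            return -1;                /* full */
        }
        /* step 1: claim slot `in`; on failure `in` is refreshed for the retry */
    } while (!atomic_compare_exchange_weak(&r->in, &in, in + 1));

    /* step 2: fill the claimed slot; no other producer can touch it */
    atomic_store(&r->data[in & MP_MASK], item);
    return 0;
}

static void *mp_get(struct mp_ring *r)
{
    unsigned int out = atomic_load(&r->out);
    void *item;

    if (atomic_load(&r->in) == out) {
        return NULL;                  /* empty */
    }
    /* the slot may be claimed but not yet written: wait for the data */
    while (!(item = atomic_load(&r->data[out & MP_MASK]))) {
        /* spin until the producer finishes step 2 */
    }
    atomic_store(&r->data[out & MP_MASK], NULL);  /* back to initial state */
    atomic_store(&r->out, out + 1);               /* hand the slot back */
    return item;
}
```

Resetting the slot to NULL before advancing `out` is what lets the next producer that claims this position rely on finding it in the initialized state.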