Message ID | 20240704072154.1716301-1-anton.ivanov@cambridgegreys.com |
---|---|
State | Changes Requested |
Series | um: vector: Replace locks guarding queue depth with atomics |
Hi Anton, Thanks for taking a look! Also thanks for the other explanation. On Thu, 2024-07-04 at 08:21 +0100, anton.ivanov@cambridgegreys.com wrote: > From: Anton Ivanov <anton.ivanov@cambridgegreys.com> > > UML vector drivers use ring buffer structures which map > preallocated skbs onto mmsg vectors for use with sendmmsg > and recvmmsg. They are designed around a single consumer, > single producer pattern allowing simultaneous enqueue and > dequeue. Right, I didn't grok that from just the code yesterday :) > Lock debugging with preemption showed possible races when > locking the queue depth. This patch addresses this by > removing extra locks while making queue depth inc/dec and > access atomic. I don't think this is fully correct in SMP, and I think we should fix it even if we don't have SMP (yet?) See, the other thing about spinlocks is that they serve as barriers, and I don't think the atomics you use have "enough" of them (per the docs in atomic_t.txt)? If you have concurrent producers and consumers, you really want the producer to actually have fully prepared the data before it's visible to the consumer, which means you need to have a full barrier before incrementing queue_length. But you're now incrementing it by just atomic_add(), and atomic_t.txt says: - RMW operations that have no return value are unordered; So I think the producer should have /* * adding to queue_length makes the prepared buffers * visible to the consumer, ensure they're actually * completely written/visible before */ smp_mb__before_atomic(); atomic_add(...); Or so? > + atomic_sub(advance, &qi->queue_depth); > > - if (qi->queue_depth == 0) { > - qi->head = 0; > - qi->tail = 0; > - } > - queue_depth = qi->queue_depth; > - spin_unlock(&qi->tail_lock); > - return queue_depth; > + return atomic_read(&qi->queue_depth); Or just use atomic_sub_return()? Though that also implies a barrier, which I think isn't needed on the consumer side. However I think it's clearer to just remove the return value and make this function void. > static int vector_advancetail(struct vector_queue *qi, int advance) > { > - int queue_depth; > - > qi->tail = > (qi->tail + advance) > % qi->max_depth; > - spin_lock(&qi->head_lock); > - qi->queue_depth += advance; > - queue_depth = qi->queue_depth; > - spin_unlock(&qi->head_lock); > - return queue_depth; > + atomic_add(advance, &qi->queue_depth); > + return atomic_read(&qi->queue_depth); > } Or maybe here instead of the barrier use atomic_add_return() which implies a full barrier on both sides, but I don't think you need that, the barrier before is enough? > @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi) > int result = 0, send_len, queue_depth = qi->max_depth; > > if (spin_trylock(&qi->head_lock)) { [...] > + /* update queue_depth to current value */ > + queue_depth = atomic_read(&qi->queue_depth); > + while (queue_depth > 0) { I think it'd be clearer to write this as while ((queue_depth = atomic_read(...))) { and simply not modify the queue_depth to the return value of consume_vector_skbs() here: [...] > + if (result > 0) { > + queue_depth = > + consume_vector_skbs(qi, result); > + /* This is equivalent to an TX IRQ. > + * Restart the upper layers to feed us > + * more packets. 
> */ > - if (result != send_len) { > - vp->estats.tx_restart_queue++; > - break; > - } > + if (result > vp->estats.tx_queue_max) > + vp->estats.tx_queue_max = result; > + vp->estats.tx_queue_running_average = > + (vp->estats.tx_queue_running_average + result) >> 1; > + } > + netif_wake_queue(qi->dev); > + /* if TX is busy, break out of the send loop, > + * poll write IRQ will reschedule xmit for us > + */ > + if (result != send_len) { > + vp->estats.tx_restart_queue++; > + break; > } > } The loop doesn't need queue_depth until it goes back to the beginning anyway. (it probably also never even executes twice unless you actually have SMP or preemption?) > @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi) > struct vector_private *vp = netdev_priv(qi->dev); > struct mmsghdr *mmsg_vector = qi->mmsg_vector; > void **skbuff_vector = qi->skbuff_vector; > - int i; > + int i, queue_depth; > + > + queue_depth = atomic_read(&qi->queue_depth); > > - if (qi->queue_depth == 0) > + if (queue_depth == 0) > return; > - for (i = 0; i < qi->queue_depth; i++) { > + for (i = 0; i < queue_depth; i++) { > /* it is OK if allocation fails - recvmmsg with NULL data in > * iov argument still performs an RX, just drops the packet > * This allows us stop faffing around with a "drop buffer" > @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi) > skbuff_vector++; > mmsg_vector++; > } > - qi->queue_depth = 0; > + atomic_set(&qi->queue_depth, 0); atomic_read() and then atomic_set() rather than atomic_sub() seems questionable, but you didn't have locks here before so maybe this is somehow logically never in parallel? But it is done in NAPI polling via vector_mmsg_rx(), so not sure I understand. > @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget) > * many do we need to prep the next time prep_queue_for_rx() is called. > */ > > - qi->queue_depth = packet_count; > + atomic_add(packet_count, &qi->queue_depth); > > for (i = 0; i < packet_count; i++) { > skb = (*skbuff_vector); especially since here you _did_ convert to atomic_add(). johannes
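A minimal sketch of the producer-side ordering Johannes is describing, applied to vector_advancetail() from the patch. This is the reviewer's suggestion, not the posted code, and it assumes the usual <linux/atomic.h> helpers:

        static int vector_advancetail(struct vector_queue *qi, int advance)
        {
                qi->tail = (qi->tail + advance) % qi->max_depth;

                /*
                 * Make the mmsg/skb slots filled in by the caller visible
                 * before the consumer can observe the larger queue_depth;
                 * a bare atomic_add() is unordered per atomic_t.txt.
                 */
                smp_mb__before_atomic();
                atomic_add(advance, &qi->queue_depth);

                return atomic_read(&qi->queue_depth);
        }

The consumer side can stay as plain atomic_sub()/atomic_read(); atomic_add_return() would also work here, but it implies full ordering on both sides, which Johannes notes is more than is needed.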
On 04/07/2024 10:17, Johannes Berg wrote: > Hi Anton, > > Thanks for taking a look! Also thanks for the other explanation. > > On Thu, 2024-07-04 at 08:21 +0100, anton.ivanov@cambridgegreys.com > wrote: >> From: Anton Ivanov <anton.ivanov@cambridgegreys.com> >> >> UML vector drivers use ring buffer structures which map >> preallocated skbs onto mmsg vectors for use with sendmmsg >> and recvmmsg. They are designed around a single consumer, >> single producer pattern allowing simultaneous enqueue and >> dequeue. > > Right, I didn't grok that from just the code yesterday :) > >> Lock debugging with preemption showed possible races when >> locking the queue depth. This patch addresses this by >> removing extra locks while making queue depth inc/dec and >> access atomic. > > I don't think this is fully correct in SMP, and I think we should fix it > even if we don't have SMP (yet?) > > See, the other thing about spinlocks is that they serve as barriers, and > I don't think the atomics you use have "enough" of them (per the docs in > atomic_t.txt)? > > If you have concurrent producers and consumers, you really want the > producer to actually have fully prepared the data before it's visible to > the consumer, which means you need to have a full barrier before > incrementing queue_length. Indeed. I will add that for v2. > > But you're now incrementing it by just atomic_add(), and atomic_t.txt > says: > > - RMW operations that have no return value are unordered; > > So I think the producer should have > > /* > * adding to queue_length makes the prepared buffers > * visible to the consumer, ensure they're actually > * completely written/visible before > */ > smp_mb__before_atomic(); > atomic_add(...); > > Or so? > > >> + atomic_sub(advance, &qi->queue_depth); >> >> - if (qi->queue_depth == 0) { >> - qi->head = 0; >> - qi->tail = 0; >> - } >> - queue_depth = qi->queue_depth; >> - spin_unlock(&qi->tail_lock); >> - return queue_depth; >> + return atomic_read(&qi->queue_depth); > > Or just use atomic_sub_return()? Though that also implies a barrier, > which I think isn't needed on the consumer side. Let me have a look at it once again. IIRC you need only producer barriers. > > However I think it's clearer to just remove the return value and make > this function void. > >> static int vector_advancetail(struct vector_queue *qi, int advance) >> { >> - int queue_depth; >> - >> qi->tail = >> (qi->tail + advance) >> % qi->max_depth; >> - spin_lock(&qi->head_lock); >> - qi->queue_depth += advance; >> - queue_depth = qi->queue_depth; >> - spin_unlock(&qi->head_lock); >> - return queue_depth; >> + atomic_add(advance, &qi->queue_depth); >> + return atomic_read(&qi->queue_depth); >> } > > Or maybe here instead of the barrier use atomic_add_return() which > implies a full barrier on both sides, but I don't think you need that, > the barrier before is enough? > > >> @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi) >> int result = 0, send_len, queue_depth = qi->max_depth; >> >> if (spin_trylock(&qi->head_lock)) { > [...] >> + /* update queue_depth to current value */ >> + queue_depth = atomic_read(&qi->queue_depth); >> + while (queue_depth > 0) { > > I think it'd be clearer to write this as > > while ((queue_depth = atomic_read(...))) { > > and simply not modify the queue_depth to the return value of > consume_vector_skbs() here: > > [...] > >> + if (result > 0) { >> + queue_depth = >> + consume_vector_skbs(qi, result); >> + /* This is equivalent to an TX IRQ. 
>> + * Restart the upper layers to feed us >> + * more packets. >> */ >> - if (result != send_len) { >> - vp->estats.tx_restart_queue++; >> - break; >> - } >> + if (result > vp->estats.tx_queue_max) >> + vp->estats.tx_queue_max = result; >> + vp->estats.tx_queue_running_average = >> + (vp->estats.tx_queue_running_average + result) >> 1; >> + } >> + netif_wake_queue(qi->dev); >> + /* if TX is busy, break out of the send loop, >> + * poll write IRQ will reschedule xmit for us >> + */ >> + if (result != send_len) { >> + vp->estats.tx_restart_queue++; >> + break; >> } >> } > > The loop doesn't need queue_depth until it goes back to the beginning > anyway. > > (it probably also never even executes twice unless you actually have SMP > or preemption?) It does. If half of the vector is at the end of the array which is used to imitate a ring buffer and the other half is at the beginning. Quite a common condition actually. There is an extra issue there - stats. I need to double-check the locking when they are being fetched. > >> @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi) >> struct vector_private *vp = netdev_priv(qi->dev); >> struct mmsghdr *mmsg_vector = qi->mmsg_vector; >> void **skbuff_vector = qi->skbuff_vector; >> - int i; >> + int i, queue_depth; >> + >> + queue_depth = atomic_read(&qi->queue_depth); >> >> - if (qi->queue_depth == 0) >> + if (queue_depth == 0) >> return; >> - for (i = 0; i < qi->queue_depth; i++) { >> + for (i = 0; i < queue_depth; i++) { >> /* it is OK if allocation fails - recvmmsg with NULL data in >> * iov argument still performs an RX, just drops the packet >> * This allows us stop faffing around with a "drop buffer" >> @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi) >> skbuff_vector++; >> mmsg_vector++; >> } >> - qi->queue_depth = 0; >> + atomic_set(&qi->queue_depth, 0); > > atomic_read() and then atomic_set() rather than atomic_sub() seems > questionable, but you didn't have locks here before so maybe this is > somehow logically never in parallel? But it is done in NAPI polling via > vector_mmsg_rx(), so not sure I understand. > >> @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget) >> * many do we need to prep the next time prep_queue_for_rx() is called. >> */ >> >> - qi->queue_depth = packet_count; >> + atomic_add(packet_count, &qi->queue_depth); >> >> for (i = 0; i < packet_count; i++) { >> skb = (*skbuff_vector); > > especially since here you _did_ convert to atomic_add(). > > johannes >
On 04/07/2024 10:45, Anton Ivanov wrote: > > > On 04/07/2024 10:17, Johannes Berg wrote: >> Hi Anton, >> >> Thanks for taking a look! Also thanks for the other explanation. >> >> On Thu, 2024-07-04 at 08:21 +0100, anton.ivanov@cambridgegreys.com >> wrote: >>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com> >>> >>> UML vector drivers use ring buffer structures which map >>> preallocated skbs onto mmsg vectors for use with sendmmsg >>> and recvmmsg. They are designed around a single consumer, >>> single producer pattern allowing simultaneous enqueue and >>> dequeue. >> >> Right, I didn't grok that from just the code yesterday :) >> >>> Lock debugging with preemption showed possible races when >>> locking the queue depth. This patch addresses this by >>> removing extra locks while making queue depth inc/dec and >>> access atomic. >> >> I don't think this is fully correct in SMP, and I think we should fix it >> even if we don't have SMP (yet?) >> >> See, the other thing about spinlocks is that they serve as barriers, and >> I don't think the atomics you use have "enough" of them (per the docs in >> atomic_t.txt)? >> >> If you have concurrent producers and consumers, you really want the >> producer to actually have fully prepared the data before it's visible to >> the consumer, which means you need to have a full barrier before >> incrementing queue_length. > > Indeed. I will add that for v2. > >> >> But you're now incrementing it by just atomic_add(), and atomic_t.txt >> says: >> >> - RMW operations that have no return value are unordered; >> >> So I think the producer should have >> >> /* >> * adding to queue_length makes the prepared buffers >> * visible to the consumer, ensure they're actually >> * completely written/visible before >> */ >> smp_mb__before_atomic(); >> atomic_add(...); >> >> Or so? >> >> >>> + atomic_sub(advance, &qi->queue_depth); >>> - if (qi->queue_depth == 0) { >>> - qi->head = 0; >>> - qi->tail = 0; >>> - } >>> - queue_depth = qi->queue_depth; >>> - spin_unlock(&qi->tail_lock); >>> - return queue_depth; >>> + return atomic_read(&qi->queue_depth); >> >> Or just use atomic_sub_return()? Though that also implies a barrier, >> which I think isn't needed on the consumer side. > > Let me have a look at it once again. IIRC you need only producer barriers. > >> >> However I think it's clearer to just remove the return value and make >> this function void. >> >>> static int vector_advancetail(struct vector_queue *qi, int advance) >>> { >>> - int queue_depth; >>> - >>> qi->tail = >>> (qi->tail + advance) >>> % qi->max_depth; >>> - spin_lock(&qi->head_lock); >>> - qi->queue_depth += advance; >>> - queue_depth = qi->queue_depth; >>> - spin_unlock(&qi->head_lock); >>> - return queue_depth; >>> + atomic_add(advance, &qi->queue_depth); >>> + return atomic_read(&qi->queue_depth); >>> } >> >> Or maybe here instead of the barrier use atomic_add_return() which >> implies a full barrier on both sides, but I don't think you need that, >> the barrier before is enough? >> >> >>> @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi) >>> int result = 0, send_len, queue_depth = qi->max_depth; >>> if (spin_trylock(&qi->head_lock)) { >> [...] >>> + /* update queue_depth to current value */ >>> + queue_depth = atomic_read(&qi->queue_depth); >>> + while (queue_depth > 0) { >> >> I think it'd be clearer to write this as >> >> while ((queue_depth = atomic_read(...))) { >> >> and simply not modify the queue_depth to the return value of >> consume_vector_skbs() here: >> >> [...] 
>> >>> + if (result > 0) { >>> + queue_depth = >>> + consume_vector_skbs(qi, result); >>> + /* This is equivalent to an TX IRQ. >>> + * Restart the upper layers to feed us >>> + * more packets. >>> */ >>> - if (result != send_len) { >>> - vp->estats.tx_restart_queue++; >>> - break; >>> - } >>> + if (result > vp->estats.tx_queue_max) >>> + vp->estats.tx_queue_max = result; >>> + vp->estats.tx_queue_running_average = >>> + (vp->estats.tx_queue_running_average + result) >> 1; >>> + } >>> + netif_wake_queue(qi->dev); >>> + /* if TX is busy, break out of the send loop, >>> + * poll write IRQ will reschedule xmit for us >>> + */ >>> + if (result != send_len) { >>> + vp->estats.tx_restart_queue++; >>> + break; >>> } >>> } >> >> The loop doesn't need queue_depth until it goes back to the beginning >> anyway. >> >> (it probably also never even executes twice unless you actually have SMP >> or preemption?) > > It does. If half of the vector is at the end of the array which is used to > imitate a ring buffer and the other half is at the beginning. Quite a common > condition actually. > > There is an extra issue there - stats. I need to double-check the locking when > they are being fetched. Same story - these need atomics. Otherwise we can potentially get the same ABBA lock issue when they are being fetched. > >> >>> @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi) >>> struct vector_private *vp = netdev_priv(qi->dev); >>> struct mmsghdr *mmsg_vector = qi->mmsg_vector; >>> void **skbuff_vector = qi->skbuff_vector; >>> - int i; >>> + int i, queue_depth; >>> + >>> + queue_depth = atomic_read(&qi->queue_depth); >>> - if (qi->queue_depth == 0) >>> + if (queue_depth == 0) >>> return; >>> - for (i = 0; i < qi->queue_depth; i++) { >>> + for (i = 0; i < queue_depth; i++) { >>> /* it is OK if allocation fails - recvmmsg with NULL data in >>> * iov argument still performs an RX, just drops the packet >>> * This allows us stop faffing around with a "drop buffer" >>> @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi) >>> skbuff_vector++; >>> mmsg_vector++; >>> } >>> - qi->queue_depth = 0; >>> + atomic_set(&qi->queue_depth, 0); >> >> atomic_read() and then atomic_set() rather than atomic_sub() seems >> questionable, but you didn't have locks here before so maybe this is >> somehow logically never in parallel? But it is done in NAPI polling via >> vector_mmsg_rx(), so not sure I understand. >> >>> @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget) >>> * many do we need to prep the next time prep_queue_for_rx() is called. >>> */ >>> - qi->queue_depth = packet_count; >>> + atomic_add(packet_count, &qi->queue_depth); >>> for (i = 0; i < packet_count; i++) { >>> skb = (*skbuff_vector); >> >> especially since here you _did_ convert to atomic_add(). >> >> johannes >> >
On Thu, 2024-07-04 at 10:45 +0100, Anton Ivanov wrote:
> > (it probably also never even executes twice unless you actually have SMP
> > or preemption?)
>
> It does. If half of the vector is at the end of the array which is used to
> imitate a ring buffer and the other half is at the beginning. Quite a common
> condition actually.

Ah, right, I missed this bit:

        /* Adjust vector size if wraparound */
        if (send_len + qi->head > qi->max_depth)
                send_len = qi->max_depth - qi->head;

> There is an extra issue there - stats. I need to double-check the locking when
> they are being fetched.

The memcpy() is an issue, but I think reading them individually should be OK
regardless?

But I suppose you could just take both locks there, hopefully nobody is
accessing them _really_ frequently? At least I think you can take both locks
since now no other path is taking both? But you'd need spin_lock_bh().

johannes
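For the "take both locks in the stats path" option, a rough sketch of what the fetch could look like. The buffer name tmp_stats and the vp->tx_queue pointer are assumed here rather than taken from the driver; the point is only the fixed lock order and the _bh variant:

        /* Fixed order, and no other path takes both locks any more,
         * so this cannot recreate the ABBA problem; _bh because the
         * queue locks are also taken from NAPI/softirq context.
         */
        spin_lock_bh(&vp->tx_queue->head_lock);
        spin_lock(&vp->tx_queue->tail_lock);
        memcpy(tmp_stats, &vp->estats, sizeof(vp->estats));
        spin_unlock(&vp->tx_queue->tail_lock);
        spin_unlock_bh(&vp->tx_queue->head_lock);

The inner lock can be a plain spin_lock() because bottom halves are already disabled by the outer spin_lock_bh().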
On Thu, 2024-07-04 at 10:52 +0100, Anton Ivanov wrote:
> >
> > There is an extra issue there - stats. I need to double-check the locking when
> > they are being fetched.
>
> Same story - these need atomics. Otherwise we can potentially get the same ABBA
> lock issue when they are being fetched.
>

Email overlap ...

I don't think so? If no other path now uses _both_ locks, the stats path
can, it'll just be "AB" never "BA".

johannes
On Thu, 2024-07-04 at 11:55 +0200, Johannes Berg wrote:
> On Thu, 2024-07-04 at 10:52 +0100, Anton Ivanov wrote:
> > >
> > > There is an extra issue there - stats. I need to double-check the locking when
> > > they are being fetched.
> >
> > Same story - these need atomics. Otherwise we can potentially get the same ABBA
> > lock issue when they are being fetched.
> >
>
> Email overlap ...
>
> I don't think so? If no other path now uses _both_ locks, the stats path
> can, it'll just be "AB" never "BA".

Or just split into tx and rx stats, and use the correct lock separately?

johannes
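The split-stats alternative, sketched very loosely. The tx_stats/rx_stats fields, the dst buffer, the tx_stat_count offset and the lock assignments are invented for illustration and would need to be checked against what each path actually updates:

        /* TX counters are only modified under the TX queue's head_lock
         * (vector_send), RX counters under whatever serialises the RX
         * path, so each half can be copied under a single lock and no
         * caller ever needs to hold both.
         */
        spin_lock_bh(&vp->tx_queue->head_lock);
        memcpy(dst, &vp->tx_stats, sizeof(vp->tx_stats));
        spin_unlock_bh(&vp->tx_queue->head_lock);

        spin_lock_bh(&vp->rx_queue->head_lock);
        memcpy(dst + tx_stat_count, &vp->rx_stats, sizeof(vp->rx_stats));
        spin_unlock_bh(&vp->rx_queue->head_lock);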
On 04/07/2024 10:55, Johannes Berg wrote:
> On Thu, 2024-07-04 at 10:52 +0100, Anton Ivanov wrote:
>>>
>>> There is an extra issue there - stats. I need to double-check the locking when
>>> they are being fetched.
>>
>> Same story - these need atomics. Otherwise we can potentially get the same ABBA
>> lock issue when they are being fetched.
>>
>
> Email overlap ...
>
> I don't think so? If no other path now uses _both_ locks, the stats path
> can, it'll just be "AB" never "BA".

Good point.

>
> johannes
>
>
diff --git a/arch/um/drivers/vector_kern.c b/arch/um/drivers/vector_kern.c index dc2feae789cb..5c9bf458fe0a 100644 --- a/arch/um/drivers/vector_kern.c +++ b/arch/um/drivers/vector_kern.c @@ -22,6 +22,7 @@ #include <linux/interrupt.h> #include <linux/firmware.h> #include <linux/fs.h> +#include <asm/atomic.h> #include <uapi/linux/filter.h> #include <init.h> #include <irq_kern.h> @@ -232,12 +233,6 @@ static int get_transport_options(struct arglist *def) static char *drop_buffer; -/* Array backed queues optimized for bulk enqueue/dequeue and - * 1:N (small values of N) or 1:1 enqueuer/dequeuer ratios. - * For more details and full design rationale see - * http://foswiki.cambridgegreys.com/Main/EatYourTailAndEnjoyIt - */ - /* * Advance the mmsg queue head by n = advance. Resets the queue to @@ -247,27 +242,14 @@ static char *drop_buffer; static int vector_advancehead(struct vector_queue *qi, int advance) { - int queue_depth; - qi->head = (qi->head + advance) % qi->max_depth; - spin_lock(&qi->tail_lock); - qi->queue_depth -= advance; - - /* we are at 0, use this to - * reset head and tail so we can use max size vectors - */ + atomic_sub(advance, &qi->queue_depth); - if (qi->queue_depth == 0) { - qi->head = 0; - qi->tail = 0; - } - queue_depth = qi->queue_depth; - spin_unlock(&qi->tail_lock); - return queue_depth; + return atomic_read(&qi->queue_depth); } /* Advance the queue tail by n = advance. @@ -277,16 +259,11 @@ static int vector_advancehead(struct vector_queue *qi, int advance) static int vector_advancetail(struct vector_queue *qi, int advance) { - int queue_depth; - qi->tail = (qi->tail + advance) % qi->max_depth; - spin_lock(&qi->head_lock); - qi->queue_depth += advance; - queue_depth = qi->queue_depth; - spin_unlock(&qi->head_lock); - return queue_depth; + atomic_add(advance, &qi->queue_depth); + return atomic_read(&qi->queue_depth); } static int prep_msg(struct vector_private *vp, @@ -339,9 +316,7 @@ static int vector_enqueue(struct vector_queue *qi, struct sk_buff *skb) int iov_count; spin_lock(&qi->tail_lock); - spin_lock(&qi->head_lock); - queue_depth = qi->queue_depth; - spin_unlock(&qi->head_lock); + queue_depth = atomic_read(&qi->queue_depth); if (skb) packet_len = skb->len; @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi) int result = 0, send_len, queue_depth = qi->max_depth; if (spin_trylock(&qi->head_lock)) { - if (spin_trylock(&qi->tail_lock)) { - /* update queue_depth to current value */ - queue_depth = qi->queue_depth; - spin_unlock(&qi->tail_lock); - while (queue_depth > 0) { - /* Calculate the start of the vector */ - send_len = queue_depth; - send_from = qi->mmsg_vector; - send_from += qi->head; - /* Adjust vector size if wraparound */ - if (send_len + qi->head > qi->max_depth) - send_len = qi->max_depth - qi->head; - /* Try to TX as many packets as possible */ - if (send_len > 0) { - result = uml_vector_sendmmsg( - vp->fds->tx_fd, - send_from, - send_len, - 0 - ); - vp->in_write_poll = - (result != send_len); - } - /* For some of the sendmmsg error scenarios - * we may end being unsure in the TX success - * for all packets. It is safer to declare - * them all TX-ed and blame the network. - */ - if (result < 0) { - if (net_ratelimit()) - netdev_err(vp->dev, "sendmmsg err=%i\n", - result); - vp->in_error = true; - result = send_len; - } - if (result > 0) { - queue_depth = - consume_vector_skbs(qi, result); - /* This is equivalent to an TX IRQ. - * Restart the upper layers to feed us - * more packets. 
- */ - if (result > vp->estats.tx_queue_max) - vp->estats.tx_queue_max = result; - vp->estats.tx_queue_running_average = - (vp->estats.tx_queue_running_average + result) >> 1; - } - netif_wake_queue(qi->dev); - /* if TX is busy, break out of the send loop, - * poll write IRQ will reschedule xmit for us + /* update queue_depth to current value */ + queue_depth = atomic_read(&qi->queue_depth); + while (queue_depth > 0) { + /* Calculate the start of the vector */ + send_len = queue_depth; + send_from = qi->mmsg_vector; + send_from += qi->head; + /* Adjust vector size if wraparound */ + if (send_len + qi->head > qi->max_depth) + send_len = qi->max_depth - qi->head; + /* Try to TX as many packets as possible */ + if (send_len > 0) { + result = uml_vector_sendmmsg( + vp->fds->tx_fd, + send_from, + send_len, + 0 + ); + vp->in_write_poll = + (result != send_len); + } + /* For some of the sendmmsg error scenarios + * we may end being unsure in the TX success + * for all packets. It is safer to declare + * them all TX-ed and blame the network. + */ + if (result < 0) { + if (net_ratelimit()) + netdev_err(vp->dev, "sendmmsg err=%i\n", + result); + vp->in_error = true; + result = send_len; + } + if (result > 0) { + queue_depth = + consume_vector_skbs(qi, result); + /* This is equivalent to an TX IRQ. + * Restart the upper layers to feed us + * more packets. */ - if (result != send_len) { - vp->estats.tx_restart_queue++; - break; - } + if (result > vp->estats.tx_queue_max) + vp->estats.tx_queue_max = result; + vp->estats.tx_queue_running_average = + (vp->estats.tx_queue_running_average + result) >> 1; + } + netif_wake_queue(qi->dev); + /* if TX is busy, break out of the send loop, + * poll write IRQ will reschedule xmit for us + */ + if (result != send_len) { + vp->estats.tx_restart_queue++; + break; } } spin_unlock(&qi->head_lock); @@ -589,7 +561,7 @@ static struct vector_queue *create_queue( } spin_lock_init(&result->head_lock); spin_lock_init(&result->tail_lock); - result->queue_depth = 0; + atomic_set(&result->queue_depth, 0); result->head = 0; result->tail = 0; return result; @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi) struct vector_private *vp = netdev_priv(qi->dev); struct mmsghdr *mmsg_vector = qi->mmsg_vector; void **skbuff_vector = qi->skbuff_vector; - int i; + int i, queue_depth; + + queue_depth = atomic_read(&qi->queue_depth); - if (qi->queue_depth == 0) + if (queue_depth == 0) return; - for (i = 0; i < qi->queue_depth; i++) { + for (i = 0; i < queue_depth; i++) { /* it is OK if allocation fails - recvmmsg with NULL data in * iov argument still performs an RX, just drops the packet * This allows us stop faffing around with a "drop buffer" @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi) skbuff_vector++; mmsg_vector++; } - qi->queue_depth = 0; + atomic_set(&qi->queue_depth, 0); } static struct vector_device *find_device(int n) @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget) * many do we need to prep the next time prep_queue_for_rx() is called. 
*/ - qi->queue_depth = packet_count; + atomic_add(packet_count, &qi->queue_depth); for (i = 0; i < packet_count; i++) { skb = (*skbuff_vector); @@ -1234,7 +1208,7 @@ static int vector_net_open(struct net_device *dev) vp->rx_header_size, MAX_IOV_SIZE ); - vp->rx_queue->queue_depth = get_depth(vp->parsed); + atomic_set(&vp->rx_queue->queue_depth, get_depth(vp->parsed)); } else { vp->header_rxbuffer = kmalloc( vp->rx_header_size, diff --git a/arch/um/drivers/vector_kern.h b/arch/um/drivers/vector_kern.h index 2a1fa8e0f3e1..1b8de936056d 100644 --- a/arch/um/drivers/vector_kern.h +++ b/arch/um/drivers/vector_kern.h @@ -14,6 +14,7 @@ #include <linux/ctype.h> #include <linux/workqueue.h> #include <linux/interrupt.h> +#include <asm/atomic.h> #include "vector_user.h" @@ -44,7 +45,8 @@ struct vector_queue { struct net_device *dev; spinlock_t head_lock; spinlock_t tail_lock; - int queue_depth, head, tail, max_depth, max_iov_frags; + atomic_t queue_depth; + int head, tail, max_depth, max_iov_frags; short options; };