um: vector: Replace locks guarding queue depth with atomics

Message ID 20240704072154.1716301-1-anton.ivanov@cambridgegreys.com
State Changes Requested
Series um: vector: Replace locks guarding queue depth with atomics

Commit Message

Anton Ivanov July 4, 2024, 7:21 a.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

UML vector drivers use ring buffer structures which map
preallocated skbs onto mmsg vectors for use with sendmmsg
and recvmmsg. They are designed around a single consumer,
single producer pattern allowing simultaneous enqueue and
dequeue.

Lock debugging with preemption showed possible races when
locking the queue depth. This patch addresses this by
removing extra locks while making queue depth inc/dec and
access atomic.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 arch/um/drivers/vector_kern.c | 158 ++++++++++++++--------------------
 arch/um/drivers/vector_kern.h |   4 +-
 2 files changed, 69 insertions(+), 93 deletions(-)

Comments

Johannes Berg July 4, 2024, 9:17 a.m. UTC | #1
Hi Anton,

Thanks for taking a look! Also thanks for the other explanation.

On Thu, 2024-07-04 at 08:21 +0100, anton.ivanov@cambridgegreys.com
wrote:
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> 
> UML vector drivers use ring buffer structures which map
> preallocated skbs onto mmsg vectors for use with sendmmsg
> and recvmmsg. They are designed around a single consumer,
> single producer pattern allowing simultaneous enqueue and
> dequeue.

Right, I didn't grok that from just the code yesterday :)

> Lock debugging with preemption showed possible races when
> locking the queue depth. This patch addresses this by
> removing extra locks while making queue depth inc/dec and
> access atomic.

I don't think this is fully correct in SMP, and I think we should fix it
even if we don't have SMP (yet?)

See, the other thing about spinlocks is that they serve as barriers, and
I don't think the atomics you use have "enough" of them (per the docs in
atomic_t.txt)?

If you have concurrent producers and consumers, you really want the
producer to actually have fully prepared the data before it's visible to
the consumer, which means you need to have a full barrier before
incrementing queue_depth.

But you're now incrementing it by just atomic_add(), and atomic_t.txt
says:

 - RMW operations that have no return value are unordered;

So I think the producer should have

	/*
	 * adding to queue_depth makes the prepared buffers
	 * visible to the consumer, ensure they're actually
	 * completely written/visible before
	 */
	smp_mb__before_atomic();
	atomic_add(...);

Or so?
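
To illustrate (sketch only, not the driver code, and whether the consumer
side really needs the acquire half here is a separate question), the usual
pairing for this kind of message passing would be roughly:

	/* producer: finish writing the buffers, then publish them */
	smp_mb__before_atomic();
	atomic_add(advance, &qi->queue_depth);

	/* consumer: read the count with acquire semantics, then the buffers */
	queue_depth = atomic_read_acquire(&qi->queue_depth);
	if (queue_depth > 0) {
		/* now safe to look at the published mmsg/skb entries */
	}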


> +	atomic_sub(advance, &qi->queue_depth);
>  
> -	if (qi->queue_depth == 0) {
> -		qi->head = 0;
> -		qi->tail = 0;
> -	}
> -	queue_depth = qi->queue_depth;
> -	spin_unlock(&qi->tail_lock);
> -	return queue_depth;
> +	return atomic_read(&qi->queue_depth);

Or just use atomic_sub_return()? Though that also implies a barrier,
which I think isn't needed on the consumer side.

However I think it's clearer to just remove the return value and make
this function void.
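
Something like this, say, if no caller ends up needing the depth back
(just a sketch):

	static void vector_advancehead(struct vector_queue *qi, int advance)
	{
		qi->head = (qi->head + advance) % qi->max_depth;
		atomic_sub(advance, &qi->queue_depth);
	}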

>  static int vector_advancetail(struct vector_queue *qi, int advance)
>  {
> -	int queue_depth;
> -
>  	qi->tail =
>  		(qi->tail + advance)
>  			% qi->max_depth;
> -	spin_lock(&qi->head_lock);
> -	qi->queue_depth += advance;
> -	queue_depth = qi->queue_depth;
> -	spin_unlock(&qi->head_lock);
> -	return queue_depth;
> +	atomic_add(advance, &qi->queue_depth);
> +	return atomic_read(&qi->queue_depth);
>  }

Or maybe here instead of the barrier use atomic_add_return() which
implies a full barrier on both sides, but I don't think you need that,
the barrier before is enough?
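
i.e. either of these two, roughly (untested, just to show the shape):

	/* explicit barrier + unordered RMW */
	qi->tail = (qi->tail + advance) % qi->max_depth;
	smp_mb__before_atomic();
	atomic_add(advance, &qi->queue_depth);

or

	/* fully ordered RMW, a barrier on both sides of the add */
	qi->tail = (qi->tail + advance) % qi->max_depth;
	queue_depth = atomic_add_return(advance, &qi->queue_depth);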


> @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi)
>  	int result = 0, send_len, queue_depth = qi->max_depth;
>  
>  	if (spin_trylock(&qi->head_lock)) {
[...]
> +		/* update queue_depth to current value */
> +		queue_depth = atomic_read(&qi->queue_depth);
> +		while (queue_depth > 0) {

I think it'd be clearer to write this as

	while ((queue_depth = atomic_read(...))) {

and simply not modify the queue_depth to the return value of
consume_vector_skbs() here:

[...]

> +			if (result > 0) {
> +				queue_depth =
> +					consume_vector_skbs(qi, result);
> +				/* This is equivalent to an TX IRQ.
> +				 * Restart the upper layers to feed us
> +				 * more packets.
>  				 */
> -				if (result != send_len) {
> -					vp->estats.tx_restart_queue++;
> -					break;
> -				}
> +				if (result > vp->estats.tx_queue_max)
> +					vp->estats.tx_queue_max = result;
> +				vp->estats.tx_queue_running_average =
> +					(vp->estats.tx_queue_running_average + result) >> 1;
> +			}
> +			netif_wake_queue(qi->dev);
> +			/* if TX is busy, break out of the send loop,
> +			 *  poll write IRQ will reschedule xmit for us
> +			 */
> +			if (result != send_len) {
> +				vp->estats.tx_restart_queue++;
> +				break;
>  			}
>  		}

The loop doesn't need queue_depth until it goes back to the beginning
anyway.

(it probably also never even executes twice unless you actually have SMP
or preemption?)
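
Roughly like this, I mean (error handling and stats elided, not tested):

	while ((queue_depth = atomic_read(&qi->queue_depth)) > 0) {
		/* Calculate the start of the vector */
		send_len = queue_depth;
		send_from = qi->mmsg_vector + qi->head;
		/* Adjust vector size if wraparound */
		if (send_len + qi->head > qi->max_depth)
			send_len = qi->max_depth - qi->head;
		result = uml_vector_sendmmsg(vp->fds->tx_fd, send_from,
					     send_len, 0);
		if (result > 0)
			consume_vector_skbs(qi, result); /* ignore the return */
		netif_wake_queue(qi->dev);
		if (result != send_len) {
			vp->estats.tx_restart_queue++;
			break;
		}
	}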

> @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>  	struct vector_private *vp = netdev_priv(qi->dev);
>  	struct mmsghdr *mmsg_vector = qi->mmsg_vector;
>  	void **skbuff_vector = qi->skbuff_vector;
> -	int i;
> +	int i, queue_depth;
> +
> +	queue_depth = atomic_read(&qi->queue_depth);
>  
> -	if (qi->queue_depth == 0)
> +	if (queue_depth == 0)
>  		return;
> -	for (i = 0; i < qi->queue_depth; i++) {
> +	for (i = 0; i < queue_depth; i++) {
>  		/* it is OK if allocation fails - recvmmsg with NULL data in
>  		 * iov argument still performs an RX, just drops the packet
>  		 * This allows us stop faffing around with a "drop buffer"
> @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>  		skbuff_vector++;
>  		mmsg_vector++;
>  	}
> -	qi->queue_depth = 0;
> +	atomic_set(&qi->queue_depth, 0);

atomic_read() and then atomic_set() rather than atomic_sub() seems
questionable, but you didn't have locks here before so maybe this is
somehow logically never in parallel? But it is done in NAPI polling via
vector_mmsg_rx(), so not sure I understand.

> @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget)
>  	 * many do we need to prep the next time prep_queue_for_rx() is called.
>  	 */
>  
> -	qi->queue_depth = packet_count;
> +	atomic_add(packet_count, &qi->queue_depth);
>  
>  	for (i = 0; i < packet_count; i++) {
>  		skb = (*skbuff_vector);

especially since here you _did_ convert to atomic_add().

johannes
Anton Ivanov July 4, 2024, 9:45 a.m. UTC | #2
On 04/07/2024 10:17, Johannes Berg wrote:
> Hi Anton,
> 
> Thanks for taking a look! Also thanks for the other explanation.
> 
> On Thu, 2024-07-04 at 08:21 +0100, anton.ivanov@cambridgegreys.com
> wrote:
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> UML vector drivers use ring buffer structures which map
>> preallocated skbs onto mmsg vectors for use with sendmmsg
>> and recvmmsg. They are designed around a single consumer,
>> single producer pattern allowing simultaneous enqueue and
>> dequeue.
> 
> Right, I didn't grok that from just the code yesterday :)
> 
>> Lock debugging with preemption showed possible races when
>> locking the queue depth. This patch addresses this by
>> removing extra locks while making queue depth inc/dec and
>> access atomic.
> 
> I don't think this is fully correct in SMP, and I think we should fix it
> even if we don't have SMP (yet?)
> 
> See, the other thing about spinlocks is that they serve as barriers, and
> I don't think the atomics you use have "enough" of them (per the docs in
> atomic_t.txt)?
> 
> If you have concurrent producers and consumers, you really want the
> producer to actually have fully prepared the data before it's visible to
> the consumer, which means you need to have a full barrier before
> incrementing queue_depth.

Indeed. I will add that for v2.

> 
> But you're now incrementing it by just atomic_add(), and atomic_t.txt
> says:
> 
>   - RMW operations that have no return value are unordered;
> 
> So I think the producer should have
> 
> 	/*
> 	 * adding to queue_depth makes the prepared buffers
> 	 * visible to the consumer, ensure they're actually
> 	 * completely written/visible before
> 	 */
> 	smp_mb__before_atomic();
> 	atomic_add(...);
> 
> Or so?
> 
> 
>> +	atomic_sub(advance, &qi->queue_depth);
>>   
>> -	if (qi->queue_depth == 0) {
>> -		qi->head = 0;
>> -		qi->tail = 0;
>> -	}
>> -	queue_depth = qi->queue_depth;
>> -	spin_unlock(&qi->tail_lock);
>> -	return queue_depth;
>> +	return atomic_read(&qi->queue_depth);
> 
> Or just use atomic_sub_return()? Though that also implies a barrier,
> which I think isn't needed on the consumer side.

Let me have a look at it once again. IIRC you need only producer barriers.

> 
> However I think it's clearer to just remove the return value and make
> this function void.
> 
>>   static int vector_advancetail(struct vector_queue *qi, int advance)
>>   {
>> -	int queue_depth;
>> -
>>   	qi->tail =
>>   		(qi->tail + advance)
>>   			% qi->max_depth;
>> -	spin_lock(&qi->head_lock);
>> -	qi->queue_depth += advance;
>> -	queue_depth = qi->queue_depth;
>> -	spin_unlock(&qi->head_lock);
>> -	return queue_depth;
>> +	atomic_add(advance, &qi->queue_depth);
>> +	return atomic_read(&qi->queue_depth);
>>   }
> 
> Or maybe here instead of the barrier use atomic_add_return() which
> implies a full barrier on both sides, but I don't think you need that,
> the barrier before is enough?
> 
> 
>> @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi)
>>   	int result = 0, send_len, queue_depth = qi->max_depth;
>>   
>>   	if (spin_trylock(&qi->head_lock)) {
> [...]
>> +		/* update queue_depth to current value */
>> +		queue_depth = atomic_read(&qi->queue_depth);
>> +		while (queue_depth > 0) {
> 
> I think it'd be clearer to write this as
> 
> 	while ((queue_depth = atomic_read(...))) {
> 
> and simply not modify the queue_depth to the return value of
> consume_vector_skbs() here:
> 
> [...]
> 
>> +			if (result > 0) {
>> +				queue_depth =
>> +					consume_vector_skbs(qi, result);
>> +				/* This is equivalent to an TX IRQ.
>> +				 * Restart the upper layers to feed us
>> +				 * more packets.
>>   				 */
>> -				if (result != send_len) {
>> -					vp->estats.tx_restart_queue++;
>> -					break;
>> -				}
>> +				if (result > vp->estats.tx_queue_max)
>> +					vp->estats.tx_queue_max = result;
>> +				vp->estats.tx_queue_running_average =
>> +					(vp->estats.tx_queue_running_average + result) >> 1;
>> +			}
>> +			netif_wake_queue(qi->dev);
>> +			/* if TX is busy, break out of the send loop,
>> +			 *  poll write IRQ will reschedule xmit for us
>> +			 */
>> +			if (result != send_len) {
>> +				vp->estats.tx_restart_queue++;
>> +				break;
>>   			}
>>   		}
> 
> The loop doesn't need queue_depth until it goes back to the beginning
> anyway.
> 
> (it probably also never even executes twice unless you actually have SMP
> or preemption?)

It does. If half of the vector is at the end of the array which is used to
imitate a ring buffer and the other half is at the beginning. Quite a common
condition actually.
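
For example (made-up numbers): with max_depth = 64, head = 60 and
queue_depth = 8, the first pass clamps send_len to 64 - 60 = 4 and sends
from index 60, consume_vector_skbs() wraps head back to 0, and the second
pass sends the remaining 4 entries from the start of the array.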

There is an extra issue there - stats. I need to double-check the locking when
they are being fetched.

> 
>> @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>>   	struct vector_private *vp = netdev_priv(qi->dev);
>>   	struct mmsghdr *mmsg_vector = qi->mmsg_vector;
>>   	void **skbuff_vector = qi->skbuff_vector;
>> -	int i;
>> +	int i, queue_depth;
>> +
>> +	queue_depth = atomic_read(&qi->queue_depth);
>>   
>> -	if (qi->queue_depth == 0)
>> +	if (queue_depth == 0)
>>   		return;
>> -	for (i = 0; i < qi->queue_depth; i++) {
>> +	for (i = 0; i < queue_depth; i++) {
>>   		/* it is OK if allocation fails - recvmmsg with NULL data in
>>   		 * iov argument still performs an RX, just drops the packet
>>   		 * This allows us stop faffing around with a "drop buffer"
>> @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>>   		skbuff_vector++;
>>   		mmsg_vector++;
>>   	}
>> -	qi->queue_depth = 0;
>> +	atomic_set(&qi->queue_depth, 0);
> 
> atomic_read() and then atomic_set() rather than atomic_sub() seems
> questionable, but you didn't have locks here before so maybe this is
> somehow logically never in parallel? But it is done in NAPI polling via
> vector_mmsg_rx(), so not sure I understand.
> 
>> @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget)
>>   	 * many do we need to prep the next time prep_queue_for_rx() is called.
>>   	 */
>>   
>> -	qi->queue_depth = packet_count;
>> +	atomic_add(packet_count, &qi->queue_depth);
>>   
>>   	for (i = 0; i < packet_count; i++) {
>>   		skb = (*skbuff_vector);
> 
> especially since here you _did_ convert to atomic_add().
> 
> johannes
>
Anton Ivanov July 4, 2024, 9:52 a.m. UTC | #3
On 04/07/2024 10:45, Anton Ivanov wrote:
> 
> 
> On 04/07/2024 10:17, Johannes Berg wrote:
>> Hi Anton,
>>
>> Thanks for taking a look! Also thanks for the other explanation.
>>
>> On Thu, 2024-07-04 at 08:21 +0100, anton.ivanov@cambridgegreys.com
>> wrote:
>>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>>
>>> UML vector drivers use ring buffer structures which map
>>> preallocated skbs onto mmsg vectors for use with sendmmsg
>>> and recvmmsg. They are designed around a single consumer,
>>> single producer pattern allowing simultaneous enqueue and
>>> dequeue.
>>
>> Right, I didn't grok that from just the code yesterday :)
>>
>>> Lock debugging with preemption showed possible races when
>>> locking the queue depth. This patch addresses this by
>>> removing extra locks while making queue depth inc/dec and
>>> access atomic.
>>
>> I don't think this is fully correct in SMP, and I think we should fix it
>> even if we don't have SMP (yet?)
>>
>> See, the other thing about spinlocks is that they serve as barriers, and
>> I don't think the atomics you use have "enough" of them (per the docs in
>> atomic_t.txt)?
>>
>> If you have concurrent producers and consumers, you really want the
>> producer to actually have fully prepared the data before it's visible to
>> the consumer, which means you need to have a full barrier before
>> incrementing queue_depth.
> 
> Indeed. I will add that for v2.
> 
>>
>> But you're now incrementing it by just atomic_add(), and atomic_t.txt
>> says:
>>
>>   - RMW operations that have no return value are unordered;
>>
>> So I think the producer should have
>>
>>     /*
>>      * adding to queue_depth makes the prepared buffers
>>      * visible to the consumer, ensure they're actually
>>      * completely written/visible before
>>      */
>>     smp_mb__before_atomic();
>>     atomic_add(...);
>>
>> Or so?
>>
>>
>>> +    atomic_sub(advance, &qi->queue_depth);
>>> -    if (qi->queue_depth == 0) {
>>> -        qi->head = 0;
>>> -        qi->tail = 0;
>>> -    }
>>> -    queue_depth = qi->queue_depth;
>>> -    spin_unlock(&qi->tail_lock);
>>> -    return queue_depth;
>>> +    return atomic_read(&qi->queue_depth);
>>
>> Or just use atomic_sub_return()? Though that also implies a barrier,
>> which I think isn't needed on the consumer side.
> 
> Let me have a look at it once again. IIRC you need only producer barriers.
> 
>>
>> However I think it's clearer to just remove the return value and make
>> this function void.
>>
>>>   static int vector_advancetail(struct vector_queue *qi, int advance)
>>>   {
>>> -    int queue_depth;
>>> -
>>>       qi->tail =
>>>           (qi->tail + advance)
>>>               % qi->max_depth;
>>> -    spin_lock(&qi->head_lock);
>>> -    qi->queue_depth += advance;
>>> -    queue_depth = qi->queue_depth;
>>> -    spin_unlock(&qi->head_lock);
>>> -    return queue_depth;
>>> +    atomic_add(advance, &qi->queue_depth);
>>> +    return atomic_read(&qi->queue_depth);
>>>   }
>>
>> Or maybe here instead of the barrier use atomic_add_return() which
>> implies a full barrier on both sides, but I don't think you need that,
>> the barrier before is enough?
>>
>>
>>> @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi)
>>>       int result = 0, send_len, queue_depth = qi->max_depth;
>>>       if (spin_trylock(&qi->head_lock)) {
>> [...]
>>> +        /* update queue_depth to current value */
>>> +        queue_depth = atomic_read(&qi->queue_depth);
>>> +        while (queue_depth > 0) {
>>
>> I think it'd be clearer to write this as
>>
>>     while ((queue_depth = atomic_read(...))) {
>>
>> and simply not modify the queue_depth to the return value of
>> consume_vector_skbs() here:
>>
>> [...]
>>
>>> +            if (result > 0) {
>>> +                queue_depth =
>>> +                    consume_vector_skbs(qi, result);
>>> +                /* This is equivalent to an TX IRQ.
>>> +                 * Restart the upper layers to feed us
>>> +                 * more packets.
>>>                    */
>>> -                if (result != send_len) {
>>> -                    vp->estats.tx_restart_queue++;
>>> -                    break;
>>> -                }
>>> +                if (result > vp->estats.tx_queue_max)
>>> +                    vp->estats.tx_queue_max = result;
>>> +                vp->estats.tx_queue_running_average =
>>> +                    (vp->estats.tx_queue_running_average + result) >> 1;
>>> +            }
>>> +            netif_wake_queue(qi->dev);
>>> +            /* if TX is busy, break out of the send loop,
>>> +             *  poll write IRQ will reschedule xmit for us
>>> +             */
>>> +            if (result != send_len) {
>>> +                vp->estats.tx_restart_queue++;
>>> +                break;
>>>               }
>>>           }
>>
>> The loop doesn't need queue_depth until it goes back to the beginning
>> anyway.
>>
>> (it probably also never even executes twice unless you actually have SMP
>> or preemption?)
> 
> It does. If half of the vector is at the end of the array which is used to
> imitate a ring buffer and the other half is at the beginning. Quite a common
> condition actually.
> 
> There is an extra issue there - stats. I need to double-check the locking when
> they are being fetched.

Same story - these need atomics. Otherwise we can potentially get the same ABBA
lock issue when they are being fetched.

> 
>>
>>> @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>>>       struct vector_private *vp = netdev_priv(qi->dev);
>>>       struct mmsghdr *mmsg_vector = qi->mmsg_vector;
>>>       void **skbuff_vector = qi->skbuff_vector;
>>> -    int i;
>>> +    int i, queue_depth;
>>> +
>>> +    queue_depth = atomic_read(&qi->queue_depth);
>>> -    if (qi->queue_depth == 0)
>>> +    if (queue_depth == 0)
>>>           return;
>>> -    for (i = 0; i < qi->queue_depth; i++) {
>>> +    for (i = 0; i < queue_depth; i++) {
>>>           /* it is OK if allocation fails - recvmmsg with NULL data in
>>>            * iov argument still performs an RX, just drops the packet
>>>            * This allows us stop faffing around with a "drop buffer"
>>> @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>>>           skbuff_vector++;
>>>           mmsg_vector++;
>>>       }
>>> -    qi->queue_depth = 0;
>>> +    atomic_set(&qi->queue_depth, 0);
>>
>> atomic_read() and then atomic_set() rather than atomic_sub() seems
>> questionable, but you didn't have locks here before so maybe this is
>> somehow logically never in parallel? But it is done in NAPI polling via
>> vector_mmsg_rx(), so not sure I understand.
>>
>>> @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget)
>>>        * many do we need to prep the next time prep_queue_for_rx() is called.
>>>        */
>>> -    qi->queue_depth = packet_count;
>>> +    atomic_add(packet_count, &qi->queue_depth);
>>>       for (i = 0; i < packet_count; i++) {
>>>           skb = (*skbuff_vector);
>>
>> especially since here you _did_ convert to atomic_add().
>>
>> johannes
>>
>
Johannes Berg July 4, 2024, 9:54 a.m. UTC | #4
On Thu, 2024-07-04 at 10:45 +0100, Anton Ivanov wrote:

> > (it probably also never even executes twice unless you actually have SMP
> > or preemption?)
> 
> It does. If half of the vector is at the end of the array which is used to
> imitate a ring buffer and the other half is at the beginning. Quite a common
> condition actually.

Ah, right, I missed this bit:

                        /* Adjust vector size if wraparound */
                        if (send_len + qi->head > qi->max_depth)
                                send_len = qi->max_depth - qi->head;

> There is an extra issue there - stats. I need to double-check the locking when
> they are being fetched.

The memcpy() is an issue, but I think reading them individually should
be OK regardless?

But I suppose you could just take both locks there, hopefully nobody is
accessing them _really_ frequently? At least I think you can take both
locks since now no other path is taking both? But you'd need
spin_lock_bh().
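
Roughly this, I mean (tmp_stats being whatever the caller hands us, and
assuming the TX queue is the one whose paths touch the stats - just a
sketch):

	spin_lock_bh(&vp->tx_queue->head_lock);
	spin_lock_bh(&vp->tx_queue->tail_lock);
	memcpy(tmp_stats, &vp->estats, sizeof(vp->estats));
	spin_unlock_bh(&vp->tx_queue->tail_lock);
	spin_unlock_bh(&vp->tx_queue->head_lock);

Always head before tail (or always the other way round), never mixed, so
there's no AB/BA inversion anywhere.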


johannes
Johannes Berg July 4, 2024, 9:55 a.m. UTC | #5
On Thu, 2024-07-04 at 10:52 +0100, Anton Ivanov wrote:
> > 
> > There is an extra issue there - stats. I need to double-check the locking when
> > they are being fetched.
> 
> Same story - these need atomics. Otherwise we can potentially get the same ABBA
> lock issue when they are being fetched.
> 

Email overlap ...

I don't think so? If no other path now uses _both_ locks, the stats path
can, it'll just be "AB" never "BA".

johannes
Johannes Berg July 4, 2024, 9:56 a.m. UTC | #6
On Thu, 2024-07-04 at 11:55 +0200, Johannes Berg wrote:
> On Thu, 2024-07-04 at 10:52 +0100, Anton Ivanov wrote:
> > > 
> > > There is an extra issue there - stats. I need to double-check the locking when
> > > they are being fetched.
> > 
> > Same story - these need atomics. Otherwise we can potentially get the same ABBA
> > lock issue when they are being fetched.
> > 
> 
> Email overlap ...
> 
> I don't think so? If no other path now uses _both_ locks, the stats path
> can, it'll just be "AB" never "BA".

Or just split into tx and rx stats, and use the correct lock separately?

johannes
Anton Ivanov July 4, 2024, 10:54 a.m. UTC | #7
On 04/07/2024 10:55, Johannes Berg wrote:
> On Thu, 2024-07-04 at 10:52 +0100, Anton Ivanov wrote:
>>>
>>> There is an extra issue there - stats. I need to double-check the locking when
>>> they are being fetched.
>>
>> Same story - these need atomics. Otherwise we can potentially get the same ABBA
>> lock issue when they are being fetched.
>>
> 
> Email overlap ...
> 
> I don't think so? If no other path now uses _both_ locks, the stats path
> can, it'll just be "AB" never "BA".

Good point.

> 
> johannes
> 
>

Patch

diff --git a/arch/um/drivers/vector_kern.c b/arch/um/drivers/vector_kern.c
index dc2feae789cb..5c9bf458fe0a 100644
--- a/arch/um/drivers/vector_kern.c
+++ b/arch/um/drivers/vector_kern.c
@@ -22,6 +22,7 @@ 
 #include <linux/interrupt.h>
 #include <linux/firmware.h>
 #include <linux/fs.h>
+#include <asm/atomic.h>
 #include <uapi/linux/filter.h>
 #include <init.h>
 #include <irq_kern.h>
@@ -232,12 +233,6 @@  static int get_transport_options(struct arglist *def)
 
 static char *drop_buffer;
 
-/* Array backed queues optimized for bulk enqueue/dequeue and
- * 1:N (small values of N) or 1:1 enqueuer/dequeuer ratios.
- * For more details and full design rationale see
- * http://foswiki.cambridgegreys.com/Main/EatYourTailAndEnjoyIt
- */
-
 
 /*
  * Advance the mmsg queue head by n = advance. Resets the queue to
@@ -247,27 +242,14 @@  static char *drop_buffer;
 
 static int vector_advancehead(struct vector_queue *qi, int advance)
 {
-	int queue_depth;
-
 	qi->head =
 		(qi->head + advance)
 			% qi->max_depth;
 
 
-	spin_lock(&qi->tail_lock);
-	qi->queue_depth -= advance;
-
-	/* we are at 0, use this to
-	 * reset head and tail so we can use max size vectors
-	 */
+	atomic_sub(advance, &qi->queue_depth);
 
-	if (qi->queue_depth == 0) {
-		qi->head = 0;
-		qi->tail = 0;
-	}
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->tail_lock);
-	return queue_depth;
+	return atomic_read(&qi->queue_depth);
 }
 
 /*	Advance the queue tail by n = advance.
@@ -277,16 +259,11 @@  static int vector_advancehead(struct vector_queue *qi, int advance)
 
 static int vector_advancetail(struct vector_queue *qi, int advance)
 {
-	int queue_depth;
-
 	qi->tail =
 		(qi->tail + advance)
 			% qi->max_depth;
-	spin_lock(&qi->head_lock);
-	qi->queue_depth += advance;
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->head_lock);
-	return queue_depth;
+	atomic_add(advance, &qi->queue_depth);
+	return atomic_read(&qi->queue_depth);
 }
 
 static int prep_msg(struct vector_private *vp,
@@ -339,9 +316,7 @@  static int vector_enqueue(struct vector_queue *qi, struct sk_buff *skb)
 	int iov_count;
 
 	spin_lock(&qi->tail_lock);
-	spin_lock(&qi->head_lock);
-	queue_depth = qi->queue_depth;
-	spin_unlock(&qi->head_lock);
+	queue_depth = atomic_read(&qi->queue_depth);
 
 	if (skb)
 		packet_len = skb->len;
@@ -411,61 +386,58 @@  static int vector_send(struct vector_queue *qi)
 	int result = 0, send_len, queue_depth = qi->max_depth;
 
 	if (spin_trylock(&qi->head_lock)) {
-		if (spin_trylock(&qi->tail_lock)) {
-			/* update queue_depth to current value */
-			queue_depth = qi->queue_depth;
-			spin_unlock(&qi->tail_lock);
-			while (queue_depth > 0) {
-				/* Calculate the start of the vector */
-				send_len = queue_depth;
-				send_from = qi->mmsg_vector;
-				send_from += qi->head;
-				/* Adjust vector size if wraparound */
-				if (send_len + qi->head > qi->max_depth)
-					send_len = qi->max_depth - qi->head;
-				/* Try to TX as many packets as possible */
-				if (send_len > 0) {
-					result = uml_vector_sendmmsg(
-						 vp->fds->tx_fd,
-						 send_from,
-						 send_len,
-						 0
-					);
-					vp->in_write_poll =
-						(result != send_len);
-				}
-				/* For some of the sendmmsg error scenarios
-				 * we may end being unsure in the TX success
-				 * for all packets. It is safer to declare
-				 * them all TX-ed and blame the network.
-				 */
-				if (result < 0) {
-					if (net_ratelimit())
-						netdev_err(vp->dev, "sendmmsg err=%i\n",
-							result);
-					vp->in_error = true;
-					result = send_len;
-				}
-				if (result > 0) {
-					queue_depth =
-						consume_vector_skbs(qi, result);
-					/* This is equivalent to an TX IRQ.
-					 * Restart the upper layers to feed us
-					 * more packets.
-					 */
-					if (result > vp->estats.tx_queue_max)
-						vp->estats.tx_queue_max = result;
-					vp->estats.tx_queue_running_average =
-						(vp->estats.tx_queue_running_average + result) >> 1;
-				}
-				netif_wake_queue(qi->dev);
-				/* if TX is busy, break out of the send loop,
-				 *  poll write IRQ will reschedule xmit for us
+		/* update queue_depth to current value */
+		queue_depth = atomic_read(&qi->queue_depth);
+		while (queue_depth > 0) {
+			/* Calculate the start of the vector */
+			send_len = queue_depth;
+			send_from = qi->mmsg_vector;
+			send_from += qi->head;
+			/* Adjust vector size if wraparound */
+			if (send_len + qi->head > qi->max_depth)
+				send_len = qi->max_depth - qi->head;
+			/* Try to TX as many packets as possible */
+			if (send_len > 0) {
+				result = uml_vector_sendmmsg(
+					 vp->fds->tx_fd,
+					 send_from,
+					 send_len,
+					 0
+				);
+				vp->in_write_poll =
+					(result != send_len);
+			}
+			/* For some of the sendmmsg error scenarios
+			 * we may end being unsure in the TX success
+			 * for all packets. It is safer to declare
+			 * them all TX-ed and blame the network.
+			 */
+			if (result < 0) {
+				if (net_ratelimit())
+					netdev_err(vp->dev, "sendmmsg err=%i\n",
+						result);
+				vp->in_error = true;
+				result = send_len;
+			}
+			if (result > 0) {
+				queue_depth =
+					consume_vector_skbs(qi, result);
+				/* This is equivalent to an TX IRQ.
+				 * Restart the upper layers to feed us
+				 * more packets.
 				 */
-				if (result != send_len) {
-					vp->estats.tx_restart_queue++;
-					break;
-				}
+				if (result > vp->estats.tx_queue_max)
+					vp->estats.tx_queue_max = result;
+				vp->estats.tx_queue_running_average =
+					(vp->estats.tx_queue_running_average + result) >> 1;
+			}
+			netif_wake_queue(qi->dev);
+			/* if TX is busy, break out of the send loop,
+			 *  poll write IRQ will reschedule xmit for us
+			 */
+			if (result != send_len) {
+				vp->estats.tx_restart_queue++;
+				break;
 			}
 		}
 		spin_unlock(&qi->head_lock);
@@ -589,7 +561,7 @@  static struct vector_queue *create_queue(
 	}
 	spin_lock_init(&result->head_lock);
 	spin_lock_init(&result->tail_lock);
-	result->queue_depth = 0;
+	atomic_set(&result->queue_depth, 0);
 	result->head = 0;
 	result->tail = 0;
 	return result;
@@ -675,11 +647,13 @@  static void prep_queue_for_rx(struct vector_queue *qi)
 	struct vector_private *vp = netdev_priv(qi->dev);
 	struct mmsghdr *mmsg_vector = qi->mmsg_vector;
 	void **skbuff_vector = qi->skbuff_vector;
-	int i;
+	int i, queue_depth;
+
+	queue_depth = atomic_read(&qi->queue_depth);
 
-	if (qi->queue_depth == 0)
+	if (queue_depth == 0)
 		return;
-	for (i = 0; i < qi->queue_depth; i++) {
+	for (i = 0; i < queue_depth; i++) {
 		/* it is OK if allocation fails - recvmmsg with NULL data in
 		 * iov argument still performs an RX, just drops the packet
 		 * This allows us stop faffing around with a "drop buffer"
@@ -689,7 +663,7 @@  static void prep_queue_for_rx(struct vector_queue *qi)
 		skbuff_vector++;
 		mmsg_vector++;
 	}
-	qi->queue_depth = 0;
+	atomic_set(&qi->queue_depth, 0);
 }
 
 static struct vector_device *find_device(int n)
@@ -987,7 +961,7 @@  static int vector_mmsg_rx(struct vector_private *vp, int budget)
 	 * many do we need to prep the next time prep_queue_for_rx() is called.
 	 */
 
-	qi->queue_depth = packet_count;
+	atomic_add(packet_count, &qi->queue_depth);
 
 	for (i = 0; i < packet_count; i++) {
 		skb = (*skbuff_vector);
@@ -1234,7 +1208,7 @@  static int vector_net_open(struct net_device *dev)
 			vp->rx_header_size,
 			MAX_IOV_SIZE
 		);
-		vp->rx_queue->queue_depth = get_depth(vp->parsed);
+		atomic_set(&vp->rx_queue->queue_depth, get_depth(vp->parsed));
 	} else {
 		vp->header_rxbuffer = kmalloc(
 			vp->rx_header_size,
diff --git a/arch/um/drivers/vector_kern.h b/arch/um/drivers/vector_kern.h
index 2a1fa8e0f3e1..1b8de936056d 100644
--- a/arch/um/drivers/vector_kern.h
+++ b/arch/um/drivers/vector_kern.h
@@ -14,6 +14,7 @@ 
 #include <linux/ctype.h>
 #include <linux/workqueue.h>
 #include <linux/interrupt.h>
+#include <asm/atomic.h>
 
 #include "vector_user.h"
 
@@ -44,7 +45,8 @@  struct vector_queue {
 	struct net_device *dev;
 	spinlock_t head_lock;
 	spinlock_t tail_lock;
-	int queue_depth, head, tail, max_depth, max_iov_frags;
+	atomic_t queue_depth;
+	int head, tail, max_depth, max_iov_frags;
 	short options;
 };