Message ID: 20220728063120.2867508-4-npiggin@gmail.com
State: Changes Requested
Series: powerpc: alternate queued spinlock implementation
On Thu, 2022-07-28 at 16:31 +1000, Nicholas Piggin wrote:
<snip>

[resend as utf-8, not utf-7]

> 
> +/*
> + * Bitfields in the atomic value:
> + *
> + * 0: locked bit
> + * 16-31: tail cpu (+1)
> + */
> +#define _Q_SET_MASK(type)	(((1U << _Q_ ## type ## _BITS) - 1)\
> +				<< _Q_ ## type ## _OFFSET)
> +#define _Q_LOCKED_OFFSET	0
> +#define _Q_LOCKED_BITS		1
> +#define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)
> +#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)
> +
> +#define _Q_TAIL_CPU_OFFSET	16
> +#define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)
> +#define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)
> +

Just to state the obvious this is:

#define _Q_LOCKED_OFFSET	0
#define _Q_LOCKED_BITS		1
#define _Q_LOCKED_MASK		0x00000001
#define _Q_LOCKED_VAL		1

#define _Q_TAIL_CPU_OFFSET	16
#define _Q_TAIL_CPU_BITS	16
#define _Q_TAIL_CPU_MASK	0xffff0000

> +#if CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)
> +#error "qspinlock does not support such large CONFIG_NR_CPUS"
> +#endif
> +
>  #endif /* _ASM_POWERPC_QSPINLOCK_TYPES_H */
> diff --git a/arch/powerpc/lib/qspinlock.c b/arch/powerpc/lib/qspinlock.c
> index 8dbce99a373c..5ebb88d95636 100644
> --- a/arch/powerpc/lib/qspinlock.c
> +++ b/arch/powerpc/lib/qspinlock.c
> @@ -1,12 +1,172 @@
>  // SPDX-License-Identifier: GPL-2.0-or-later
> +#include <linux/atomic.h>
> +#include <linux/bug.h>
> +#include <linux/compiler.h>
>  #include <linux/export.h>
> -#include <linux/processor.h>
> +#include <linux/percpu.h>
> +#include <linux/smp.h>
>  #include <asm/qspinlock.h>
> 
> -void queued_spin_lock_slowpath(struct qspinlock *lock)
> +#define MAX_NODES	4
> +
> +struct qnode {
> +	struct qnode *next;
> +	struct qspinlock *lock;
> +	u8 locked; /* 1 if lock acquired */
> +};
> +
> +struct qnodes {
> +	int count;
> +	struct qnode nodes[MAX_NODES];
> +};

I think it could be worth commenting why qnodes::count instead of _Q_TAIL_IDX_OFFSET.

> +
> +static DEFINE_PER_CPU_ALIGNED(struct qnodes, qnodes);
> +
> +static inline int encode_tail_cpu(void)

I think the generic version that takes smp_processor_id() as a parameter is clearer - at least with this function name.

> +{
> +	return (smp_processor_id() + 1) << _Q_TAIL_CPU_OFFSET;
> +}
> +
> +static inline int get_tail_cpu(int val)

It seems like there should be a "decode" function to pair up with the "encode" function.

> +{
> +	return (val >> _Q_TAIL_CPU_OFFSET) - 1;
> +}
> +
> +/* Take the lock by setting the bit, no other CPUs may concurrently lock it. */

Does that comment mean it is not necessary to use an atomic_or here?

> +static __always_inline void lock_set_locked(struct qspinlock *lock)

nit: could just be called set_locked()

> +{
> +	atomic_or(_Q_LOCKED_VAL, &lock->val);
> +	__atomic_acquire_fence();
> +}
> +
> +/* Take lock, clearing tail, cmpxchg with val (which must not be locked) */
> +static __always_inline int trylock_clear_tail_cpu(struct qspinlock *lock, int val)
> +{
> +	int newval = _Q_LOCKED_VAL;
> +
> +	if (atomic_cmpxchg_acquire(&lock->val, val, newval) == val)
> +		return 1;
> +	else
> +		return 0;

same optional style nit: return (atomic_cmpxchg_acquire(&lock->val, val, newval) == val);

> +}
> +
> +/*
> + * Publish our tail, replacing previous tail. Return previous value.
> + *
> + * This provides a release barrier for publishing node, and an acquire barrier
> + * for getting the old node.
> + */
> +static __always_inline int publish_tail_cpu(struct qspinlock *lock, int tail)

Did you change from the xchg_tail() name in the generic version because of the release and acquire barriers this provides?
Does "publish" generally imply the old value will be returned?

> {
> -	while (!queued_spin_trylock(lock))
> +	for (;;) {
> +		int val = atomic_read(&lock->val);
> +		int newval = (val & ~_Q_TAIL_CPU_MASK) | tail;
> +		int old;
> +
> +		old = atomic_cmpxchg(&lock->val, val, newval);
> +		if (old == val)
> +			return old;
> +	}
> +}
> +
> +static struct qnode *get_tail_qnode(struct qspinlock *lock, int val)
> +{
> +	int cpu = get_tail_cpu(val);
> +	struct qnodes *qnodesp = per_cpu_ptr(&qnodes, cpu);
> +	int idx;
> +
> +	for (idx = 0; idx < MAX_NODES; idx++) {
> +		struct qnode *qnode = &qnodesp->nodes[idx];
> +		if (qnode->lock == lock)
> +			return qnode;
> +	}

In case anyone else is confused by this, Nick explained each cpu can only queue on a unique spinlock once regardless of "idx" level.

> +
> +	BUG();
> +}
> +
> +static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
> +{
> +	struct qnodes *qnodesp;
> +	struct qnode *next, *node;
> +	int val, old, tail;
> +	int idx;
> +
> +	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
> +
> +	qnodesp = this_cpu_ptr(&qnodes);
> +	if (unlikely(qnodesp->count == MAX_NODES)) {

The comparison is >= in the generic, I guess we've no nested NMI so this is safe?

> +		while (!queued_spin_trylock(lock))
> +			cpu_relax();
> +		return;
> +	}
> +
> +	idx = qnodesp->count++;
> +	/*
> +	 * Ensure that we increment the head node->count before initialising
> +	 * the actual node. If the compiler is kind enough to reorder these
> +	 * stores, then an IRQ could overwrite our assignments.
> +	 */
> +	barrier();
> +	node = &qnodesp->nodes[idx];
> +	node->next = NULL;
> +	node->lock = lock;
> +	node->locked = 0;
> +
> +	tail = encode_tail_cpu();
> +
> +	old = publish_tail_cpu(lock, tail);
> +
> +	/*
> +	 * If there was a previous node; link it and wait until reaching the
> +	 * head of the waitqueue.
> +	 */
> +	if (old & _Q_TAIL_CPU_MASK) {
> +		struct qnode *prev = get_tail_qnode(lock, old);
> +
> +		/* Link @node into the waitqueue. */
> +		WRITE_ONCE(prev->next, node);
> +
> +		/* Wait for mcs node lock to be released */
> +		while (!node->locked)
> +			cpu_relax();
> +
> +		smp_rmb(); /* acquire barrier for the mcs lock */
> +	}
> +
> +	/* We're at the head of the waitqueue, wait for the lock. */
> +	while ((val = atomic_read(&lock->val)) & _Q_LOCKED_VAL)
> +		cpu_relax();
> +
> +	/* If we're the last queued, must clean up the tail. */
> +	if ((val & _Q_TAIL_CPU_MASK) == tail) {
> +		if (trylock_clear_tail_cpu(lock, val))
> +			goto release;
> +		/* Another waiter must have enqueued */
> +	}
> +
> +	/* We must be the owner, just set the lock bit and acquire */
> +	lock_set_locked(lock);
> +
> +	/* contended path; must wait for next != NULL (MCS protocol) */
> +	while (!(next = READ_ONCE(node->next)))
> 		cpu_relax();
> +
> +	/*
> +	 * Unlock the next mcs waiter node. Release barrier is not required
> +	 * here because the acquirer is only accessing the lock word, and
> +	 * the acquire barrier we took the lock with orders that update vs
> +	 * this store to locked. The corresponding barrier is the smp_rmb()
> +	 * acquire barrier for mcs lock, above.
> +	 */
> +	WRITE_ONCE(next->locked, 1);
> +
> +release:
> +	qnodesp->count--; /* release the node */
> +}
> +
> +void queued_spin_lock_slowpath(struct qspinlock *lock)
> +{
> +	queued_spin_lock_mcs_queue(lock);
> }
> EXPORT_SYMBOL(queued_spin_lock_slowpath);
> 
On Thu Nov 10, 2022 at 10:36 AM AEST, Jordan Niethe wrote:
> On Thu, 2022-07-28 at 16:31 +1000, Nicholas Piggin wrote:
> <snip>
> [resend as utf-8, not utf-7]
> > 
> > +/*
> > + * Bitfields in the atomic value:
> > + *
> > + * 0: locked bit
> > + * 16-31: tail cpu (+1)
> > + */
> > +#define _Q_SET_MASK(type)	(((1U << _Q_ ## type ## _BITS) - 1)\
> > +				<< _Q_ ## type ## _OFFSET)
> > +#define _Q_LOCKED_OFFSET	0
> > +#define _Q_LOCKED_BITS		1
> > +#define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)
> > +#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)
> > +
> > +#define _Q_TAIL_CPU_OFFSET	16
> > +#define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)
> > +#define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)
> > +
> 
> Just to state the obvious this is:
> 
> #define _Q_LOCKED_OFFSET	0
> #define _Q_LOCKED_BITS		1
> #define _Q_LOCKED_MASK		0x00000001
> #define _Q_LOCKED_VAL		1
> 
> #define _Q_TAIL_CPU_OFFSET	16
> #define _Q_TAIL_CPU_BITS	16
> #define _Q_TAIL_CPU_MASK	0xffff0000

Yeah. I'm wondering if that's a better style in the first place. The
generic qspinlock layout can change, so there's slightly more reason to
do it that way there.

> > +#if CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)
> > +#error "qspinlock does not support such large CONFIG_NR_CPUS"
> > +#endif
> > +
> >  #endif /* _ASM_POWERPC_QSPINLOCK_TYPES_H */
> > diff --git a/arch/powerpc/lib/qspinlock.c b/arch/powerpc/lib/qspinlock.c
> > index 8dbce99a373c..5ebb88d95636 100644
> > --- a/arch/powerpc/lib/qspinlock.c
> > +++ b/arch/powerpc/lib/qspinlock.c
> > @@ -1,12 +1,172 @@
> >  // SPDX-License-Identifier: GPL-2.0-or-later
> > +#include <linux/atomic.h>
> > +#include <linux/bug.h>
> > +#include <linux/compiler.h>
> >  #include <linux/export.h>
> > -#include <linux/processor.h>
> > +#include <linux/percpu.h>
> > +#include <linux/smp.h>
> >  #include <asm/qspinlock.h>
> > 
> > -void queued_spin_lock_slowpath(struct qspinlock *lock)
> > +#define MAX_NODES	4
> > +
> > +struct qnode {
> > +	struct qnode *next;
> > +	struct qspinlock *lock;
> > +	u8 locked; /* 1 if lock acquired */
> > +};
> > +
> > +struct qnodes {
> > +	int count;
> > +	struct qnode nodes[MAX_NODES];
> > +};
> 
> I think it could be worth commenting why qnodes::count instead of _Q_TAIL_IDX_OFFSET.

I wasn't sure what you meant by this.

> > +
> > +static DEFINE_PER_CPU_ALIGNED(struct qnodes, qnodes);
> > +
> > +static inline int encode_tail_cpu(void)
> 
> I think the generic version that takes smp_processor_id() as a parameter is clearer - at least with this function name.

Agree.

> > +{
> > +	return (smp_processor_id() + 1) << _Q_TAIL_CPU_OFFSET;
> > +}
> > +
> > +static inline int get_tail_cpu(int val)
> 
> It seems like there should be a "decode" function to pair up with the "encode" function.

Agree.

> > +{
> > +	return (val >> _Q_TAIL_CPU_OFFSET) - 1;
> > +}
> > +
> > +/* Take the lock by setting the bit, no other CPUs may concurrently lock it. */
> 
> Does that comment mean it is not necessary to use an atomic_or here?

No, only that it can't be locked. It can still be modified by another
queuer.

> > +static __always_inline void lock_set_locked(struct qspinlock *lock)
> 
> nit: could just be called set_locked()

Yep.

> > +{
> > +	atomic_or(_Q_LOCKED_VAL, &lock->val);
> > +	__atomic_acquire_fence();
> > +}
> > +
> > +/* Take lock, clearing tail, cmpxchg with val (which must not be locked) */
> > +static __always_inline int trylock_clear_tail_cpu(struct qspinlock *lock, int val)
> > +{
> > +	int newval = _Q_LOCKED_VAL;
> > +
> > +	if (atomic_cmpxchg_acquire(&lock->val, val, newval) == val)
> > +		return 1;
> > +	else
> > +		return 0;
> 
> same optional style nit: return (atomic_cmpxchg_acquire(&lock->val, val, newval) == val);

Am thinking about it :)

> > +}
> > +
> > +/*
> > + * Publish our tail, replacing previous tail. Return previous value.
> > + *
> > + * This provides a release barrier for publishing node, and an acquire barrier
> > + * for getting the old node.
> > + */
> > +static __always_inline int publish_tail_cpu(struct qspinlock *lock, int tail)
> 
> Did you change from the xchg_tail() name in the generic version because of the release and acquire barriers this provides?
> Does "publish" generally imply the old value will be returned?

Yes, I thought "publish" makes it a bit more obvious that this is where
the node becomes visible to other CPUs. It doesn't imply the old value is
returned, but I thought those semantics are the self-documenting part.

> 
> > {
> > -	while (!queued_spin_trylock(lock))
> > +	for (;;) {
> > +		int val = atomic_read(&lock->val);
> > +		int newval = (val & ~_Q_TAIL_CPU_MASK) | tail;
> > +		int old;
> > +
> > +		old = atomic_cmpxchg(&lock->val, val, newval);
> > +		if (old == val)
> > +			return old;
> > +	}
> > +}
> > +
> > +static struct qnode *get_tail_qnode(struct qspinlock *lock, int val)
> > +{
> > +	int cpu = get_tail_cpu(val);
> > +	struct qnodes *qnodesp = per_cpu_ptr(&qnodes, cpu);
> > +	int idx;
> > +
> > +	for (idx = 0; idx < MAX_NODES; idx++) {
> > +		struct qnode *qnode = &qnodesp->nodes[idx];
> > +		if (qnode->lock == lock)
> > +			return qnode;
> > +	}
> 
> In case anyone else is confused by this, Nick explained each cpu can only queue on a unique spinlock once regardless of "idx" level.
> 
> > +
> > +	BUG();
> > +}
> > +
> > +static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
> > +{
> > +	struct qnodes *qnodesp;
> > +	struct qnode *next, *node;
> > +	int val, old, tail;
> > +	int idx;
> > +
> > +	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
> > +
> > +	qnodesp = this_cpu_ptr(&qnodes);
> > +	if (unlikely(qnodesp->count == MAX_NODES)) {
> 
> The comparison is >= in the generic, I guess we've no nested NMI so this is safe?

No... we could have nested NMI so this is wrong, good catch.

Thanks,
Nick
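The fix Nick acknowledges above would presumably amount to making the bound check use >=, so that a nested NMI which has already taken the last per-CPU node falls back to the trylock loop instead of indexing past nodes[]. A sketch only, not the committed change:

	qnodesp = this_cpu_ptr(&qnodes);
	if (unlikely(qnodesp->count >= MAX_NODES)) {
		/* no free MCS node left on this CPU (e.g. nested NMI): spin on trylock */
		while (!queued_spin_trylock(lock))
			cpu_relax();
		return;
	}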
diff --git a/arch/powerpc/include/asm/qspinlock.h b/arch/powerpc/include/asm/qspinlock.h
index cb2b4f91e976..f06117aa60e1 100644
--- a/arch/powerpc/include/asm/qspinlock.h
+++ b/arch/powerpc/include/asm/qspinlock.h
@@ -18,12 +18,12 @@ static __always_inline int queued_spin_value_unlocked(struct qspinlock lock)
 
 static __always_inline int queued_spin_is_contended(struct qspinlock *lock)
 {
-	return 0;
+	return !!(atomic_read(&lock->val) & _Q_TAIL_CPU_MASK);
 }
 
 static __always_inline int queued_spin_trylock(struct qspinlock *lock)
 {
-	if (atomic_cmpxchg_acquire(&lock->val, 0, 1) == 0)
+	if (atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL) == 0)
 		return 1;
 	return 0;
 }
@@ -38,7 +38,11 @@ static __always_inline void queued_spin_lock(struct qspinlock *lock)
 
 static inline void queued_spin_unlock(struct qspinlock *lock)
 {
-	atomic_set_release(&lock->val, 0);
+	for (;;) {
+		int val = atomic_read(&lock->val);
+		if (atomic_cmpxchg_release(&lock->val, val, val & ~_Q_LOCKED_VAL) == val)
+			return;
+	}
 }
 
 #define arch_spin_is_locked(l)	queued_spin_is_locked(l)
diff --git a/arch/powerpc/include/asm/qspinlock_types.h b/arch/powerpc/include/asm/qspinlock_types.h
index 59606bc0c774..9630e714c70d 100644
--- a/arch/powerpc/include/asm/qspinlock_types.h
+++ b/arch/powerpc/include/asm/qspinlock_types.h
@@ -10,4 +10,25 @@ typedef struct qspinlock {
 
 #define __ARCH_SPIN_LOCK_UNLOCKED	{ .val = ATOMIC_INIT(0) }
 
+/*
+ * Bitfields in the atomic value:
+ *
+ * 0: locked bit
+ * 16-31: tail cpu (+1)
+ */
+#define _Q_SET_MASK(type)	(((1U << _Q_ ## type ## _BITS) - 1)\
+				<< _Q_ ## type ## _OFFSET)
+#define _Q_LOCKED_OFFSET	0
+#define _Q_LOCKED_BITS		1
+#define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)
+#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)
+
+#define _Q_TAIL_CPU_OFFSET	16
+#define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)
+#define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)
+
+#if CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)
+#error "qspinlock does not support such large CONFIG_NR_CPUS"
+#endif
+
 #endif /* _ASM_POWERPC_QSPINLOCK_TYPES_H */
diff --git a/arch/powerpc/lib/qspinlock.c b/arch/powerpc/lib/qspinlock.c
index 8dbce99a373c..5ebb88d95636 100644
--- a/arch/powerpc/lib/qspinlock.c
+++ b/arch/powerpc/lib/qspinlock.c
@@ -1,12 +1,172 @@
 // SPDX-License-Identifier: GPL-2.0-or-later
+#include <linux/atomic.h>
+#include <linux/bug.h>
+#include <linux/compiler.h>
 #include <linux/export.h>
-#include <linux/processor.h>
+#include <linux/percpu.h>
+#include <linux/smp.h>
 #include <asm/qspinlock.h>
 
-void queued_spin_lock_slowpath(struct qspinlock *lock)
+#define MAX_NODES	4
+
+struct qnode {
+	struct qnode *next;
+	struct qspinlock *lock;
+	u8 locked; /* 1 if lock acquired */
+};
+
+struct qnodes {
+	int count;
+	struct qnode nodes[MAX_NODES];
+};
+
+static DEFINE_PER_CPU_ALIGNED(struct qnodes, qnodes);
+
+static inline int encode_tail_cpu(void)
+{
+	return (smp_processor_id() + 1) << _Q_TAIL_CPU_OFFSET;
+}
+
+static inline int get_tail_cpu(int val)
+{
+	return (val >> _Q_TAIL_CPU_OFFSET) - 1;
+}
+
+/* Take the lock by setting the bit, no other CPUs may concurrently lock it. */
+static __always_inline void lock_set_locked(struct qspinlock *lock)
+{
+	atomic_or(_Q_LOCKED_VAL, &lock->val);
+	__atomic_acquire_fence();
+}
+
+/* Take lock, clearing tail, cmpxchg with val (which must not be locked) */
+static __always_inline int trylock_clear_tail_cpu(struct qspinlock *lock, int val)
+{
+	int newval = _Q_LOCKED_VAL;
+
+	if (atomic_cmpxchg_acquire(&lock->val, val, newval) == val)
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * Publish our tail, replacing previous tail. Return previous value.
+ *
+ * This provides a release barrier for publishing node, and an acquire barrier
+ * for getting the old node.
+ */
+static __always_inline int publish_tail_cpu(struct qspinlock *lock, int tail)
 {
-	while (!queued_spin_trylock(lock))
+	for (;;) {
+		int val = atomic_read(&lock->val);
+		int newval = (val & ~_Q_TAIL_CPU_MASK) | tail;
+		int old;
+
+		old = atomic_cmpxchg(&lock->val, val, newval);
+		if (old == val)
+			return old;
+	}
+}
+
+static struct qnode *get_tail_qnode(struct qspinlock *lock, int val)
+{
+	int cpu = get_tail_cpu(val);
+	struct qnodes *qnodesp = per_cpu_ptr(&qnodes, cpu);
+	int idx;
+
+	for (idx = 0; idx < MAX_NODES; idx++) {
+		struct qnode *qnode = &qnodesp->nodes[idx];
+		if (qnode->lock == lock)
+			return qnode;
+	}
+
+	BUG();
+}
+
+static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
+{
+	struct qnodes *qnodesp;
+	struct qnode *next, *node;
+	int val, old, tail;
+	int idx;
+
+	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
+
+	qnodesp = this_cpu_ptr(&qnodes);
+	if (unlikely(qnodesp->count == MAX_NODES)) {
+		while (!queued_spin_trylock(lock))
+			cpu_relax();
+		return;
+	}
+
+	idx = qnodesp->count++;
+	/*
+	 * Ensure that we increment the head node->count before initialising
+	 * the actual node. If the compiler is kind enough to reorder these
+	 * stores, then an IRQ could overwrite our assignments.
+	 */
+	barrier();
+	node = &qnodesp->nodes[idx];
+	node->next = NULL;
+	node->lock = lock;
+	node->locked = 0;
+
+	tail = encode_tail_cpu();
+
+	old = publish_tail_cpu(lock, tail);
+
+	/*
+	 * If there was a previous node; link it and wait until reaching the
+	 * head of the waitqueue.
+	 */
+	if (old & _Q_TAIL_CPU_MASK) {
+		struct qnode *prev = get_tail_qnode(lock, old);
+
+		/* Link @node into the waitqueue. */
+		WRITE_ONCE(prev->next, node);
+
+		/* Wait for mcs node lock to be released */
+		while (!node->locked)
+			cpu_relax();
+
+		smp_rmb(); /* acquire barrier for the mcs lock */
+	}
+
+	/* We're at the head of the waitqueue, wait for the lock. */
+	while ((val = atomic_read(&lock->val)) & _Q_LOCKED_VAL)
+		cpu_relax();
+
+	/* If we're the last queued, must clean up the tail. */
+	if ((val & _Q_TAIL_CPU_MASK) == tail) {
+		if (trylock_clear_tail_cpu(lock, val))
+			goto release;
+		/* Another waiter must have enqueued */
+	}
+
+	/* We must be the owner, just set the lock bit and acquire */
+	lock_set_locked(lock);
+
+	/* contended path; must wait for next != NULL (MCS protocol) */
+	while (!(next = READ_ONCE(node->next)))
 		cpu_relax();
+
+	/*
+	 * Unlock the next mcs waiter node. Release barrier is not required
+	 * here because the acquirer is only accessing the lock word, and
+	 * the acquire barrier we took the lock with orders that update vs
+	 * this store to locked. The corresponding barrier is the smp_rmb()
+	 * acquire barrier for mcs lock, above.
+	 */
+	WRITE_ONCE(next->locked, 1);
+
+release:
+	qnodesp->count--; /* release the node */
+}
+
+void queued_spin_lock_slowpath(struct qspinlock *lock)
+{
+	queued_spin_lock_mcs_queue(lock);
 }
 EXPORT_SYMBOL(queued_spin_lock_slowpath);
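As a reader's aid, here is how a lock word decomposes under the layout defined above. The values are purely illustrative (lock held while CPU 2 is queued as the tail), not taken from the patch:

	/* illustrative only: lock held, CPU 2 queued as tail */
	int val = _Q_LOCKED_VAL | ((2 + 1) << _Q_TAIL_CPU_OFFSET);	/* 0x00030001 */
	int locked = val & _Q_LOCKED_MASK;				/* 1 */
	int tail_cpu = (val >> _Q_TAIL_CPU_OFFSET) - 1;			/* 2 */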