Message ID: 20111127225343.GA7827@bubble.grove.modra.org
On Mon, Nov 28, 2011 at 09:23:43AM +1030, Alan Modra wrote:
> +  int count = *sem;
> +
> +  while ((count & 0x7fffffff) != 0)
> +    {
> +      int oldval = count;
> +      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
> +                                   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
> +      if (__builtin_expect (oldval == count, 1))
> +        return;

Why aren't you instead testing the return value of
__atomic_compare_exchange_4 (here and in other cases)?

	Jakub
On Mon, Nov 28, 2011 at 05:23:37PM +0100, Jakub Jelinek wrote:
> On Mon, Nov 28, 2011 at 09:23:43AM +1030, Alan Modra wrote:
> > +  int count = *sem;
> > +
> > +  while ((count & 0x7fffffff) != 0)
> > +    {
> > +      int oldval = count;
> > +      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
> > +                                   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
> > +      if (__builtin_expect (oldval == count, 1))
> > +        return;
>
> Why aren't you instead testing the return value of
> __atomic_compare_exchange_4 (here and in other cases)?

If you use the return value on powerpc, you find that requires two
load immediate instructions (loading zero and one), and a compare
against zero.  That makes three fewer instructions as written, because
the oldval == count comparison has already been done inside the atomic
sequence.  I'd expect fewer on most other architectures unless they
happen to have a compare and exchange instruction that sets condition
bits (ie. Intel).  Even on Intel the way I've written the code
shouldn't take more instructions with a properly written cmpxchg rtl
description.  Does it?

Hmm, I suppose you could argue that powerpc and others ought to not
generate those three extra instructions when using the return value.
I'll see about fixing powerpc.
On 11/28/2011 02:16 PM, Alan Modra wrote:
> Hmm, I suppose you could argue that powerpc and others ought to not
> generate those three extra instructions when using the return value.
> I'll see about fixing powerpc.

True.  For weak, the value *should* always be used (otherwise the user
program is broken).

However, we can do better by considering the value to be stored in CR0.
We perform the comparison in the loop, which sets CR0 == EQ (true) or
NE (false); CR0 is set again by the STWCX insn in the same fashion.
So the

  /* ??? It's either this or an unlikely jump over (set bool 1).  */
  x = gen_rtx_EQ (SImode, cond, const0_rtx);
  emit_insn (gen_rtx_SET (VOIDmode, boolval, x));

code currently emitted by rs6000_expand_atomic_compare_and_swap is
sufficient, and merely needs to be adjusted so that it is computed
after label2, and other settings to boolval removed.

This may be sufficient for better strong compare-and-swap as well.


r~
On 11/27/2011 02:53 PM, Alan Modra wrote:
> +enum memmodel
> +{
> +  MEMMODEL_RELAXED = 0,
> +  MEMMODEL_CONSUME = 1,
> +  MEMMODEL_ACQUIRE = 2,
> +  MEMMODEL_RELEASE = 3,
> +  MEMMODEL_ACQ_REL = 4,
> +  MEMMODEL_SEQ_CST = 5,
> +  MEMMODEL_LAST = 6
> +};

This should probably go to libgomp.h.

> /* This is a Linux specific implementation of a semaphore synchronization
>    mechanism for libgomp.  This type is private to the library.  This
> +   counting semaphore implementation uses atomic instructions and the
> +   futex syscall, and a single 32-bit int to store semaphore state.
> +   The low 31 bits are the count, the top bit is a flag set when some
> +   threads may be waiting.  */

I think we'll get better code generation on a number of targets if we
make the low bit the waiting bit, and the higher bits the count.  That
way we do (count & 1) and (count + 2) instead of needing larger
constants to be generated.  Not changed below...

> +static inline void
> +gomp_sem_wait (gomp_sem_t *sem)
>  {
> +  int count = *sem;
> +
> +  while ((count & 0x7fffffff) != 0)
> +    {
> +      int oldval = count;
> +      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
> +                                   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
> +      if (__builtin_expect (oldval == count, 1))
> +        return;
> +      count = oldval;
> +    }

I'd really prefer not to hard-code any sizes at all.  Let's change the
explicit _4 to _n everywhere.

The loop above doesn't actually make sense to me.  If the
compare-and-swap succeeds, then oldval == count - 1.  Which doesn't
seem to be what you're testing at all.

Perhaps this entire function is better written as

  {
    int count = *sem;
    do
      {
        if (__builtin_expect (count & 0x80000000u, 0))
          {
            gomp_sem_wait_slow (sem, count);
            return;
          }
      }
    while (!__atomic_compare_exchange_n (sem, &count, count - 1, true,
                                         MEMMODEL_ACQUIRE, MEMMODEL_RELAXED));
  }

> +gomp_sem_post (gomp_sem_t *sem)
>  {
> +  int count = *sem;
> +
> +  while (1)
> +    {
> +      int oldval = count;
> +      __atomic_compare_exchange_4 (sem, &oldval, (count + 1) & 0x7fffffff,
> +                                   false, MEMMODEL_RELEASE, MEMMODEL_RELAXED);
> +      if (__builtin_expect (oldval == count, 1))
> +        break;

Again, this equality doesn't make sense.  Further, you aren't testing
for the high bit set inside the loop, nor are you preserving the high
bit.

Perhaps better written as

  {
    int count = *sem;
    do
      {
        if (__builtin_expect (count & 0x80000000u, 0))
          {
            gomp_sem_post_slow (sem, count);
            return;
          }
        /* We can't post more than 2**31-1 times.  */
        assert (count < 0x7fffffff);
      }
    while (!__atomic_compare_exchange_n (sem, &count, count + 1, true,
                                         MEMMODEL_RELEASE, MEMMODEL_RELAXED));
  }

...

All that said, I don't see how we can ever clear the wait bit once
it's set?  Are we better off splitting the 32-bit value into two
16-bit fields for value+waiters?  Or similarly splitting a 64-bit
value?  That way we can at least update both fields atomically, and we
ought never lose a waiter.


r~
On Mon, Nov 28, 2011 at 04:00:10PM -0800, Richard Henderson wrote:
> On 11/27/2011 02:53 PM, Alan Modra wrote:
> > +enum memmodel
> > +{
> > +  MEMMODEL_RELAXED = 0,
> > +  MEMMODEL_CONSUME = 1,
> > +  MEMMODEL_ACQUIRE = 2,
> > +  MEMMODEL_RELEASE = 3,
> > +  MEMMODEL_ACQ_REL = 4,
> > +  MEMMODEL_SEQ_CST = 5,
> > +  MEMMODEL_LAST = 6
> > +};
>
> This should probably go to libgomp.h.

I wondered about that.

> > /* This is a Linux specific implementation of a semaphore synchronization
> >    mechanism for libgomp.  This type is private to the library.  This
> > +   counting semaphore implementation uses atomic instructions and the
> > +   futex syscall, and a single 32-bit int to store semaphore state.
> > +   The low 31 bits are the count, the top bit is a flag set when some
> > +   threads may be waiting.  */
>
> I think we'll get better code generation on a number of targets if we
> make the low bit the waiting bit, and the higher bits the count.  That
> way we do (count & 1) and (count + 2) instead of needing larger
> constants to be generated.  Not changed below...

Funny, that's the way I wrote this code at first, then went for the
wait bit as the sign bit because you can test > 0 in one place where
you want "not waiting and count non-zero".

> > +static inline void
> > +gomp_sem_wait (gomp_sem_t *sem)
> >  {
> > +  int count = *sem;
> > +
> > +  while ((count & 0x7fffffff) != 0)
> > +    {
> > +      int oldval = count;
> > +      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
> > +                                   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
> > +      if (__builtin_expect (oldval == count, 1))
> > +        return;
> > +      count = oldval;
> > +    }
>
> I'd really prefer not to hard-code any sizes at all.  Let's change the
> explicit _4 to _n everywhere.

OK.

> The loop above doesn't actually make sense to me.  If the
> compare-and-swap succeeds, then oldval == count - 1.  Which doesn't
> seem to be what you're testing at all.

Huh?  If it succeeds, oldval == count and we return.

> Perhaps this entire function is better written as
>
>   {
>     int count = *sem;
>     do
>       {
>         if (__builtin_expect (count & 0x80000000u, 0))
>           {
>             gomp_sem_wait_slow (sem, count);
>             return;
>           }
>       }
>     while (!__atomic_compare_exchange_n (sem, &count, count - 1, true,
>                                          MEMMODEL_ACQUIRE, MEMMODEL_RELAXED));
>   }

No, we don't need the slow path if we have *sem & 0x7fffffff non-zero.

> > +gomp_sem_post (gomp_sem_t *sem)
> >  {
> > +  int count = *sem;
> > +
> > +  while (1)
> > +    {
> > +      int oldval = count;
> > +      __atomic_compare_exchange_4 (sem, &oldval, (count + 1) & 0x7fffffff,
> > +                                   false, MEMMODEL_RELEASE, MEMMODEL_RELAXED);
> > +      if (__builtin_expect (oldval == count, 1))
> > +        break;
>
> Again, this equality doesn't make sense.  Further, you aren't testing
> for the high bit set inside the loop, nor are you preserving the high
> bit.

See above about the equality.  We don't want to preserve the wait bit
here.  Otherwise we'd never go back to the uncontended state, which
answers

> ... All that said, I don't see how we can ever clear the wait bit
> once it's set?

this question.

> Are we better off splitting the 32-bit value into two 16-bit fields
> for value+waiters?  Or similarly splitting a 64-bit value?  That way
> we can at least update both fields atomically, and we ought never
> lose a waiter.

Other splits of the field are certainly possible, but of course
restrict the max number, and you'd need the fancy futex op_wait.  Some
targets don't have 64-bit atomics, so we can't really go that way.
Yes, if I did keep track of number of waiters we'd save one
unnecessary futex_wake call, but I'm quite confident no waiters will
be lost just using a flag.
Index: libgomp/config/linux/sem.h
===================================================================
--- libgomp/config/linux/sem.h	(revision 181718)
+++ libgomp/config/linux/sem.h	(working copy)
@@ -24,34 +24,74 @@
 
 /* This is a Linux specific implementation of a semaphore synchronization
    mechanism for libgomp.  This type is private to the library.  This
-   implementation uses atomic instructions and the futex syscall.  */
+   counting semaphore implementation uses atomic instructions and the
+   futex syscall, and a single 32-bit int to store semaphore state.
+   The low 31 bits are the count, the top bit is a flag set when some
+   threads may be waiting.  */
 
 #ifndef GOMP_SEM_H
 #define GOMP_SEM_H 1
 
 typedef int gomp_sem_t;
 
-static inline void gomp_sem_init (gomp_sem_t *sem, int value)
+enum memmodel
+{
+  MEMMODEL_RELAXED = 0,
+  MEMMODEL_CONSUME = 1,
+  MEMMODEL_ACQUIRE = 2,
+  MEMMODEL_RELEASE = 3,
+  MEMMODEL_ACQ_REL = 4,
+  MEMMODEL_SEQ_CST = 5,
+  MEMMODEL_LAST = 6
+};
+
+extern void gomp_sem_wait_slow (gomp_sem_t *, int);
+extern void gomp_sem_post_slow (gomp_sem_t *);
+
+static inline void
+gomp_sem_init (gomp_sem_t *sem, int value)
 {
   *sem = value;
 }
 
-extern void gomp_sem_wait_slow (gomp_sem_t *);
-static inline void gomp_sem_wait (gomp_sem_t *sem)
+static inline void
+gomp_sem_destroy (gomp_sem_t *sem)
 {
-  if (!__sync_bool_compare_and_swap (sem, 1, 0))
-    gomp_sem_wait_slow (sem);
 }
 
-extern void gomp_sem_post_slow (gomp_sem_t *);
-static inline void gomp_sem_post (gomp_sem_t *sem)
+static inline void
+gomp_sem_wait (gomp_sem_t *sem)
 {
-  if (!__sync_bool_compare_and_swap (sem, 0, 1))
-    gomp_sem_post_slow (sem);
+  int count = *sem;
+
+  while ((count & 0x7fffffff) != 0)
+    {
+      int oldval = count;
+      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
+                                   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
+      if (__builtin_expect (oldval == count, 1))
+        return;
+      count = oldval;
+    }
+  gomp_sem_wait_slow (sem, count);
 }
 
-static inline void gomp_sem_destroy (gomp_sem_t *sem)
+static inline void
+gomp_sem_post (gomp_sem_t *sem)
 {
-}
+  int count = *sem;
+
+  while (1)
+    {
+      int oldval = count;
+      __atomic_compare_exchange_4 (sem, &oldval, (count + 1) & 0x7fffffff,
+                                   false, MEMMODEL_RELEASE, MEMMODEL_RELAXED);
+      if (__builtin_expect (oldval == count, 1))
+        break;
+      count = oldval;
+    }
+  if (__builtin_expect (count & 0x80000000, 0))
+    gomp_sem_post_slow (sem);
+}
 
 #endif /* GOMP_SEM_H */
Index: libgomp/config/linux/sem.c
===================================================================
--- libgomp/config/linux/sem.c	(revision 181718)
+++ libgomp/config/linux/sem.c	(working copy)
@@ -28,34 +28,69 @@
 
 #include "wait.h"
 
-
 void
-gomp_sem_wait_slow (gomp_sem_t *sem)
+gomp_sem_wait_slow (gomp_sem_t *sem, int count)
 {
-  while (1)
+  int oldval, newval;
+
+  /* First loop spins a while.  */
+  while (count == 0)
     {
-      int val = __sync_val_compare_and_swap (sem, 0, -1);
-      if (val > 0)
+      if (do_spin (sem, 0))
+        {
+          /* Spin timeout, nothing changed.  Set waiting flag.  */
+          oldval = 0;
+          __atomic_compare_exchange_4 (sem, &oldval, 0x80000000, false,
+                                       MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
+          count = oldval;
+          if (oldval == 0)
+            {
+              futex_wait (sem, 0x80000000);
+              count = *sem;
+            }
+          break;
+        }
+      /* Something changed.  If positive, we're good to go.  */
+      else if ((count = *sem) > 0)
         {
-          if (__sync_bool_compare_and_swap (sem, val, val - 1))
+          oldval = count;
+          __atomic_compare_exchange_4 (sem, &oldval, count - 1, false,
+                                       MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
+          if (oldval == count)
            return;
+          count = oldval;
         }
-      do_wait (sem, -1);
+    }
+
+  /* Second loop waits until semaphore is posted.  We always exit this
+     loop with wait flag set, so next post will awaken a thread.  */
+  while (1)
+    {
+      oldval = count;
+      newval = 0x80000000;
+      if ((count & 0x7fffffff) != 0)
+        newval |= count - 1;
+      __atomic_compare_exchange_4 (sem, &oldval, newval, false,
+                                   MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
+      if (oldval == count)
+        {
+          if ((count & 0x7fffffff) != 0)
+            {
+              /* If we can wake more threads, do so now.  */
+              if ((count & 0x7fffffff) > 1)
+                gomp_sem_post_slow (sem);
+              break;
+            }
+          do_wait (sem, 0x80000000);
+          count = *sem;
+        }
+      else
+        count = oldval;
     }
 }
 
 void
 gomp_sem_post_slow (gomp_sem_t *sem)
 {
-  int old, tmp = *sem, wake;
-
-  do
-    {
-      old = tmp;
-      wake = old > 0 ? old + 1 : 1;
-      tmp = __sync_val_compare_and_swap (sem, old, wake);
-    }
-  while (old != tmp);
-
-  futex_wake (sem, wake);
+  futex_wake (sem, 1);
 }