| Message ID | 20220406161954.832191-2-tjaalton@ubuntu.com |
|---|---|
| State | New |
| Series | io_uring regression - lost write request |
On 06.04.22 18:19, Timo Aaltonen wrote:
> From: Jens Axboe <axboe@kernel.dk>
>
> BugLink: https://bugs.launchpad.net/bugs/1952222
>
> We've got a few issues that all boil down to the fact that we have one
> list of pending work items, yet two different types of workers to
> serve them. This causes some oddities around workers switching type and
> even hashed work vs regular work on the same bounded list.
>
> Just separate them out cleanly, similarly to how we already do
> accounting of what is running. That provides a clean separation and
> removes some corner cases that can cause stalls when handling IO
> that is punted to io-wq.
>
> Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> (backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e; minor adjustments)
> Signed-off-by: Timo Aaltonen <timo.aaltonen@canonical.com>

Acked-by: Stefan Bader <stefan.bader@canonical.com>

> ---

Assuming this is more or less tested via oem and can be double checked
when it lands in 5.13 kernels.

-Stefan

>  fs/io-wq.c | 158 +++++++++++++++++++++++------------------------------
>  1 file changed, 69 insertions(+), 89 deletions(-)
>
> [...]
On 06.04.22 18:19, Timo Aaltonen wrote:
> From: Jens Axboe <axboe@kernel.dk>
>
> BugLink: https://bugs.launchpad.net/bugs/1952222
>
> We've got a few issues that all boil down to the fact that we have one
> list of pending work items, yet two different types of workers to
> serve them. This causes some oddities around workers switching type and
> even hashed work vs regular work on the same bounded list.
>
> Just separate them out cleanly, similarly to how we already do
> accounting of what is running. That provides a clean separation and
> removes some corner cases that can cause stalls when handling IO
> that is punted to io-wq.
>
> Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> (backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e; minor adjustments)

To follow the pattern, this should be added as something like:

  (backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e)
  [ tjaalton: minor adjustments ]

Which can be fixed when applying the patch.

> Signed-off-by: Timo Aaltonen <timo.aaltonen@canonical.com>

Acked-by: Kleber Sacilotto de Souza <kleber.souza@canonical.com>

Thanks

> ---
>  fs/io-wq.c | 158 +++++++++++++++++++++++------------------------------
>  1 file changed, 69 insertions(+), 89 deletions(-)
>
> [...]
diff --git a/fs/io-wq.c b/fs/io-wq.c
index ba7aaf2b95d0..bded284a56d0 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -34,7 +34,7 @@ enum {
 };

 enum {
-	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
+	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
 };

 /*
@@ -73,25 +73,24 @@ struct io_wqe_acct {
 	unsigned max_workers;
 	int index;
 	atomic_t nr_running;
+	struct io_wq_work_list work_list;
+	unsigned long flags;
 };

 enum {
 	IO_WQ_ACCT_BOUND,
 	IO_WQ_ACCT_UNBOUND,
+	IO_WQ_ACCT_NR,
 };

 /*
  * Per-node worker thread pool
  */
 struct io_wqe {
-	struct {
-		raw_spinlock_t lock;
-		struct io_wq_work_list work_list;
-		unsigned flags;
-	} ____cacheline_aligned_in_smp;
+	raw_spinlock_t lock;
+	struct io_wqe_acct acct[2];

 	int node;
-	struct io_wqe_acct acct[2];

 	struct hlist_nulls_head free_list;
 	struct list_head all_list;
@@ -196,11 +195,10 @@ static void io_worker_exit(struct io_worker *worker)
 	do_exit(0);
 }

-static inline bool io_wqe_run_queue(struct io_wqe *wqe)
-	__must_hold(wqe->lock)
+static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
 {
-	if (!wq_list_empty(&wqe->work_list) &&
-	    !(wqe->flags & IO_WQE_FLAG_STALLED))
+	if (!wq_list_empty(&acct->work_list) &&
+	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
 		return true;
 	return false;
 }
@@ -209,7 +207,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
  * Check head of free list for an available worker. If one isn't available,
  * caller must create one.
  */
-static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
+static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
+					struct io_wqe_acct *acct)
 	__must_hold(RCU)
 {
 	struct hlist_nulls_node *n;
@@ -223,6 +222,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
 		if (!io_worker_get(worker))
 			continue;
+		if (io_wqe_get_acct(worker) != acct) {
+			io_worker_release(worker);
+			continue;
+		}
 		if (wake_up_process(worker->task)) {
 			io_worker_release(worker);
 			return true;
@@ -341,7 +344,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
 	if (!(worker->flags & IO_WORKER_F_UP))
 		return;

-	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
 		atomic_inc(&acct->nr_running);
 		atomic_inc(&wqe->wq->worker_refs);
 		io_queue_worker_create(wqe, worker, acct);
@@ -356,29 +359,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
 			     struct io_wq_work *work)
 	__must_hold(wqe->lock)
 {
-	bool worker_bound, work_bound;
-
-	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
-
 	if (worker->flags & IO_WORKER_F_FREE) {
 		worker->flags &= ~IO_WORKER_F_FREE;
 		hlist_nulls_del_init_rcu(&worker->nulls_node);
 	}
-
-	/*
-	 * If worker is moving from bound to unbound (or vice versa), then
-	 * ensure we update the running accounting.
-	 */
-	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
-	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
-	if (worker_bound != work_bound) {
-		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
-		io_wqe_dec_running(worker);
-		worker->flags ^= IO_WORKER_F_BOUND;
-		wqe->acct[index].nr_workers--;
-		wqe->acct[index ^ 1].nr_workers++;
-		io_wqe_inc_running(worker);
-	}
 }

 /*
@@ -420,44 +404,23 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
 	return ret;
 }

-/*
- * We can always run the work if the worker is currently the same type as
- * the work (eg both are bound, or both are unbound). If they are not the
- * same, only allow it if incrementing the worker count would be allowed.
- */
-static bool io_worker_can_run_work(struct io_worker *worker,
-				   struct io_wq_work *work)
-{
-	struct io_wqe_acct *acct;
-
-	if (!(worker->flags & IO_WORKER_F_BOUND) !=
-	    !(work->flags & IO_WQ_WORK_UNBOUND))
-		return true;
-
-	/* not the same type, check if we'd go over the limit */
-	acct = io_work_get_acct(worker->wqe, work);
-	return acct->nr_workers < acct->max_workers;
-}
-
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 					   struct io_worker *worker)
 	__must_hold(wqe->lock)
 {
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work, *tail;
 	unsigned int stall_hash = -1U;
+	struct io_wqe *wqe = worker->wqe;

-	wq_list_for_each(node, prev, &wqe->work_list) {
+	wq_list_for_each(node, prev, &acct->work_list) {
 		unsigned int hash;

 		work = container_of(node, struct io_wq_work, list);

-		if (!io_worker_can_run_work(worker, work))
-			break;
-
 		/* not hashed, can run anytime */
 		if (!io_wq_is_hashed(work)) {
-			wq_list_del(&wqe->work_list, node, prev);
+			wq_list_del(&acct->work_list, node, prev);
 			return work;
 		}

@@ -468,7 +431,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
 		/* hashed, can run if not already running */
 		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
 			wqe->hash_tail[hash] = NULL;
-			wq_list_cut(&wqe->work_list, &tail->list, prev);
+			wq_list_cut(&acct->work_list, &tail->list, prev);
 			return work;
 		}
 		if (stall_hash == -1U)
@@ -484,12 +447,12 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
 		 * Set this before dropping the lock to avoid racing with new
 		 * work being added and clearing the stalled bit.
 		 */
-		wqe->flags |= IO_WQE_FLAG_STALLED;
+		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 		raw_spin_unlock(&wqe->lock);
 		unstalled = io_wait_on_hash(wqe, stall_hash);
 		raw_spin_lock(&wqe->lock);
 		if (unstalled) {
-			wqe->flags &= ~IO_WQE_FLAG_STALLED;
+			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 			if (wq_has_sleeper(&wqe->wq->hash->wait))
 				wake_up(&wqe->wq->hash->wait);
 		}
@@ -526,6 +489,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 static void io_worker_handle_work(struct io_worker *worker)
 	__releases(wqe->lock)
 {
+	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -540,7 +504,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 		 * can't make progress, any work completion or insertion will
 		 * clear the stalled flag.
 		 */
-		work = io_get_next_work(wqe, worker);
+		work = io_get_next_work(acct, worker);
 		if (work)
 			__io_worker_busy(wqe, worker, work);

@@ -576,7 +540,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 				/* serialize hash clear with wake_up() */
 				spin_lock_irq(&wq->hash->wait.lock);
 				clear_bit(hash, &wq->hash->map);
-				wqe->flags &= ~IO_WQE_FLAG_STALLED;
+				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 				spin_unlock_irq(&wq->hash->wait.lock);
 				if (wq_has_sleeper(&wq->hash->wait))
 					wake_up(&wq->hash->wait);
@@ -595,6 +559,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 static int io_wqe_worker(void *data)
 {
 	struct io_worker *worker = data;
+	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 	char buf[TASK_COMM_LEN];
@@ -610,7 +575,7 @@ static int io_wqe_worker(void *data)
 		set_current_state(TASK_INTERRUPTIBLE);
 loop:
 		raw_spin_lock_irq(&wqe->lock);
-		if (io_wqe_run_queue(wqe)) {
+		if (io_acct_run_queue(acct)) {
 			io_worker_handle_work(worker);
 			goto loop;
 		}
@@ -636,7 +601,7 @@ static int io_wqe_worker(void *data)

 	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		raw_spin_lock_irq(&wqe->lock);
-		if (!wq_list_empty(&wqe->work_list))
+		if (!wq_list_empty(&acct->work_list))
 			io_worker_handle_work(worker);
 		else
 			raw_spin_unlock_irq(&wqe->lock);
@@ -782,12 +747,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)

 static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 {
+	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	unsigned int hash;
 	struct io_wq_work *tail;

 	if (!io_wq_is_hashed(work)) {
 append:
-		wq_list_add_tail(&work->list, &wqe->work_list);
+		wq_list_add_tail(&work->list, &acct->work_list);
 		return;
 	}

@@ -797,7 +763,7 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 	if (!tail)
 		goto append;

-	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
+	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
 }

 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
@@ -819,10 +785,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)

 	raw_spin_lock_irqsave(&wqe->lock, flags);
 	io_wqe_insert_work(wqe, work);
-	wqe->flags &= ~IO_WQE_FLAG_STALLED;
+	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

 	rcu_read_lock();
-	do_create = !io_wqe_activate_free_worker(wqe);
+	do_create = !io_wqe_activate_free_worker(wqe, acct);
 	rcu_read_unlock();

 	raw_spin_unlock_irqrestore(&wqe->lock, flags);
@@ -875,6 +841,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 					 struct io_wq_work *work,
 					 struct io_wq_work_node *prev)
 {
+	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	unsigned int hash = io_get_work_hash(work);
 	struct io_wq_work *prev_work = NULL;

@@ -886,7 +853,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 		else
 			wqe->hash_tail[hash] = NULL;
 	}
-	wq_list_del(&wqe->work_list, &work->list, prev);
+	wq_list_del(&acct->work_list, &work->list, prev);
 }

 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
@@ -895,22 +862,27 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work;
 	unsigned long flags;
+	int i;

 retry:
 	raw_spin_lock_irqsave(&wqe->lock, flags);
-	wq_list_for_each(node, prev, &wqe->work_list) {
-		work = container_of(node, struct io_wq_work, list);
-		if (!match->fn(work, match->data))
-			continue;
-		io_wqe_remove_pending(wqe, work, prev);
-		raw_spin_unlock_irqrestore(&wqe->lock, flags);
-		io_run_cancel(work, wqe);
-		match->nr_pending++;
-		if (!match->cancel_all)
-			return;
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

-		/* not safe to continue after unlock */
-		goto retry;
+		wq_list_for_each(node, prev, &acct->work_list) {
+			work = container_of(node, struct io_wq_work, list);
+			if (!match->fn(work, match->data))
+				continue;
+			io_wqe_remove_pending(wqe, work, prev);
+			raw_spin_unlock_irqrestore(&wqe->lock, flags);
+			io_run_cancel(work, wqe);
+			match->nr_pending++;
+			if (!match->cancel_all)
+				return;
+
+			/* not safe to continue after unlock */
+			goto retry;
+		}
 	}
 	raw_spin_unlock_irqrestore(&wqe->lock, flags);
 }
@@ -971,18 +943,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 			    int sync, void *key)
 {
 	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
+	int i;

 	list_del_init(&wait->entry);

 	rcu_read_lock();
-	io_wqe_activate_free_worker(wqe);
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		struct io_wqe_acct *acct = &wqe->acct[i];
+
+		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
+			io_wqe_activate_free_worker(wqe, acct);
+	}
 	rcu_read_unlock();
 	return 1;
 }

 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-	int ret = -ENOMEM, node;
+	int ret = -ENOMEM, node, i;
 	struct io_wq *wq;

 	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
@@ -1019,18 +997,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 			goto err;
 		wq->wqes[node] = wqe;
 		wqe->node = alloc_node;
-		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
-		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
 		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
-		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
 					task_rlimit(current, RLIMIT_NPROC);
-		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
-		wqe->wait.func = io_wqe_hash_wake;
 		INIT_LIST_HEAD(&wqe->wait.entry);
+		wqe->wait.func = io_wqe_hash_wake;
+		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+			struct io_wqe_acct *acct = &wqe->acct[i];
+
+			acct->index = i;
+			atomic_set(&acct->nr_running, 0);
+			INIT_WQ_LIST(&acct->work_list);
+		}
 		wqe->wq = wq;
 		raw_spin_lock_init(&wqe->lock);
-		INIT_WQ_LIST(&wqe->work_list);
 		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
 		INIT_LIST_HEAD(&wqe->all_list);
 	}
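
For readers skimming the diff, the core of the change is the data-structure split: before the patch each io_wqe had a single work_list and a single stalled flag shared by bound and unbound workers; after it, each io_wqe_acct carries its own work_list and flags, and a worker only ever pulls from the list of its own accounting class. The following is a minimal user-space sketch of that shape, not kernel code; the struct and constant names mirror the patch, but the list handling is a simplified stand-in (push-front, no locking, no hashing).

/* Illustrative sketch only: models the per-acct split from the patch. */
#include <stdbool.h>
#include <stdio.h>

struct work_item {
	struct work_item *next;
	const char *name;
};

enum { IO_WQ_ACCT_BOUND, IO_WQ_ACCT_UNBOUND, IO_WQ_ACCT_NR };

/* After the patch: each accounting class owns its pending list and flag. */
struct io_wqe_acct {
	int index;
	bool stalled;                 /* stands in for IO_ACCT_STALLED_BIT */
	struct work_item *work_list;  /* stands in for struct io_wq_work_list */
};

struct io_wqe {
	struct io_wqe_acct acct[IO_WQ_ACCT_NR];  /* was: one shared work_list */
};

/* A worker consults only its own acct, so a stalled bounded list can no
 * longer hide pending unbounded work (and vice versa). */
static struct work_item *get_next_work(struct io_wqe_acct *acct)
{
	if (acct->stalled || !acct->work_list)
		return NULL;
	struct work_item *work = acct->work_list;
	acct->work_list = work->next;
	return work;
}

/* Push-front stand-in; the kernel appends at the tail of the per-acct list. */
static void enqueue(struct io_wqe *wqe, int index, struct work_item *work)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	work->next = acct->work_list;
	acct->work_list = work;
	acct->stalled = false;  /* new work clears the per-acct stall */
}

int main(void)
{
	struct io_wqe wqe = { .acct = { { .index = IO_WQ_ACCT_BOUND },
					{ .index = IO_WQ_ACCT_UNBOUND } } };
	struct work_item a = { .name = "bounded-io" };
	struct work_item b = { .name = "unbounded-io" };

	enqueue(&wqe, IO_WQ_ACCT_BOUND, &a);
	enqueue(&wqe, IO_WQ_ACCT_UNBOUND, &b);

	/* Stalling the bounded list does not block the unbounded one. */
	wqe.acct[IO_WQ_ACCT_BOUND].stalled = true;
	for (int i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct work_item *w = get_next_work(&wqe.acct[i]);
		printf("acct %d -> %s\n", i, w ? w->name : "(stalled/empty)");
	}
	return 0;
}

Running this prints "(stalled/empty)" for the bounded acct and "unbounded-io" for the unbounded one, which is the separation the commit message describes; in the old layout a single shared list and stall flag could let one class of work hold up the other.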