   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Basic worker thread pool for io_uring
   4 *
   5 * Copyright (C) 2019 Jens Axboe
   6 *
   7 */
   8#include <linux/kernel.h>
   9#include <linux/init.h>
  10#include <linux/errno.h>
  11#include <linux/sched/signal.h>
  12#include <linux/mm.h>
  13#include <linux/sched/mm.h>
  14#include <linux/percpu.h>
  15#include <linux/slab.h>
  16#include <linux/kthread.h>
  17#include <linux/rculist_nulls.h>
  18#include <linux/fs_struct.h>
  19#include <linux/task_work.h>
  20
  21#include "io-wq.h"
  22
  23#define WORKER_IDLE_TIMEOUT	(5 * HZ)
  24
  25enum {
  26	IO_WORKER_F_UP		= 1,	/* up and active */
  27	IO_WORKER_F_RUNNING	= 2,	/* account as running */
  28	IO_WORKER_F_FREE	= 4,	/* worker on free list */
  29	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
  30	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
  31	IO_WORKER_F_BOUND	= 32,	/* is doing bounded work */
  32};
  33
  34enum {
  35	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
  36	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
  37	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
  38};
  39
  40enum {
  41	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
  42};
  43
  44/*
  45 * One for each thread in a wqe pool
  46 */
  47struct io_worker {
  48	refcount_t ref;
  49	unsigned flags;
  50	struct hlist_nulls_node nulls_node;
  51	struct list_head all_list;
  52	struct task_struct *task;
  53	struct io_wqe *wqe;
  54
  55	struct io_wq_work *cur_work;
  56	spinlock_t lock;
  57
  58	struct rcu_head rcu;
  59	struct mm_struct *mm;
  60	const struct cred *cur_creds;
  61	const struct cred *saved_creds;
  62	struct files_struct *restore_files;
  63	struct fs_struct *restore_fs;
  64};
  65
  66#if BITS_PER_LONG == 64
  67#define IO_WQ_HASH_ORDER	6
  68#else
  69#define IO_WQ_HASH_ORDER	5
  70#endif
  71
  72#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)
  73
  74struct io_wqe_acct {
  75	unsigned nr_workers;
  76	unsigned max_workers;
  77	atomic_t nr_running;
  78};
  79
  80enum {
  81	IO_WQ_ACCT_BOUND,
  82	IO_WQ_ACCT_UNBOUND,
  83};
  84
  85/*
  86 * Per-node worker thread pool
  87 */
  88struct io_wqe {
  89	struct {
  90		spinlock_t lock;
  91		struct io_wq_work_list work_list;
  92		unsigned long hash_map;
  93		unsigned flags;
  94	} ____cacheline_aligned_in_smp;
  95
  96	int node;
  97	struct io_wqe_acct acct[2];
  98
  99	struct hlist_nulls_head free_list;
 100	struct list_head all_list;
 101
 102	struct io_wq *wq;
 103	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
 104};
 105
 106/*
 107 * Per io_wq state
  108 */
 109struct io_wq {
 110	struct io_wqe **wqes;
 111	unsigned long state;
 112
 113	free_work_fn *free_work;
 114	io_wq_work_fn *do_work;
 115
 116	struct task_struct *manager;
 117	struct user_struct *user;
 118	refcount_t refs;
 119	struct completion done;
 120
 121	refcount_t use_refs;
 122};
 123
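/*
 * Worker references: io_worker_get() only succeeds while the worker is
 * still alive, io_worker_release() drops the reference and wakes the
 * worker task on the final put so io_worker_exit() can finish.
 */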
 124static bool io_worker_get(struct io_worker *worker)
 125{
 126	return refcount_inc_not_zero(&worker->ref);
 127}
 128
 129static void io_worker_release(struct io_worker *worker)
 130{
 131	if (refcount_dec_and_test(&worker->ref))
 132		wake_up_process(worker->task);
 133}
 134
 135/*
 136 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 137 * the lock in that case. Some callers need to restart handling if this
 138 * happens, so we can't just re-acquire the lock on behalf of the caller.
 139 */
 140static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
 141{
 142	bool dropped_lock = false;
 143
 144	if (worker->saved_creds) {
 145		revert_creds(worker->saved_creds);
 146		worker->cur_creds = worker->saved_creds = NULL;
 147	}
 148
 149	if (current->files != worker->restore_files) {
 150		__acquire(&wqe->lock);
 151		spin_unlock_irq(&wqe->lock);
 152		dropped_lock = true;
 153
 154		task_lock(current);
 155		current->files = worker->restore_files;
 156		task_unlock(current);
 157	}
 158
 159	if (current->fs != worker->restore_fs)
 160		current->fs = worker->restore_fs;
 161
 162	/*
 163	 * If we have an active mm, we need to drop the wq lock before unusing
 164	 * it. If we do, return true and let the caller retry the idle loop.
 165	 */
 166	if (worker->mm) {
 167		if (!dropped_lock) {
 168			__acquire(&wqe->lock);
 169			spin_unlock_irq(&wqe->lock);
 170			dropped_lock = true;
 171		}
 172		__set_current_state(TASK_RUNNING);
 173		kthread_unuse_mm(worker->mm);
 174		mmput(worker->mm);
 175		worker->mm = NULL;
 176	}
 177
 178	return dropped_lock;
 179}
 180
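/*
 * Map a work item (by IO_WQ_WORK_UNBOUND) or a worker (by
 * IO_WORKER_F_BOUND) to the bounded or unbounded accounting bucket.
 */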
 181static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
 182						   struct io_wq_work *work)
 183{
 184	if (work->flags & IO_WQ_WORK_UNBOUND)
 185		return &wqe->acct[IO_WQ_ACCT_UNBOUND];
 186
 187	return &wqe->acct[IO_WQ_ACCT_BOUND];
 188}
 189
 190static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
 191						  struct io_worker *worker)
 192{
 193	if (worker->flags & IO_WORKER_F_BOUND)
 194		return &wqe->acct[IO_WQ_ACCT_BOUND];
 195
 196	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
 197}
 198
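/*
 * Final teardown of a worker: wait for transient references to go away,
 * fix up the running/process accounting, unlink from the free and all
 * lists, and update the wqe worker counts. The last worker to exit drops
 * this node's wq reference, which completes wq->done once every node has
 * drained.
 */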
 199static void io_worker_exit(struct io_worker *worker)
 200{
 201	struct io_wqe *wqe = worker->wqe;
 202	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
 203	unsigned nr_workers;
 204
 205	/*
 206	 * If we're not at zero, someone else is holding a brief reference
 207	 * to the worker. Wait for that to go away.
 208	 */
 209	set_current_state(TASK_INTERRUPTIBLE);
 210	if (!refcount_dec_and_test(&worker->ref))
 211		schedule();
 212	__set_current_state(TASK_RUNNING);
 213
 214	preempt_disable();
 215	current->flags &= ~PF_IO_WORKER;
 216	if (worker->flags & IO_WORKER_F_RUNNING)
 217		atomic_dec(&acct->nr_running);
 218	if (!(worker->flags & IO_WORKER_F_BOUND))
 219		atomic_dec(&wqe->wq->user->processes);
 220	worker->flags = 0;
 221	preempt_enable();
 222
 223	spin_lock_irq(&wqe->lock);
 224	hlist_nulls_del_rcu(&worker->nulls_node);
 225	list_del_rcu(&worker->all_list);
 226	if (__io_worker_unuse(wqe, worker)) {
 227		__release(&wqe->lock);
 228		spin_lock_irq(&wqe->lock);
 229	}
 230	acct->nr_workers--;
 231	nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
 232			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
 233	spin_unlock_irq(&wqe->lock);
 234
 235	/* all workers gone, wq exit can proceed */
 236	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
 237		complete(&wqe->wq->done);
 238
 239	kfree_rcu(worker, rcu);
 240}
 241
 242static inline bool io_wqe_run_queue(struct io_wqe *wqe)
 243	__must_hold(wqe->lock)
 244{
 245	if (!wq_list_empty(&wqe->work_list) &&
 246	    !(wqe->flags & IO_WQE_FLAG_STALLED))
 247		return true;
 248	return false;
 249}
 250
 251/*
 252 * Check head of free list for an available worker. If one isn't available,
 253 * caller must wake up the wq manager to create one.
 254 */
 255static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 256	__must_hold(RCU)
 257{
 258	struct hlist_nulls_node *n;
 259	struct io_worker *worker;
 260
 261	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
 262	if (is_a_nulls(n))
 263		return false;
 264
 265	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
 266	if (io_worker_get(worker)) {
 267		wake_up_process(worker->task);
 268		io_worker_release(worker);
 269		return true;
 270	}
 271
 272	return false;
 273}
 274
 275/*
 276 * We need a worker. If we find a free one, we're good. If not, and we're
 277 * below the max number of workers, wake up the manager to create one.
 278 */
 279static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 280{
 281	bool ret;
 282
 283	/*
 284	 * Most likely an attempt to queue unbounded work on an io_wq that
 285	 * wasn't setup with any unbounded workers.
 286	 */
 287	WARN_ON_ONCE(!acct->max_workers);
 288
 289	rcu_read_lock();
 290	ret = io_wqe_activate_free_worker(wqe);
 291	rcu_read_unlock();
 292
 293	if (!ret && acct->nr_workers < acct->max_workers)
 294		wake_up_process(wqe->wq->manager);
 295}
 296
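/*
 * nr_running counts the workers of an accounting class that are actually
 * running. When the last one is about to block and runnable work is still
 * queued, io_wqe_dec_running() wakes a free worker or kicks the manager
 * to create one.
 */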
 297static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
 298{
 299	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
 300
 301	atomic_inc(&acct->nr_running);
 302}
 303
 304static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
 305	__must_hold(wqe->lock)
 306{
 307	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
 308
 309	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
 310		io_wqe_wake_worker(wqe, acct);
 311}
 312
 313static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
 314{
 315	allow_kernel_signal(SIGINT);
 316
 317	current->flags |= PF_IO_WORKER;
 318
 319	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
 320	worker->restore_files = current->files;
 321	worker->restore_fs = current->fs;
 322	io_wqe_inc_running(wqe, worker);
 323}
 324
 325/*
 326 * Worker will start processing some work. Move it to the busy list, if
 327 * it's currently on the freelist
 328 */
 329static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
 330			     struct io_wq_work *work)
 331	__must_hold(wqe->lock)
 332{
 333	bool worker_bound, work_bound;
 334
 335	if (worker->flags & IO_WORKER_F_FREE) {
 336		worker->flags &= ~IO_WORKER_F_FREE;
 337		hlist_nulls_del_init_rcu(&worker->nulls_node);
 338	}
 339
 340	/*
 341	 * If worker is moving from bound to unbound (or vice versa), then
 342	 * ensure we update the running accounting.
 343	 */
 344	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
 345	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
 346	if (worker_bound != work_bound) {
 347		io_wqe_dec_running(wqe, worker);
 348		if (work_bound) {
 349			worker->flags |= IO_WORKER_F_BOUND;
 350			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
 351			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
 352			atomic_dec(&wqe->wq->user->processes);
 353		} else {
 354			worker->flags &= ~IO_WORKER_F_BOUND;
 355			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
 356			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
 357			atomic_inc(&wqe->wq->user->processes);
 358		}
 359		io_wqe_inc_running(wqe, worker);
  360	}
 361}
 362
 363/*
 364 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 365 * have one attached. Dropping the mm may potentially sleep, so we drop
 366 * the lock in that case and return success. Since the caller has to
 367 * retry the loop in that case (we changed task state), we don't regrab
 368 * the lock if we return success.
 369 */
 370static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
 371	__must_hold(wqe->lock)
 372{
 373	if (!(worker->flags & IO_WORKER_F_FREE)) {
 374		worker->flags |= IO_WORKER_F_FREE;
 375		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
 376	}
 377
 378	return __io_worker_unuse(wqe, worker);
 379}
 380
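/*
 * io_get_work_hash() extracts the hash bucket from the upper flag bits.
 * io_get_next_work() hands out unhashed work in list order, but
 * serializes hashed work per bucket: if the bucket isn't already running,
 * the whole chain [work, tail] for that hash is spliced off the list in
 * one go, otherwise the item is skipped.
 */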
 381static inline unsigned int io_get_work_hash(struct io_wq_work *work)
 382{
 383	return work->flags >> IO_WQ_HASH_SHIFT;
 384}
 385
 386static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 387	__must_hold(wqe->lock)
 388{
 389	struct io_wq_work_node *node, *prev;
 390	struct io_wq_work *work, *tail;
 391	unsigned int hash;
 392
 393	wq_list_for_each(node, prev, &wqe->work_list) {
 394		work = container_of(node, struct io_wq_work, list);
 395
 396		/* not hashed, can run anytime */
 397		if (!io_wq_is_hashed(work)) {
 398			wq_list_del(&wqe->work_list, node, prev);
 399			return work;
 400		}
 401
 402		/* hashed, can run if not already running */
 403		hash = io_get_work_hash(work);
 404		if (!(wqe->hash_map & BIT(hash))) {
 405			wqe->hash_map |= BIT(hash);
 406			/* all items with this hash lie in [work, tail] */
 407			tail = wqe->hash_tail[hash];
 408			wqe->hash_tail[hash] = NULL;
 409			wq_list_cut(&wqe->work_list, &tail->list, prev);
 410			return work;
 411		}
 412	}
 413
 414	return NULL;
 415}
 416
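/*
 * Before running a work item the worker impersonates the submitting task:
 * it adopts the work's mm, creds, files, fs and RLIMIT_FSIZE. If the mm
 * can no longer be grabbed, the work is flagged IO_WQ_WORK_CANCEL so the
 * handler fails it instead of running it.
 */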
 417static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
 418{
 419	if (worker->mm) {
 420		kthread_unuse_mm(worker->mm);
 421		mmput(worker->mm);
 422		worker->mm = NULL;
 423	}
 424	if (!work->mm)
 425		return;
 426
 427	if (mmget_not_zero(work->mm)) {
 428		kthread_use_mm(work->mm);
 429		worker->mm = work->mm;
 430		/* hang on to this mm */
 431		work->mm = NULL;
 432		return;
 433	}
 434
 435	/* failed grabbing mm, ensure work gets cancelled */
 436	work->flags |= IO_WQ_WORK_CANCEL;
 437}
 438
 439static void io_wq_switch_creds(struct io_worker *worker,
 440			       struct io_wq_work *work)
 441{
 442	const struct cred *old_creds = override_creds(work->creds);
 443
 444	worker->cur_creds = work->creds;
 445	if (worker->saved_creds)
 446		put_cred(old_creds); /* creds set by previous switch */
 447	else
 448		worker->saved_creds = old_creds;
 449}
 450
 451static void io_impersonate_work(struct io_worker *worker,
 452				struct io_wq_work *work)
 453{
 454	if (work->files && current->files != work->files) {
 455		task_lock(current);
 456		current->files = work->files;
 457		task_unlock(current);
 458	}
 459	if (work->fs && current->fs != work->fs)
 460		current->fs = work->fs;
 461	if (work->mm != worker->mm)
 462		io_wq_switch_mm(worker, work);
 463	if (worker->cur_creds != work->creds)
 464		io_wq_switch_creds(worker, work);
 465	current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->fsize;
 466}
 467
 468static void io_assign_current_work(struct io_worker *worker,
 469				   struct io_wq_work *work)
 470{
 471	if (work) {
 472		/* flush pending signals before assigning new work */
 473		if (signal_pending(current))
 474			flush_signals(current);
 475		cond_resched();
 476	}
 477
 478	spin_lock_irq(&worker->lock);
 479	worker->cur_work = work;
 480	spin_unlock_irq(&worker->lock);
 481}
 482
 483static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 484
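/*
 * Main execution loop for a worker: grab work under wqe->lock, mark the
 * worker busy, then run the item and any dependent (linked) work without
 * the lock held. When a hashed chain completes, its bucket bit and the
 * stall flag are cleared. If only hashed work remains and its bucket is
 * busy, the wqe is marked IO_WQE_FLAG_STALLED so we stop looking until a
 * completion or a new enqueue clears it.
 */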
 485static void io_worker_handle_work(struct io_worker *worker)
 486	__releases(wqe->lock)
 487{
 488	struct io_wqe *wqe = worker->wqe;
 489	struct io_wq *wq = wqe->wq;
 490
 491	do {
 492		struct io_wq_work *work;
 493get_next:
 494		/*
 495		 * If we got some work, mark us as busy. If we didn't, but
 496		 * the list isn't empty, it means we stalled on hashed work.
 497		 * Mark us stalled so we don't keep looking for work when we
 498		 * can't make progress, any work completion or insertion will
 499		 * clear the stalled flag.
 500		 */
 501		work = io_get_next_work(wqe);
 502		if (work)
 503			__io_worker_busy(wqe, worker, work);
 504		else if (!wq_list_empty(&wqe->work_list))
 505			wqe->flags |= IO_WQE_FLAG_STALLED;
 506
 507		spin_unlock_irq(&wqe->lock);
 508		if (!work)
 509			break;
 510		io_assign_current_work(worker, work);
 511
 512		/* handle a whole dependent link */
 513		do {
 514			struct io_wq_work *old_work, *next_hashed, *linked;
 515			unsigned int hash = io_get_work_hash(work);
 516
 517			next_hashed = wq_next_work(work);
 518			io_impersonate_work(worker, work);
 519			/*
 520			 * OK to set IO_WQ_WORK_CANCEL even for uncancellable
 521			 * work, the worker function will do the right thing.
 522			 */
 523			if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
 524				work->flags |= IO_WQ_WORK_CANCEL;
 525
 526			old_work = work;
 527			linked = wq->do_work(work);
 528
 529			work = next_hashed;
 530			if (!work && linked && !io_wq_is_hashed(linked)) {
 531				work = linked;
 532				linked = NULL;
 533			}
 534			io_assign_current_work(worker, work);
 535			wq->free_work(old_work);
 536
 537			if (linked)
 538				io_wqe_enqueue(wqe, linked);
 539
 540			if (hash != -1U && !next_hashed) {
 541				spin_lock_irq(&wqe->lock);
 542				wqe->hash_map &= ~BIT_ULL(hash);
 543				wqe->flags &= ~IO_WQE_FLAG_STALLED;
 544				/* skip unnecessary unlock-lock wqe->lock */
 545				if (!work)
 546					goto get_next;
 547				spin_unlock_irq(&wqe->lock);
 548			}
 549		} while (work);
 550
 551		spin_lock_irq(&wqe->lock);
 552	} while (1);
 553}
 554
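/*
 * Thread function for every io-wq worker. Run queued work while there is
 * any, otherwise sit on the free list for up to WORKER_IDLE_TIMEOUT and
 * exit on timeout, unless this is the node's fixed worker. When the wq is
 * exiting, drain whatever is still queued before going away.
 */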
 555static int io_wqe_worker(void *data)
 556{
 557	struct io_worker *worker = data;
 558	struct io_wqe *wqe = worker->wqe;
 559	struct io_wq *wq = wqe->wq;
 560
 561	io_worker_start(wqe, worker);
 562
 563	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 564		set_current_state(TASK_INTERRUPTIBLE);
 565loop:
 566		spin_lock_irq(&wqe->lock);
 567		if (io_wqe_run_queue(wqe)) {
 568			__set_current_state(TASK_RUNNING);
 569			io_worker_handle_work(worker);
 570			goto loop;
 571		}
 572		/* drops the lock on success, retry */
 573		if (__io_worker_idle(wqe, worker)) {
 574			__release(&wqe->lock);
 575			goto loop;
 576		}
 577		spin_unlock_irq(&wqe->lock);
 578		if (signal_pending(current))
 579			flush_signals(current);
 580		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
 581			continue;
 582		/* timed out, exit unless we're the fixed worker */
 583		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
 584		    !(worker->flags & IO_WORKER_F_FIXED))
 585			break;
 586	}
 587
 588	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 589		spin_lock_irq(&wqe->lock);
 590		if (!wq_list_empty(&wqe->work_list))
 591			io_worker_handle_work(worker);
 592		else
 593			spin_unlock_irq(&wqe->lock);
 594	}
 595
 596	io_worker_exit(worker);
 597	return 0;
 598}
 599
 600/*
 601 * Called when a worker is scheduled in. Mark us as currently running.
 602 */
 603void io_wq_worker_running(struct task_struct *tsk)
 604{
 605	struct io_worker *worker = kthread_data(tsk);
 606	struct io_wqe *wqe = worker->wqe;
 607
 608	if (!(worker->flags & IO_WORKER_F_UP))
 609		return;
 610	if (worker->flags & IO_WORKER_F_RUNNING)
 611		return;
 612	worker->flags |= IO_WORKER_F_RUNNING;
 613	io_wqe_inc_running(wqe, worker);
 614}
 615
 616/*
 617 * Called when worker is going to sleep. If there are no workers currently
 618 * running and we have work pending, wake up a free one or have the manager
 619 * set one up.
 620 */
 621void io_wq_worker_sleeping(struct task_struct *tsk)
 622{
 623	struct io_worker *worker = kthread_data(tsk);
 624	struct io_wqe *wqe = worker->wqe;
 625
 626	if (!(worker->flags & IO_WORKER_F_UP))
 627		return;
 628	if (!(worker->flags & IO_WORKER_F_RUNNING))
 629		return;
 630
 631	worker->flags &= ~IO_WORKER_F_RUNNING;
 632
 633	spin_lock_irq(&wqe->lock);
 634	io_wqe_dec_running(wqe, worker);
 635	spin_unlock_irq(&wqe->lock);
 636}
 637
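/*
 * Create one worker: allocate it, start its kthread on the wqe's node,
 * add it to the free list and account for it. The first bounded worker on
 * a node becomes the fixed worker that never times out; unbounded workers
 * are also charged to the user's process count.
 */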
 638static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 639{
  640	struct io_wqe_acct *acct = &wqe->acct[index];
 641	struct io_worker *worker;
 642
 643	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
 644	if (!worker)
 645		return false;
 646
 647	refcount_set(&worker->ref, 1);
 648	worker->nulls_node.pprev = NULL;
 649	worker->wqe = wqe;
 650	spin_lock_init(&worker->lock);
 651
 652	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
 653				"io_wqe_worker-%d/%d", index, wqe->node);
 654	if (IS_ERR(worker->task)) {
 655		kfree(worker);
 656		return false;
 657	}
 658
 659	spin_lock_irq(&wqe->lock);
 660	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
 661	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
 662	worker->flags |= IO_WORKER_F_FREE;
 663	if (index == IO_WQ_ACCT_BOUND)
 664		worker->flags |= IO_WORKER_F_BOUND;
 665	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
 666		worker->flags |= IO_WORKER_F_FIXED;
 667	acct->nr_workers++;
 668	spin_unlock_irq(&wqe->lock);
 669
 670	if (index == IO_WQ_ACCT_UNBOUND)
 671		atomic_inc(&wq->user->processes);
 672
 673	wake_up_process(worker->task);
 674	return true;
 675}
 676
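/*
 * A new worker is only needed if there is runnable work, nobody is on the
 * free list to take it, and this accounting class is still below its
 * max_workers limit. The manager checks this roughly once a second, or
 * sooner when io_wqe_wake_worker() wakes it.
 */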
 677static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
 678	__must_hold(wqe->lock)
 679{
 680	struct io_wqe_acct *acct = &wqe->acct[index];
 681
 682	/* if we have available workers or no work, no need */
 683	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
 684		return false;
 685	return acct->nr_workers < acct->max_workers;
 686}
 687
 688/*
 689 * Manager thread. Tasked with creating new workers, if we need them.
 690 */
 691static int io_wq_manager(void *data)
 692{
 693	struct io_wq *wq = data;
 694	int workers_to_create = num_possible_nodes();
 695	int node;
 696
 697	/* create fixed workers */
 698	refcount_set(&wq->refs, workers_to_create);
 699	for_each_node(node) {
 700		if (!node_online(node))
 701			continue;
 702		if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
 703			goto err;
 704		workers_to_create--;
 705	}
 706
 707	while (workers_to_create--)
 708		refcount_dec(&wq->refs);
 709
 710	complete(&wq->done);
 711
 712	while (!kthread_should_stop()) {
 713		if (current->task_works)
 714			task_work_run();
 715
 716		for_each_node(node) {
 717			struct io_wqe *wqe = wq->wqes[node];
 718			bool fork_worker[2] = { false, false };
 719
 720			if (!node_online(node))
 721				continue;
 722
 723			spin_lock_irq(&wqe->lock);
 724			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
 725				fork_worker[IO_WQ_ACCT_BOUND] = true;
 726			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
 727				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
 728			spin_unlock_irq(&wqe->lock);
 729			if (fork_worker[IO_WQ_ACCT_BOUND])
 730				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
 731			if (fork_worker[IO_WQ_ACCT_UNBOUND])
 732				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
 733		}
 734		set_current_state(TASK_INTERRUPTIBLE);
 735		schedule_timeout(HZ);
 736	}
 737
 738	if (current->task_works)
 739		task_work_run();
 740
 741	return 0;
 742err:
 743	set_bit(IO_WQ_BIT_ERROR, &wq->state);
 744	set_bit(IO_WQ_BIT_EXIT, &wq->state);
 745	if (refcount_sub_and_test(workers_to_create, &wq->refs))
 746		complete(&wq->done);
 747	return 0;
 748}
 749
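/*
 * Bounded work can always be queued. Unbounded work is refused (and
 * cancelled) when no unbounded worker is running or free and the user is
 * already at the worker limit, unless the task has CAP_SYS_RESOURCE or
 * CAP_SYS_ADMIN.
 */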
 750static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
 751			    struct io_wq_work *work)
 752{
 753	bool free_worker;
 754
 755	if (!(work->flags & IO_WQ_WORK_UNBOUND))
 756		return true;
 757	if (atomic_read(&acct->nr_running))
 758		return true;
 759
 760	rcu_read_lock();
 761	free_worker = !hlist_nulls_empty(&wqe->free_list);
 762	rcu_read_unlock();
 763	if (free_worker)
 764		return true;
 765
 766	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
 767	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
 768		return false;
 769
 770	return true;
 771}
 772
 773static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 774{
 775	struct io_wq *wq = wqe->wq;
 776
 777	do {
 778		struct io_wq_work *old_work = work;
 779
 780		work->flags |= IO_WQ_WORK_CANCEL;
 781		work = wq->do_work(work);
 782		wq->free_work(old_work);
 783	} while (work);
 784}
 785
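/*
 * Unhashed work is appended to the tail of the list. Hashed work is
 * chained right behind the last queued item with the same hash (tracked
 * in hash_tail[]), so io_get_next_work() can take the whole bucket as a
 * contiguous span.
 */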
 786static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 787{
 788	unsigned int hash;
 789	struct io_wq_work *tail;
 790
 791	if (!io_wq_is_hashed(work)) {
 792append:
 793		wq_list_add_tail(&work->list, &wqe->work_list);
 794		return;
 795	}
 796
 797	hash = io_get_work_hash(work);
 798	tail = wqe->hash_tail[hash];
 799	wqe->hash_tail[hash] = work;
 800	if (!tail)
 801		goto append;
 802
 803	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
 804}
 805
 806static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 807{
 808	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 809	int work_flags;
 810	unsigned long flags;
 811
 812	/*
 813	 * Do early check to see if we need a new unbound worker, and if we do,
 814	 * if we're allowed to do so. This isn't 100% accurate as there's a
 815	 * gap between this check and incrementing the value, but that's OK.
 816	 * It's close enough to not be an issue, fork() has the same delay.
 817	 */
 818	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
 819		io_run_cancel(work, wqe);
 820		return;
 821	}
 822
 823	work_flags = work->flags;
 824	spin_lock_irqsave(&wqe->lock, flags);
 825	io_wqe_insert_work(wqe, work);
 826	wqe->flags &= ~IO_WQE_FLAG_STALLED;
 827	spin_unlock_irqrestore(&wqe->lock, flags);
 828
 829	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
 830	    !atomic_read(&acct->nr_running))
 831		io_wqe_wake_worker(wqe, acct);
 832}
 833
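/*
 * Queue work on the io_wqe of the local NUMA node.
 */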
 834void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
 835{
 836	struct io_wqe *wqe = wq->wqes[numa_node_id()];
 837
 838	io_wqe_enqueue(wqe, work);
 839}
 840
 841/*
 842 * Work items that hash to the same value will not be done in parallel.
 843 * Used to limit concurrent writes, generally hashed by inode.
 844 */
 845void io_wq_hash_work(struct io_wq_work *work, void *val)
 846{
 847	unsigned int bit;
 848
 849	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
 850	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
 851}
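/*
 * Illustrative use only (the calling side is not part of this file): a
 * submitter that wants writes to the same file to run serially could do
 * something like
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(wq, &req->work);
 *
 * where "req" and its fields are hypothetical names for the caller's
 * request structure.
 */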
 852
 853static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
 854{
 855	send_sig(SIGINT, worker->task, 1);
 856	return false;
 857}
 858
 859/*
 860 * Iterate the passed in list and call the specific function for each
 861 * worker that isn't exiting
 862 */
 863static bool io_wq_for_each_worker(struct io_wqe *wqe,
 864				  bool (*func)(struct io_worker *, void *),
 865				  void *data)
 866{
 867	struct io_worker *worker;
 868	bool ret = false;
 869
 870	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
 871		if (io_worker_get(worker)) {
 872			/* no task if node is/was offline */
 873			if (worker->task)
 874				ret = func(worker, data);
 875			io_worker_release(worker);
 876			if (ret)
 877				break;
 878		}
 879	}
 880
 881	return ret;
 882}
 883
 884void io_wq_cancel_all(struct io_wq *wq)
 885{
 886	int node;
 887
 888	set_bit(IO_WQ_BIT_CANCEL, &wq->state);
 889
 890	rcu_read_lock();
 891	for_each_node(node) {
 892		struct io_wqe *wqe = wq->wqes[node];
 893
 894		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
 895	}
 896	rcu_read_unlock();
 897}
 898
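/*
 * Cancellation runs in two passes. io_wqe_cancel_pending_work() removes
 * matching work that hasn't started yet (IO_WQ_CANCEL_OK, no completion
 * is posted); io_wqe_cancel_running_work() signals workers whose current
 * work matches (IO_WQ_CANCEL_RUNNING, the completion still runs). With
 * cancel_all set, both passes keep going instead of stopping at the
 * first match.
 */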
 899struct io_cb_cancel_data {
 900	work_cancel_fn *fn;
 901	void *data;
 902	int nr_running;
 903	int nr_pending;
 904	bool cancel_all;
 905};
 906
 907static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 908{
 909	struct io_cb_cancel_data *match = data;
 910	unsigned long flags;
 911
 912	/*
 913	 * Hold the lock to avoid ->cur_work going out of scope, caller
 914	 * may dereference the passed in work.
 915	 */
 916	spin_lock_irqsave(&worker->lock, flags);
 917	if (worker->cur_work &&
 918	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
 919	    match->fn(worker->cur_work, match->data)) {
 920		send_sig(SIGINT, worker->task, 1);
 921		match->nr_running++;
 922	}
 923	spin_unlock_irqrestore(&worker->lock, flags);
 924
 925	return match->nr_running && !match->cancel_all;
 926}
 927
 928static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 929					 struct io_wq_work *work,
 930					 struct io_wq_work_node *prev)
 931{
 932	unsigned int hash = io_get_work_hash(work);
 933	struct io_wq_work *prev_work = NULL;
 934
 935	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
 936		if (prev)
 937			prev_work = container_of(prev, struct io_wq_work, list);
 938		if (prev_work && io_get_work_hash(prev_work) == hash)
 939			wqe->hash_tail[hash] = prev_work;
 940		else
 941			wqe->hash_tail[hash] = NULL;
 942	}
 943	wq_list_del(&wqe->work_list, &work->list, prev);
 944}
 945
 946static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 947				       struct io_cb_cancel_data *match)
 948{
 949	struct io_wq_work_node *node, *prev;
 950	struct io_wq_work *work;
 951	unsigned long flags;
 952
 953retry:
 954	spin_lock_irqsave(&wqe->lock, flags);
 955	wq_list_for_each(node, prev, &wqe->work_list) {
 956		work = container_of(node, struct io_wq_work, list);
 957		if (!match->fn(work, match->data))
 958			continue;
 959		io_wqe_remove_pending(wqe, work, prev);
 960		spin_unlock_irqrestore(&wqe->lock, flags);
 961		io_run_cancel(work, wqe);
 962		match->nr_pending++;
 963		if (!match->cancel_all)
 964			return;
 965
 966		/* not safe to continue after unlock */
 967		goto retry;
 968	}
 969	spin_unlock_irqrestore(&wqe->lock, flags);
 970}
 971
 972static void io_wqe_cancel_running_work(struct io_wqe *wqe,
 973				       struct io_cb_cancel_data *match)
 974{
 975	rcu_read_lock();
 976	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
 977	rcu_read_unlock();
 978}
 979
 980enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 981				  void *data, bool cancel_all)
 982{
 983	struct io_cb_cancel_data match = {
 984		.fn		= cancel,
 985		.data		= data,
 986		.cancel_all	= cancel_all,
 987	};
 988	int node;
 989
 990	/*
 991	 * First check pending list, if we're lucky we can just remove it
 992	 * from there. CANCEL_OK means that the work is returned as-new,
 993	 * no completion will be posted for it.
 994	 */
 995	for_each_node(node) {
 996		struct io_wqe *wqe = wq->wqes[node];
 997
 998		io_wqe_cancel_pending_work(wqe, &match);
 999		if (match.nr_pending && !match.cancel_all)
1000			return IO_WQ_CANCEL_OK;
1001	}
1002
1003	/*
1004	 * Now check if a free (going busy) or busy worker has the work
1005	 * currently running. If we find it there, we'll return CANCEL_RUNNING
1006	 * as an indication that we attempt to signal cancellation. The
1007	 * completion will run normally in this case.
1008	 */
1009	for_each_node(node) {
1010		struct io_wqe *wqe = wq->wqes[node];
1011
1012		io_wqe_cancel_running_work(wqe, &match);
1013		if (match.nr_running && !match.cancel_all)
1014			return IO_WQ_CANCEL_RUNNING;
1015	}
1016
1017	if (match.nr_running)
1018		return IO_WQ_CANCEL_RUNNING;
1019	if (match.nr_pending)
1020		return IO_WQ_CANCEL_OK;
1021	return IO_WQ_CANCEL_NOTFOUND;
1022}
1023
1024static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
1025{
1026	return work == data;
1027}
1028
1029enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
1030{
1031	return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork, false);
1032}
1033
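/*
 * Create an io_wq: allocate one io_wqe per node, cap bounded workers at
 * "bounded" and unbounded workers at RLIMIT_NPROC (when a user is
 * attached), then start the manager and wait for it to create the fixed
 * workers. A minimal caller sketch (illustrative, not lifted from a real
 * user):
 *
 *	struct io_wq_data data = {
 *		.user		= user,
 *		.free_work	= my_free_work,
 *		.do_work	= my_do_work,
 *	};
 *	struct io_wq *wq = io_wq_create(concurrency, &data);
 *
 * free_work and do_work are mandatory; the my_* names and "concurrency"
 * are placeholders.
 */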
1034struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1035{
1036	int ret = -ENOMEM, node;
1037	struct io_wq *wq;
1038
1039	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
1040		return ERR_PTR(-EINVAL);
1041
1042	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1043	if (!wq)
1044		return ERR_PTR(-ENOMEM);
1045
1046	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
1047	if (!wq->wqes) {
1048		kfree(wq);
1049		return ERR_PTR(-ENOMEM);
1050	}
1051
1052	wq->free_work = data->free_work;
1053	wq->do_work = data->do_work;
1054
1055	/* caller must already hold a reference to this */
1056	wq->user = data->user;
1057
1058	for_each_node(node) {
1059		struct io_wqe *wqe;
1060		int alloc_node = node;
1061
1062		if (!node_online(alloc_node))
1063			alloc_node = NUMA_NO_NODE;
1064		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
1065		if (!wqe)
1066			goto err;
1067		wq->wqes[node] = wqe;
1068		wqe->node = alloc_node;
1069		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1070		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
1071		if (wq->user) {
1072			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1073					task_rlimit(current, RLIMIT_NPROC);
1074		}
1075		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
1076		wqe->wq = wq;
1077		spin_lock_init(&wqe->lock);
1078		INIT_WQ_LIST(&wqe->work_list);
1079		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1080		INIT_LIST_HEAD(&wqe->all_list);
1081	}
1082
1083	init_completion(&wq->done);
1084
1085	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
1086	if (!IS_ERR(wq->manager)) {
1087		wake_up_process(wq->manager);
1088		wait_for_completion(&wq->done);
1089		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
1090			ret = -ENOMEM;
1091			goto err;
1092		}
1093		refcount_set(&wq->use_refs, 1);
1094		reinit_completion(&wq->done);
1095		return wq;
1096	}
1097
1098	ret = PTR_ERR(wq->manager);
1099	complete(&wq->done);
1100err:
1101	for_each_node(node)
1102		kfree(wq->wqes[node]);
1103	kfree(wq->wqes);
1104	kfree(wq);
1105	return ERR_PTR(ret);
1106}
1107
1108bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
1109{
1110	if (data->free_work != wq->free_work || data->do_work != wq->do_work)
1111		return false;
1112
1113	return refcount_inc_not_zero(&wq->use_refs);
1114}
1115
1116static bool io_wq_worker_wake(struct io_worker *worker, void *data)
1117{
1118	wake_up_process(worker->task);
1119	return false;
1120}
1121
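/*
 * Final destruction: set IO_WQ_BIT_EXIT, stop the manager, wake every
 * worker so it notices the exit bit and drains remaining work, then wait
 * for the last worker to complete wq->done before freeing the per-node
 * structures. io_wq_destroy() only does this on the last use reference.
 */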
1122static void __io_wq_destroy(struct io_wq *wq)
1123{
1124	int node;
1125
1126	set_bit(IO_WQ_BIT_EXIT, &wq->state);
1127	if (wq->manager)
1128		kthread_stop(wq->manager);
1129
1130	rcu_read_lock();
1131	for_each_node(node)
1132		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
1133	rcu_read_unlock();
1134
1135	wait_for_completion(&wq->done);
1136
1137	for_each_node(node)
1138		kfree(wq->wqes[node]);
1139	kfree(wq->wqes);
1140	kfree(wq);
1141}
1142
1143void io_wq_destroy(struct io_wq *wq)
1144{
1145	if (refcount_dec_and_test(&wq->use_refs))
1146		__io_wq_destroy(wq);
1147}
1148
1149struct task_struct *io_wq_get_task(struct io_wq *wq)
1150{
1151	return wq->manager;
1152}