Linux Audio

Check our new training course

Loading...
Note: File does not exist in v4.17.
  1// SPDX-License-Identifier: GPL-2.0-only
  2/*
  3 * NFS client support for local clients to bypass network stack
  4 *
  5 * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com>
  6 * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com>
  7 * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com>
  8 * Copyright (C) 2024 NeilBrown <neilb@suse.de>
  9 */
 10
 11#include <linux/module.h>
 12#include <linux/errno.h>
 13#include <linux/vfs.h>
 14#include <linux/file.h>
 15#include <linux/inet.h>
 16#include <linux/sunrpc/addr.h>
 17#include <linux/inetdevice.h>
 18#include <net/addrconf.h>
 19#include <linux/nfs_common.h>
 20#include <linux/nfslocalio.h>
 21#include <linux/bvec.h>
 22
 23#include <linux/nfs.h>
 24#include <linux/nfs_fs.h>
 25#include <linux/nfs_xdr.h>
 26
 27#include "internal.h"
 28#include "pnfs.h"
 29#include "nfstrace.h"
 30
 31#define NFSDBG_FACILITY		NFSDBG_VFS
 32
 33struct nfs_local_kiocb {
 34	struct kiocb		kiocb;
 35	struct bio_vec		*bvec;
 36	struct nfs_pgio_header	*hdr;
 37	struct work_struct	work;
 38	struct nfsd_file	*localio;
 39};
 40
 41struct nfs_local_fsync_ctx {
 42	struct nfsd_file	*localio;
 43	struct nfs_commit_data	*data;
 44	struct work_struct	work;
 45	struct completion	*done;
 46};
 47
 48static bool localio_enabled __read_mostly = true;
 49module_param(localio_enabled, bool, 0644);
 50
 51static inline bool nfs_client_is_local(const struct nfs_client *clp)
 52{
 53	return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
 54}
 55
 56bool nfs_server_is_local(const struct nfs_client *clp)
 57{
 58	return nfs_client_is_local(clp) && localio_enabled;
 59}
 60EXPORT_SYMBOL_GPL(nfs_server_is_local);
 61
 62/*
 63 * UUID_IS_LOCAL XDR functions
 64 */
 65
 66static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
 67				     struct xdr_stream *xdr,
 68				     const void *data)
 69{
 70	const u8 *uuid = data;
 71
 72	encode_opaque_fixed(xdr, uuid, UUID_SIZE);
 73}
 74
 75static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
 76				   struct xdr_stream *xdr,
 77				   void *result)
 78{
 79	/* void return */
 80	return 0;
 81}
 82
 83static const struct rpc_procinfo nfs_localio_procedures[] = {
 84	[LOCALIOPROC_UUID_IS_LOCAL] = {
 85		.p_proc = LOCALIOPROC_UUID_IS_LOCAL,
 86		.p_encode = localio_xdr_enc_uuidargs,
 87		.p_decode = localio_xdr_dec_uuidres,
 88		.p_arglen = XDR_QUADLEN(UUID_SIZE),
 89		.p_replen = 0,
 90		.p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
 91		.p_name = "UUID_IS_LOCAL",
 92	},
 93};
 94
 95static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
 96static const struct rpc_version nfslocalio_version1 = {
 97	.number			= 1,
 98	.nrprocs		= ARRAY_SIZE(nfs_localio_procedures),
 99	.procs			= nfs_localio_procedures,
100	.counts			= nfs_localio_counts,
101};
102
103static const struct rpc_version *nfslocalio_version[] = {
104       [1]			= &nfslocalio_version1,
105};
106
107extern const struct rpc_program nfslocalio_program;
108static struct rpc_stat		nfslocalio_rpcstat = { &nfslocalio_program };
109
110const struct rpc_program nfslocalio_program = {
111	.name			= "nfslocalio",
112	.number			= NFS_LOCALIO_PROGRAM,
113	.nrvers			= ARRAY_SIZE(nfslocalio_version),
114	.version		= nfslocalio_version,
115	.stats			= &nfslocalio_rpcstat,
116};
117
118/*
119 * nfs_local_enable - enable local i/o for an nfs_client
120 */
121static void nfs_local_enable(struct nfs_client *clp)
122{
123	spin_lock(&clp->cl_localio_lock);
124	set_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
125	trace_nfs_local_enable(clp);
126	spin_unlock(&clp->cl_localio_lock);
127}
128
129/*
130 * nfs_local_disable - disable local i/o for an nfs_client
131 */
132void nfs_local_disable(struct nfs_client *clp)
133{
134	spin_lock(&clp->cl_localio_lock);
135	if (test_and_clear_bit(NFS_CS_LOCAL_IO, &clp->cl_flags)) {
136		trace_nfs_local_disable(clp);
137		nfs_uuid_invalidate_one_client(&clp->cl_uuid);
138	}
139	spin_unlock(&clp->cl_localio_lock);
140}
141
142/*
143 * nfs_init_localioclient - Initialise an NFS localio client connection
144 */
145static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
146{
147	struct rpc_clnt *rpcclient_localio;
148
149	rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
150						 &nfslocalio_program, 1);
151
152	dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
153		__func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
154		(IS_ERR(rpcclient_localio) ? "does not support" : "supports"));
155
156	return rpcclient_localio;
157}
158
159static bool nfs_server_uuid_is_local(struct nfs_client *clp)
160{
161	u8 uuid[UUID_SIZE];
162	struct rpc_message msg = {
163		.rpc_argp = &uuid,
164	};
165	struct rpc_clnt *rpcclient_localio;
166	int status;
167
168	rpcclient_localio = nfs_init_localioclient(clp);
169	if (IS_ERR(rpcclient_localio))
170		return false;
171
172	export_uuid(uuid, &clp->cl_uuid.uuid);
173
174	msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
175	status = rpc_call_sync(rpcclient_localio, &msg, 0);
176	dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
177		__func__, status);
178	rpc_shutdown_client(rpcclient_localio);
179
180	/* Server is only local if it initialized required struct members */
181	if (status || !clp->cl_uuid.net || !clp->cl_uuid.dom)
182		return false;
183
184	return true;
185}
186
187/*
188 * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
189 * - called after alloc_client and init_client (so cl_rpcclient exists)
190 * - this function is idempotent, it can be called for old or new clients
191 */
192void nfs_local_probe(struct nfs_client *clp)
193{
194	/* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
195	if (!localio_enabled ||
196	    clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
197		nfs_local_disable(clp);
198		return;
199	}
200
201	if (nfs_client_is_local(clp)) {
202		/* If already enabled, disable and re-enable */
203		nfs_local_disable(clp);
204	}
205
206	if (!nfs_uuid_begin(&clp->cl_uuid))
207		return;
208	if (nfs_server_uuid_is_local(clp))
209		nfs_local_enable(clp);
210	nfs_uuid_end(&clp->cl_uuid);
211}
212EXPORT_SYMBOL_GPL(nfs_local_probe);
213
214/*
215 * nfs_local_open_fh - open a local filehandle in terms of nfsd_file
216 *
217 * Returns a pointer to a struct nfsd_file or NULL
218 */
219struct nfsd_file *
220nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
221		  struct nfs_fh *fh, const fmode_t mode)
222{
223	struct nfsd_file *localio;
224	int status;
225
226	if (!nfs_server_is_local(clp))
227		return NULL;
228	if (mode & ~(FMODE_READ | FMODE_WRITE))
229		return NULL;
230
231	localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
232				    cred, fh, mode);
233	if (IS_ERR(localio)) {
234		status = PTR_ERR(localio);
235		trace_nfs_local_open_fh(fh, mode, status);
236		switch (status) {
237		case -ENOMEM:
238		case -ENXIO:
239		case -ENOENT:
240			/* Revalidate localio, will disable if unsupported */
241			nfs_local_probe(clp);
242		}
243		return NULL;
244	}
245	return localio;
246}
247EXPORT_SYMBOL_GPL(nfs_local_open_fh);
248
249static struct bio_vec *
250nfs_bvec_alloc_and_import_pagevec(struct page **pagevec,
251		unsigned int npages, gfp_t flags)
252{
253	struct bio_vec *bvec, *p;
254
255	bvec = kmalloc_array(npages, sizeof(*bvec), flags);
256	if (bvec != NULL) {
257		for (p = bvec; npages > 0; p++, pagevec++, npages--) {
258			p->bv_page = *pagevec;
259			p->bv_len = PAGE_SIZE;
260			p->bv_offset = 0;
261		}
262	}
263	return bvec;
264}
265
266static void
267nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
268{
269	kfree(iocb->bvec);
270	kfree(iocb);
271}
272
273static struct nfs_local_kiocb *
274nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
275		     struct file *file, gfp_t flags)
276{
277	struct nfs_local_kiocb *iocb;
278
279	iocb = kmalloc(sizeof(*iocb), flags);
280	if (iocb == NULL)
281		return NULL;
282	iocb->bvec = nfs_bvec_alloc_and_import_pagevec(hdr->page_array.pagevec,
283			hdr->page_array.npages, flags);
284	if (iocb->bvec == NULL) {
285		kfree(iocb);
286		return NULL;
287	}
288	init_sync_kiocb(&iocb->kiocb, file);
289	iocb->kiocb.ki_pos = hdr->args.offset;
290	iocb->hdr = hdr;
291	iocb->kiocb.ki_flags &= ~IOCB_APPEND;
292	return iocb;
293}
294
295static void
296nfs_local_iter_init(struct iov_iter *i, struct nfs_local_kiocb *iocb, int dir)
297{
298	struct nfs_pgio_header *hdr = iocb->hdr;
299
300	iov_iter_bvec(i, dir, iocb->bvec, hdr->page_array.npages,
301		      hdr->args.count + hdr->args.pgbase);
302	if (hdr->args.pgbase != 0)
303		iov_iter_advance(i, hdr->args.pgbase);
304}
305
306static void
307nfs_local_hdr_release(struct nfs_pgio_header *hdr,
308		const struct rpc_call_ops *call_ops)
309{
310	call_ops->rpc_call_done(&hdr->task, hdr);
311	call_ops->rpc_release(hdr);
312}
313
314static void
315nfs_local_pgio_init(struct nfs_pgio_header *hdr,
316		const struct rpc_call_ops *call_ops)
317{
318	hdr->task.tk_ops = call_ops;
319	if (!hdr->task.tk_start)
320		hdr->task.tk_start = ktime_get();
321}
322
323static void
324nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
325{
326	if (status >= 0) {
327		hdr->res.count = status;
328		hdr->res.op_status = NFS4_OK;
329		hdr->task.tk_status = 0;
330	} else {
331		hdr->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
332		hdr->task.tk_status = status;
333	}
334}
335
336static void
337nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
338{
339	struct nfs_pgio_header *hdr = iocb->hdr;
340
341	nfs_to_nfsd_file_put_local(iocb->localio);
342	nfs_local_iocb_free(iocb);
343	nfs_local_hdr_release(hdr, hdr->task.tk_ops);
344}
345
346static void
347nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
348{
349	struct nfs_pgio_header *hdr = iocb->hdr;
350	struct file *filp = iocb->kiocb.ki_filp;
351
352	nfs_local_pgio_done(hdr, status);
353
354	/*
355	 * Must clear replen otherwise NFSv3 data corruption will occur
356	 * if/when switching from LOCALIO back to using normal RPC.
357	 */
358	hdr->res.replen = 0;
359
360	if (hdr->res.count != hdr->args.count ||
361	    hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
362		hdr->res.eof = true;
363
364	dprintk("%s: read %ld bytes eof %d.\n", __func__,
365			status > 0 ? status : 0, hdr->res.eof);
366}
367
368static void nfs_local_call_read(struct work_struct *work)
369{
370	struct nfs_local_kiocb *iocb =
371		container_of(work, struct nfs_local_kiocb, work);
372	struct file *filp = iocb->kiocb.ki_filp;
373	const struct cred *save_cred;
374	struct iov_iter iter;
375	ssize_t status;
376
377	save_cred = override_creds(filp->f_cred);
378
379	nfs_local_iter_init(&iter, iocb, READ);
380
381	status = filp->f_op->read_iter(&iocb->kiocb, &iter);
382	WARN_ON_ONCE(status == -EIOCBQUEUED);
383
384	nfs_local_read_done(iocb, status);
385	nfs_local_pgio_release(iocb);
386
387	revert_creds(save_cred);
388}
389
390static int
391nfs_do_local_read(struct nfs_pgio_header *hdr,
392		  struct nfsd_file *localio,
393		  const struct rpc_call_ops *call_ops)
394{
395	struct nfs_local_kiocb *iocb;
396	struct file *file = nfs_to->nfsd_file_file(localio);
397
398	/* Don't support filesystems without read_iter */
399	if (!file->f_op->read_iter)
400		return -EAGAIN;
401
402	dprintk("%s: vfs_read count=%u pos=%llu\n",
403		__func__, hdr->args.count, hdr->args.offset);
404
405	iocb = nfs_local_iocb_alloc(hdr, file, GFP_KERNEL);
406	if (iocb == NULL)
407		return -ENOMEM;
408	iocb->localio = localio;
409
410	nfs_local_pgio_init(hdr, call_ops);
411	hdr->res.eof = false;
412
413	INIT_WORK(&iocb->work, nfs_local_call_read);
414	queue_work(nfslocaliod_workqueue, &iocb->work);
415
416	return 0;
417}
418
419static void
420nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
421{
422	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
423	u32 *verf = (u32 *)verifier->data;
424	int seq = 0;
425
426	do {
427		read_seqbegin_or_lock(&clp->cl_boot_lock, &seq);
428		verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
429		verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
430	} while (need_seqretry(&clp->cl_boot_lock, seq));
431	done_seqretry(&clp->cl_boot_lock, seq);
432}
433
434static void
435nfs_reset_boot_verifier(struct inode *inode)
436{
437	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
438
439	write_seqlock(&clp->cl_boot_lock);
440	ktime_get_real_ts64(&clp->cl_nfssvc_boot);
441	write_sequnlock(&clp->cl_boot_lock);
442}
443
444static void
445nfs_set_local_verifier(struct inode *inode,
446		struct nfs_writeverf *verf,
447		enum nfs3_stable_how how)
448{
449	nfs_copy_boot_verifier(&verf->verifier, inode);
450	verf->committed = how;
451}
452
453/* Factored out from fs/nfsd/vfs.h:fh_getattr() */
454static int __vfs_getattr(struct path *p, struct kstat *stat, int version)
455{
456	u32 request_mask = STATX_BASIC_STATS;
457
458	if (version == 4)
459		request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
460	return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
461}
462
463/* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
464static u64 __nfsd4_change_attribute(const struct kstat *stat,
465				    const struct inode *inode)
466{
467	u64 chattr;
468
469	if (stat->result_mask & STATX_CHANGE_COOKIE) {
470		chattr = stat->change_cookie;
471		if (S_ISREG(inode->i_mode) &&
472		    !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
473			chattr += (u64)stat->ctime.tv_sec << 30;
474			chattr += stat->ctime.tv_nsec;
475		}
476	} else {
477		chattr = time_to_chattr(&stat->ctime);
478	}
479	return chattr;
480}
481
482static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
483{
484	struct kstat stat;
485	struct file *filp = iocb->kiocb.ki_filp;
486	struct nfs_pgio_header *hdr = iocb->hdr;
487	struct nfs_fattr *fattr = hdr->res.fattr;
488	int version = NFS_PROTO(hdr->inode)->version;
489
490	if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
491		return;
492
493	fattr->valid = (NFS_ATTR_FATTR_FILEID |
494			NFS_ATTR_FATTR_CHANGE |
495			NFS_ATTR_FATTR_SIZE |
496			NFS_ATTR_FATTR_ATIME |
497			NFS_ATTR_FATTR_MTIME |
498			NFS_ATTR_FATTR_CTIME |
499			NFS_ATTR_FATTR_SPACE_USED);
500
501	fattr->fileid = stat.ino;
502	fattr->size = stat.size;
503	fattr->atime = stat.atime;
504	fattr->mtime = stat.mtime;
505	fattr->ctime = stat.ctime;
506	if (version == 4) {
507		fattr->change_attr =
508			__nfsd4_change_attribute(&stat, file_inode(filp));
509	} else
510		fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
511	fattr->du.nfs3.used = stat.blocks << 9;
512}
513
514static void
515nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
516{
517	struct nfs_pgio_header *hdr = iocb->hdr;
518	struct inode *inode = hdr->inode;
519
520	dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);
521
522	/* Handle short writes as if they are ENOSPC */
523	if (status > 0 && status < hdr->args.count) {
524		hdr->mds_offset += status;
525		hdr->args.offset += status;
526		hdr->args.pgbase += status;
527		hdr->args.count -= status;
528		nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
529		status = -ENOSPC;
530	}
531	if (status < 0)
532		nfs_reset_boot_verifier(inode);
533
534	nfs_local_pgio_done(hdr, status);
535}
536
537static void nfs_local_call_write(struct work_struct *work)
538{
539	struct nfs_local_kiocb *iocb =
540		container_of(work, struct nfs_local_kiocb, work);
541	struct file *filp = iocb->kiocb.ki_filp;
542	unsigned long old_flags = current->flags;
543	const struct cred *save_cred;
544	struct iov_iter iter;
545	ssize_t status;
546
547	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
548	save_cred = override_creds(filp->f_cred);
549
550	nfs_local_iter_init(&iter, iocb, WRITE);
551
552	file_start_write(filp);
553	status = filp->f_op->write_iter(&iocb->kiocb, &iter);
554	file_end_write(filp);
555	WARN_ON_ONCE(status == -EIOCBQUEUED);
556
557	nfs_local_write_done(iocb, status);
558	nfs_local_vfs_getattr(iocb);
559	nfs_local_pgio_release(iocb);
560
561	revert_creds(save_cred);
562	current->flags = old_flags;
563}
564
565static int
566nfs_do_local_write(struct nfs_pgio_header *hdr,
567		   struct nfsd_file *localio,
568		   const struct rpc_call_ops *call_ops)
569{
570	struct nfs_local_kiocb *iocb;
571	struct file *file = nfs_to->nfsd_file_file(localio);
572
573	/* Don't support filesystems without write_iter */
574	if (!file->f_op->write_iter)
575		return -EAGAIN;
576
577	dprintk("%s: vfs_write count=%u pos=%llu %s\n",
578		__func__, hdr->args.count, hdr->args.offset,
579		(hdr->args.stable == NFS_UNSTABLE) ?  "unstable" : "stable");
580
581	iocb = nfs_local_iocb_alloc(hdr, file, GFP_NOIO);
582	if (iocb == NULL)
583		return -ENOMEM;
584	iocb->localio = localio;
585
586	switch (hdr->args.stable) {
587	default:
588		break;
589	case NFS_DATA_SYNC:
590		iocb->kiocb.ki_flags |= IOCB_DSYNC;
591		break;
592	case NFS_FILE_SYNC:
593		iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
594	}
595	nfs_local_pgio_init(hdr, call_ops);
596
597	nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
598
599	INIT_WORK(&iocb->work, nfs_local_call_write);
600	queue_work(nfslocaliod_workqueue, &iocb->work);
601
602	return 0;
603}
604
605int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
606		   struct nfs_pgio_header *hdr,
607		   const struct rpc_call_ops *call_ops)
608{
609	int status = 0;
610
611	if (!hdr->args.count)
612		return 0;
613
614	switch (hdr->rw_mode) {
615	case FMODE_READ:
616		status = nfs_do_local_read(hdr, localio, call_ops);
617		break;
618	case FMODE_WRITE:
619		status = nfs_do_local_write(hdr, localio, call_ops);
620		break;
621	default:
622		dprintk("%s: invalid mode: %d\n", __func__,
623			hdr->rw_mode);
624		status = -EINVAL;
625	}
626
627	if (status != 0) {
628		if (status == -EAGAIN)
629			nfs_local_disable(clp);
630		nfs_to_nfsd_file_put_local(localio);
631		hdr->task.tk_status = status;
632		nfs_local_hdr_release(hdr, call_ops);
633	}
634	return status;
635}
636
637static void
638nfs_local_init_commit(struct nfs_commit_data *data,
639		const struct rpc_call_ops *call_ops)
640{
641	data->task.tk_ops = call_ops;
642}
643
644static int
645nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
646{
647	loff_t start = data->args.offset;
648	loff_t end = LLONG_MAX;
649
650	if (data->args.count > 0) {
651		end = start + data->args.count - 1;
652		if (end < start)
653			end = LLONG_MAX;
654	}
655
656	dprintk("%s: commit %llu - %llu\n", __func__, start, end);
657	return vfs_fsync_range(filp, start, end, 0);
658}
659
660static void
661nfs_local_commit_done(struct nfs_commit_data *data, int status)
662{
663	if (status >= 0) {
664		nfs_set_local_verifier(data->inode,
665				data->res.verf,
666				NFS_FILE_SYNC);
667		data->res.op_status = NFS4_OK;
668		data->task.tk_status = 0;
669	} else {
670		nfs_reset_boot_verifier(data->inode);
671		data->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
672		data->task.tk_status = status;
673	}
674}
675
676static void
677nfs_local_release_commit_data(struct nfsd_file *localio,
678		struct nfs_commit_data *data,
679		const struct rpc_call_ops *call_ops)
680{
681	nfs_to_nfsd_file_put_local(localio);
682	call_ops->rpc_call_done(&data->task, data);
683	call_ops->rpc_release(data);
684}
685
686static void
687nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
688{
689	nfs_local_release_commit_data(ctx->localio, ctx->data,
690				      ctx->data->task.tk_ops);
691	kfree(ctx);
692}
693
694static void
695nfs_local_fsync_work(struct work_struct *work)
696{
697	struct nfs_local_fsync_ctx *ctx;
698	int status;
699
700	ctx = container_of(work, struct nfs_local_fsync_ctx, work);
701
702	status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
703				      ctx->data);
704	nfs_local_commit_done(ctx->data, status);
705	if (ctx->done != NULL)
706		complete(ctx->done);
707	nfs_local_fsync_ctx_free(ctx);
708}
709
710static struct nfs_local_fsync_ctx *
711nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
712			  struct nfsd_file *localio, gfp_t flags)
713{
714	struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);
715
716	if (ctx != NULL) {
717		ctx->localio = localio;
718		ctx->data = data;
719		INIT_WORK(&ctx->work, nfs_local_fsync_work);
720		ctx->done = NULL;
721	}
722	return ctx;
723}
724
725int nfs_local_commit(struct nfsd_file *localio,
726		     struct nfs_commit_data *data,
727		     const struct rpc_call_ops *call_ops, int how)
728{
729	struct nfs_local_fsync_ctx *ctx;
730
731	ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
732	if (!ctx) {
733		nfs_local_commit_done(data, -ENOMEM);
734		nfs_local_release_commit_data(localio, data, call_ops);
735		return -ENOMEM;
736	}
737
738	nfs_local_init_commit(data, call_ops);
739
740	if (how & FLUSH_SYNC) {
741		DECLARE_COMPLETION_ONSTACK(done);
742		ctx->done = &done;
743		queue_work(nfsiod_workqueue, &ctx->work);
744		wait_for_completion(&done);
745	} else
746		queue_work(nfsiod_workqueue, &ctx->work);
747
748	return 0;
749}