v6.2
/******************************************************************************
 * xenbus_comms.c
 *
 * Low level code to talk to Xen Store: ringbuffer and event channel.
 *
 * Copyright (C) 2005 Rusty Russell, IBM Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License version 2
 * as published by the Free Software Foundation; or, when distributed
 * separately from the Linux kernel or incorporated into other
 * software packages, subject to the following license:
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this source file (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy, modify,
 * merge, publish, distribute, sublicense, and/or sell copies of the Software,
 * and to permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/wait.h>
#include <linux/interrupt.h>
#include <linux/kthread.h>
#include <linux/sched.h>
#include <linux/err.h>
#include <xen/xenbus.h>
#include <asm/xen/hypervisor.h>
#include <xen/events.h>
#include <xen/page.h>
#include "xenbus.h"

/* A list of replies. Currently only one will ever be outstanding. */
LIST_HEAD(xs_reply_list);

/* A list of write requests. */
LIST_HEAD(xb_write_list);
DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
DEFINE_MUTEX(xb_write_mutex);

/* Protect xenbus reader thread against save/restore. */
DEFINE_MUTEX(xs_response_mutex);

static int xenbus_irq;
static struct task_struct *xenbus_task;

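/* Event channel interrupt handler: wake up the xenbus thread. */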
static irqreturn_t wake_waiting(int irq, void *unused)
{
	wake_up(&xb_waitq);
	return IRQ_HANDLED;
}

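/*
 * Ring indexes are free-running; with unsigned wrap-around arithmetic
 * the amount of unconsumed data (prod - cons) can never validly exceed
 * the ring size.
 */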
static int check_indexes(XENSTORE_RING_IDX cons, XENSTORE_RING_IDX prod)
{
	return ((prod - cons) <= XENSTORE_RING_SIZE);
}

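/*
 * Return the largest contiguous free chunk of the request ring, bounded
 * by the wrap point and by the space the consumer has not yet freed.
 */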
static void *get_output_chunk(XENSTORE_RING_IDX cons,
			      XENSTORE_RING_IDX prod,
			      char *buf, uint32_t *len)
{
	*len = XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod);
	if ((XENSTORE_RING_SIZE - (prod - cons)) < *len)
		*len = XENSTORE_RING_SIZE - (prod - cons);
	return buf + MASK_XENSTORE_IDX(prod);
}

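/*
 * Return the largest contiguous chunk of unread data in the response
 * ring, bounded by the wrap point and by the bytes actually produced.
 */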
static const void *get_input_chunk(XENSTORE_RING_IDX cons,
				   XENSTORE_RING_IDX prod,
				   const char *buf, uint32_t *len)
{
	*len = XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(cons);
	if ((prod - cons) < *len)
		*len = prod - cons;
	return buf + MASK_XENSTORE_IDX(cons);
}

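/* True iff a write request is queued and the request ring is not full. */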
static int xb_data_to_write(void)
{
	struct xenstore_domain_interface *intf = xen_store_interface;

	return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE &&
		!list_empty(&xb_write_list);
}

/**
 * xb_write - low level write
 * @data: buffer to send
 * @len: length of buffer
 *
 * Returns number of bytes written or -err.
 */
static int xb_write(const void *data, unsigned int len)
{
	struct xenstore_domain_interface *intf = xen_store_interface;
	XENSTORE_RING_IDX cons, prod;
	unsigned int bytes = 0;

	while (len != 0) {
		void *dst;
		unsigned int avail;

		/* Read indexes, then verify. */
		cons = intf->req_cons;
		prod = intf->req_prod;
		if (!check_indexes(cons, prod)) {
			intf->req_cons = intf->req_prod = 0;
			return -EIO;
		}
		if (!xb_data_to_write())
			return bytes;

		/* Must write data /after/ reading the consumer index. */
		virt_mb();

		dst = get_output_chunk(cons, prod, intf->req, &avail);
		if (avail == 0)
			continue;
		if (avail > len)
			avail = len;

		memcpy(dst, data, avail);
		data += avail;
		len -= avail;
		bytes += avail;

		/* Other side must not see new producer until data is there. */
		virt_wmb();
		intf->req_prod += avail;

		/* Implies mb(): other side will see the updated producer. */
		if (prod <= intf->req_cons)
			notify_remote_via_evtchn(xen_store_evtchn);
	}

	return bytes;
}

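/* True iff the response ring contains unconsumed data. */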
static int xb_data_to_read(void)
{
	struct xenstore_domain_interface *intf = xen_store_interface;
	return (intf->rsp_cons != intf->rsp_prod);
}

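/**
 * xb_read - low level read
 * @data: buffer to fill
 * @len: length of buffer
 *
 * Returns number of bytes read or -err.
 */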
static int xb_read(void *data, unsigned int len)
{
	struct xenstore_domain_interface *intf = xen_store_interface;
	XENSTORE_RING_IDX cons, prod;
	unsigned int bytes = 0;

	while (len != 0) {
		unsigned int avail;
		const char *src;

		/* Read indexes, then verify. */
		cons = intf->rsp_cons;
		prod = intf->rsp_prod;
		if (cons == prod)
			return bytes;

		if (!check_indexes(cons, prod)) {
			intf->rsp_cons = intf->rsp_prod = 0;
			return -EIO;
		}

		src = get_input_chunk(cons, prod, intf->rsp, &avail);
		if (avail == 0)
			continue;
		if (avail > len)
			avail = len;

		/* Must read data /after/ reading the producer index. */
		virt_rmb();

		memcpy(data, src, avail);
		data += avail;
		len -= avail;
		bytes += avail;

		/* Other side must not see free space until we've copied out */
		virt_mb();
		intf->rsp_cons += avail;

		/* Implies mb(): other side will see the updated consumer. */
		if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE)
			notify_remote_via_evtchn(xen_store_evtchn);
	}

	return bytes;
}

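/*
 * Read and dispatch one message from the response ring.  State is kept
 * in a static struct so that a partially read message can be resumed by
 * the next call.  Watch events are handed to xs_watch_msg(); replies
 * are matched by req_id against xs_reply_list and completed.
 */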
static int process_msg(void)
{
	static struct {
		struct xsd_sockmsg msg;
		char *body;
		union {
			void *alloc;
			struct xs_watch_event *watch;
		};
		bool in_msg;
		bool in_hdr;
		unsigned int read;
	} state;
	struct xb_req_data *req;
	int err;
	unsigned int len;

	if (!state.in_msg) {
		state.in_msg = true;
		state.in_hdr = true;
		state.read = 0;

		/*
		 * We must disallow save/restore while reading a message.
		 * A partial read across s/r leaves us out of sync with
		 * xenstored.
		 * xs_response_mutex is locked as long as we are processing one
		 * message. state.in_msg will be true as long as we are holding
		 * the lock here.
		 */
		mutex_lock(&xs_response_mutex);

		if (!xb_data_to_read()) {
			/* We raced with save/restore: pending data 'gone'. */
			mutex_unlock(&xs_response_mutex);
			state.in_msg = false;
			return 0;
		}
	}

	if (state.in_hdr) {
		if (state.read != sizeof(state.msg)) {
			err = xb_read((void *)&state.msg + state.read,
				      sizeof(state.msg) - state.read);
			if (err < 0)
				goto out;
			state.read += err;
			if (state.read != sizeof(state.msg))
				return 0;
			if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
				err = -EINVAL;
				goto out;
			}
		}

		len = state.msg.len + 1;
		if (state.msg.type == XS_WATCH_EVENT)
			len += sizeof(*state.watch);

		state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);
		if (!state.alloc)
			return -ENOMEM;

		if (state.msg.type == XS_WATCH_EVENT)
			state.body = state.watch->body;
		else
			state.body = state.alloc;
		state.in_hdr = false;
		state.read = 0;
	}

	err = xb_read(state.body + state.read, state.msg.len - state.read);
	if (err < 0)
		goto out;

	state.read += err;
	if (state.read != state.msg.len)
		return 0;

	state.body[state.msg.len] = '\0';

	if (state.msg.type == XS_WATCH_EVENT) {
		state.watch->len = state.msg.len;
		err = xs_watch_msg(state.watch);
	} else {
		err = -ENOENT;
		mutex_lock(&xb_write_mutex);
		list_for_each_entry(req, &xs_reply_list, list) {
			if (req->msg.req_id == state.msg.req_id) {
				list_del(&req->list);
				err = 0;
				break;
			}
		}
		mutex_unlock(&xb_write_mutex);
		if (err)
			goto out;

		if (req->state == xb_req_state_wait_reply) {
			req->msg.req_id = req->caller_req_id;
			req->msg.type = state.msg.type;
			req->msg.len = state.msg.len;
			req->body = state.body;
			/* write body, then update state */
			virt_wmb();
			req->state = xb_req_state_got_reply;
			req->cb(req);
		} else
			kfree(req);
	}

	mutex_unlock(&xs_response_mutex);

	state.in_msg = false;
	state.alloc = NULL;
	return err;

 out:
	mutex_unlock(&xs_response_mutex);
	state.in_msg = false;
	kfree(state.alloc);
	state.alloc = NULL;
	return err;
}

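/*
 * Copy queued requests from xb_write_list into the request ring, the
 * message header first (state.idx == -1), then each iovec of the
 * payload.  Static state lets a partially written request be resumed.
 */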
static int process_writes(void)
{
	static struct {
		struct xb_req_data *req;
		int idx;
		unsigned int written;
	} state;
	void *base;
	unsigned int len;
	int err = 0;

	if (!xb_data_to_write())
		return 0;

	mutex_lock(&xb_write_mutex);

	if (!state.req) {
		state.req = list_first_entry(&xb_write_list,
					     struct xb_req_data, list);
		state.idx = -1;
		state.written = 0;
	}

	if (state.req->state == xb_req_state_aborted)
		goto out_err;

	while (state.idx < state.req->num_vecs) {
		if (state.idx < 0) {
			base = &state.req->msg;
			len = sizeof(state.req->msg);
		} else {
			base = state.req->vec[state.idx].iov_base;
			len = state.req->vec[state.idx].iov_len;
		}
		err = xb_write(base + state.written, len - state.written);
		if (err < 0)
			goto out_err;
		state.written += err;
		if (state.written != len)
			goto out;

		state.idx++;
		state.written = 0;
	}

	list_del(&state.req->list);
	state.req->state = xb_req_state_wait_reply;
	list_add_tail(&state.req->list, &xs_reply_list);
	state.req = NULL;

 out:
	mutex_unlock(&xb_write_mutex);

	return 0;

 out_err:
	state.req->msg.type = XS_ERROR;
	state.req->err = err;
	list_del(&state.req->list);
	if (state.req->state == xb_req_state_aborted)
		kfree(state.req);
	else {
		/* write err, then update state */
		virt_wmb();
		state.req->state = xb_req_state_got_reply;
		wake_up(&state.req->wq);
	}

	mutex_unlock(&xb_write_mutex);

	state.req = NULL;

	return err;
}

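/* Any work pending for the xenbus thread, in either direction? */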
static int xb_thread_work(void)
{
	return xb_data_to_read() || xb_data_to_write();
}

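/*
 * The xenbus kernel thread: sleep until the event channel signals ring
 * activity, then process incoming responses and queued requests.
 */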
static int xenbus_thread(void *unused)
{
	int err;

	while (!kthread_should_stop()) {
		if (wait_event_interruptible(xb_waitq, xb_thread_work()))
			continue;

		err = process_msg();
		if (err == -ENOMEM)
			schedule();
		else if (err)
			pr_warn_ratelimited("error %d while reading message\n",
					    err);

		err = process_writes();
		if (err)
			pr_warn_ratelimited("error %d while writing message\n",
					    err);
	}

	xenbus_task = NULL;
	return 0;
}

/**
 * xb_init_comms - Set up interrupt handler off store event channel.
 */
int xb_init_comms(void)
{
	struct xenstore_domain_interface *intf = xen_store_interface;

	if (intf->req_prod != intf->req_cons)
		pr_err("request ring is not quiescent (%08x:%08x)!\n",
		       intf->req_cons, intf->req_prod);

	if (intf->rsp_prod != intf->rsp_cons) {
		pr_warn("response ring is not quiescent (%08x:%08x): fixing up\n",
			intf->rsp_cons, intf->rsp_prod);
		/* breaks kdump */
		if (!reset_devices)
			intf->rsp_cons = intf->rsp_prod;
	}

	if (xenbus_irq) {
		/* Already have an irq; assume we're resuming */
		rebind_evtchn_irq(xen_store_evtchn, xenbus_irq);
	} else {
		int err;

		err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting,
						0, "xenbus", &xb_waitq);
		if (err < 0) {
			pr_err("request irq failed %i\n", err);
			return err;
		}

		xenbus_irq = err;

		if (!xenbus_task) {
			xenbus_task = kthread_run(xenbus_thread, NULL,
						  "xenbus");
			if (IS_ERR(xenbus_task))
				return PTR_ERR(xenbus_task);
		}
	}

	return 0;
}

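/* Tear down the event channel binding set up by xb_init_comms(). */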
void xb_deinit_comms(void)
{
	unbind_from_irqhandler(xenbus_irq, &xb_waitq);
	xenbus_irq = 0;
}