/* RxRPC Tx data buffering — listing from Linux kernel v6.13.7 follows. */
  1// SPDX-License-Identifier: GPL-2.0-or-later
  2/* RxRPC Tx data buffering.
  3 *
  4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
  5 * Written by David Howells (dhowells@redhat.com)
  6 */
  7
  8#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  9
 10#include <linux/slab.h>
 11#include "ar-internal.h"
 12
 13static atomic_t rxrpc_txbuf_debug_ids;
 14atomic_t rxrpc_nr_txbuf;
 15
/*
 * Allocate and partially initialise a data transmission buffer.
 *
 * The wire header and the data payload live in a single page fragment taken
 * from the connection's per-connection fragment cache; the header is offset
 * within the fragment so that the payload that follows it starts on a
 * data_align boundary.  Returns NULL on allocation failure.
 */
struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_size,
					   size_t data_align, gfp_t gfp)
{
	struct rxrpc_wire_header *whdr;
	struct rxrpc_txbuf *txb;
	size_t total, hoff;
	void *buf;

	txb = kmalloc(sizeof(*txb), gfp);
	if (!txb)
		return NULL;

	/* Pad before the header so that the data following it lands on a
	 * data_align boundary within the fragment.
	 */
	hoff = round_up(sizeof(*whdr), data_align) - sizeof(*whdr);
	total = hoff + sizeof(*whdr) + data_size;

	/* Align the fragment itself to at least a cacheline. */
	data_align = umax(data_align, L1_CACHE_BYTES);

	/* Serialise use of the connection's fragment allocator; the lock name
	 * indicates it exists to guard tx_data_alloc.
	 */
	mutex_lock(&call->conn->tx_data_alloc_lock);
	buf = page_frag_alloc_align(&call->conn->tx_data_alloc, total, gfp,
				    data_align);
	mutex_unlock(&call->conn->tx_data_alloc_lock);
	if (!buf) {
		kfree(txb);
		return NULL;
	}

	whdr = buf + hoff;

	INIT_LIST_HEAD(&txb->call_link);
	INIT_LIST_HEAD(&txb->tx_link);
	refcount_set(&txb->ref, 1);
	txb->last_sent		= KTIME_MIN;
	txb->call_debug_id	= call->debug_id;
	txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
	txb->space		= data_size;	/* Payload bytes still unclaimed */
	txb->len		= 0;
	txb->offset		= sizeof(*whdr); /* Data is appended after the header */
	txb->flags		= call->conn->out_clientflag;
	txb->ack_why		= 0;
	txb->seq		= call->tx_prepared + 1;
	txb->serial		= 0;
	txb->cksum		= 0;
	/* Single kvec covering just the header; rxrpc_free_txbuf() later hands
	 * iov_base back to page_frag_free().
	 */
	txb->nr_kvec		= 1;
	txb->kvec[0].iov_base	= whdr;
	txb->kvec[0].iov_len	= sizeof(*whdr);

	/* Fill in the protocol fields that are already known; the serial and
	 * checksum are left for transmission time.
	 */
	whdr->epoch		= htonl(call->conn->proto.epoch);
	whdr->cid		= htonl(call->cid);
	whdr->callNumber	= htonl(call->call_id);
	whdr->seq		= htonl(txb->seq);
	whdr->type		= RXRPC_PACKET_TYPE_DATA;
	whdr->flags		= 0;
	whdr->userStatus	= 0;
	whdr->securityIndex	= call->security_ix;
	whdr->_rsvd		= 0;
	whdr->serviceId		= htons(call->dest_srx.srx_service);

	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1,
			  rxrpc_txbuf_alloc_data);

	atomic_inc(&rxrpc_nr_txbuf);
	return txb;
}
 81
/*
 * Allocate and partially initialise an ACK packet.
 *
 * The first page fragment holds, in order: the wire header, the ACK struct,
 * one skipped byte, three filler bytes and the ACK trailer.  If sack_size is
 * non-zero, a second fragment is allocated to carry the SACK table, which is
 * interposed between the ACK struct and the filler on the wire via kvec[1].
 * Returns NULL on allocation failure.
 */
struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_size)
{
	struct rxrpc_wire_header *whdr;
	struct rxrpc_acktrailer *trailer;
	struct rxrpc_ackpacket *ack;
	struct rxrpc_txbuf *txb;
	/* May be called under RCU, in which case sleeping allocations are not
	 * permitted.
	 */
	gfp_t gfp = rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS;
	void *buf, *buf2 = NULL;
	u8 *filler;

	txb = kmalloc(sizeof(*txb), gfp);
	if (!txb)
		return NULL;

	buf = page_frag_alloc(&call->local->tx_alloc,
			      sizeof(*whdr) + sizeof(*ack) + 1 + 3 + sizeof(*trailer), gfp);
	if (!buf) {
		kfree(txb);
		return NULL;
	}

	/* Optional second fragment for the SACK table. */
	if (sack_size) {
		buf2 = page_frag_alloc(&call->local->tx_alloc, sack_size, gfp);
		if (!buf2) {
			page_frag_free(buf);
			kfree(txb);
			return NULL;
		}
	}

	/* Carve up the first fragment; note the single byte gap between the
	 * ACK struct and the filler.
	 */
	whdr	= buf;
	ack	= buf + sizeof(*whdr);
	filler	= buf + sizeof(*whdr) + sizeof(*ack) + 1;
	trailer	= buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;

	INIT_LIST_HEAD(&txb->call_link);
	INIT_LIST_HEAD(&txb->tx_link);
	refcount_set(&txb->ref, 1);
	txb->call_debug_id	= call->debug_id;
	txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
	txb->space		= 0;
	/* Length of the fixed parts; the SACK table (kvec[1]) is extra. */
	txb->len		= sizeof(*whdr) + sizeof(*ack) + 3 + sizeof(*trailer);
	txb->offset		= 0;
	txb->flags		= call->conn->out_clientflag;
	txb->ack_rwind		= 0;
	txb->seq		= 0;
	txb->serial		= 0;
	txb->cksum		= 0;
	txb->nr_kvec		= 3;
	txb->kvec[0].iov_base	= whdr;
	txb->kvec[0].iov_len	= sizeof(*whdr) + sizeof(*ack);
	txb->kvec[1].iov_base	= buf2;
	txb->kvec[1].iov_len	= sack_size;
	txb->kvec[2].iov_base	= filler;
	txb->kvec[2].iov_len	= 3 + sizeof(*trailer);

	whdr->epoch		= htonl(call->conn->proto.epoch);
	whdr->cid		= htonl(call->cid);
	whdr->callNumber	= htonl(call->call_id);
	whdr->seq		= 0;
	whdr->type		= RXRPC_PACKET_TYPE_ACK;
	whdr->flags		= 0;
	whdr->userStatus	= 0;
	whdr->securityIndex	= call->security_ix;
	whdr->_rsvd		= 0;
	whdr->serviceId		= htons(call->dest_srx.srx_service);

	/* kvec[0] and kvec[2] both point into the first fragment and
	 * rxrpc_free_txbuf() passes each kvec base to page_frag_free()
	 * individually, so take an extra page ref to balance the second free.
	 */
	get_page(virt_to_head_page(trailer));

	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1,
			  rxrpc_txbuf_alloc_ack);
	atomic_inc(&rxrpc_nr_txbuf);
	return txb;
}
159
160void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
161{
162	int r;
163
164	__refcount_inc(&txb->ref, &r);
165	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what);
166}
167
168void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
169{
170	int r = refcount_read(&txb->ref);
171
172	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what);
173}
174
175static void rxrpc_free_txbuf(struct rxrpc_txbuf *txb)
176{
177	int i;
178
179	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0,
180			  rxrpc_txbuf_free);
181	for (i = 0; i < txb->nr_kvec; i++)
182		if (txb->kvec[i].iov_base)
183			page_frag_free(txb->kvec[i].iov_base);
184	kfree(txb);
185	atomic_dec(&rxrpc_nr_txbuf);
186}
187
188void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
189{
190	unsigned int debug_id, call_debug_id;
191	rxrpc_seq_t seq;
192	bool dead;
193	int r;
194
195	if (txb) {
196		debug_id = txb->debug_id;
197		call_debug_id = txb->call_debug_id;
198		seq = txb->seq;
199		dead = __refcount_dec_and_test(&txb->ref, &r);
200		trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what);
201		if (dead)
202			rxrpc_free_txbuf(txb);
203	}
204}
205
/*
 * Shrink the transmit buffer.
 *
 * Pop txbufs off the bottom of the call's Tx queue once their sequence
 * numbers have been hard-acked, advancing call->tx_bottom as we go and
 * waking any waiter once enough space has opened up.
 */
void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
{
	struct rxrpc_txbuf *txb;
	rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
	bool wake = false;

	_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);

	while ((txb = list_first_entry_or_null(&call->tx_buffer,
					       struct rxrpc_txbuf, call_link))) {
		/* Re-read the hard-ack point each time round; the acquire
		 * pairs with whoever release-stores acks_hard_ack.
		 */
		hard_ack = smp_load_acquire(&call->acks_hard_ack);
		if (before(hard_ack, txb->seq))
			break;	/* This txbuf hasn't been hard-acked yet. */

		/* The queue is expected to be in strict sequence order; log a
		 * trace marker before asserting if it isn't.
		 */
		if (txb->seq != call->tx_bottom + 1)
			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
		ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
		/* Publish the new bottom before unlinking the txbuf. */
		smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
		list_del_rcu(&txb->call_link);

		trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);

		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
		if (after(call->acks_hard_ack, call->tx_bottom + 128))
			wake = true;
	}

	if (wake)
		wake_up(&call->waitq);
}
/* Older listing of the same file (Linux kernel v6.8) follows for comparison. */
  1// SPDX-License-Identifier: GPL-2.0-or-later
  2/* RxRPC Tx data buffering.
  3 *
  4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
  5 * Written by David Howells (dhowells@redhat.com)
  6 */
  7
  8#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  9
 10#include <linux/slab.h>
 11#include "ar-internal.h"
 12
 13static atomic_t rxrpc_txbuf_debug_ids;
 14atomic_t rxrpc_nr_txbuf;
 15
/*
 * Allocate and partially initialise an I/O request structure.
 *
 * In this version the wire header and data buffer are embedded in the txbuf
 * itself rather than held in separate page fragments.  Returns NULL on
 * allocation failure.
 */
struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
				      gfp_t gfp)
{
	struct rxrpc_txbuf *txb;

	txb = kmalloc(sizeof(*txb), gfp);
	if (txb) {
		INIT_LIST_HEAD(&txb->call_link);
		INIT_LIST_HEAD(&txb->tx_link);
		refcount_set(&txb->ref, 1);
		txb->call_debug_id	= call->debug_id;
		txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
		txb->space		= sizeof(txb->data);	/* Whole embedded buffer free */
		txb->len		= 0;
		txb->offset		= 0;
		txb->flags		= 0;
		txb->ack_why		= 0;
		txb->seq		= call->tx_prepared + 1;
		/* Fill in the protocol header fields that are already known;
		 * serial and checksum are set at transmission time.
		 */
		txb->wire.epoch		= htonl(call->conn->proto.epoch);
		txb->wire.cid		= htonl(call->cid);
		txb->wire.callNumber	= htonl(call->call_id);
		txb->wire.seq		= htonl(txb->seq);
		txb->wire.type		= packet_type;
		txb->wire.flags		= call->conn->out_clientflag;
		txb->wire.userStatus	= 0;
		txb->wire.securityIndex	= call->security_ix;
		txb->wire._rsvd		= 0;
		txb->wire.serviceId	= htons(call->dest_srx.srx_service);

		trace_rxrpc_txbuf(txb->debug_id,
				  txb->call_debug_id, txb->seq, 1,
				  packet_type == RXRPC_PACKET_TYPE_DATA ?
				  rxrpc_txbuf_alloc_data :
				  rxrpc_txbuf_alloc_ack);
		atomic_inc(&rxrpc_nr_txbuf);
	}

	return txb;
}
 58
 59void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
 60{
 61	int r;
 62
 63	__refcount_inc(&txb->ref, &r);
 64	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what);
 65}
 66
 67void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
 68{
 69	int r = refcount_read(&txb->ref);
 70
 71	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what);
 72}
 73
 74static void rxrpc_free_txbuf(struct rcu_head *rcu)
 75{
 76	struct rxrpc_txbuf *txb = container_of(rcu, struct rxrpc_txbuf, rcu);
 77
 78	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0,
 79			  rxrpc_txbuf_free);
 
 
 
 80	kfree(txb);
 81	atomic_dec(&rxrpc_nr_txbuf);
 82}
 83
 84void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
 85{
 86	unsigned int debug_id, call_debug_id;
 87	rxrpc_seq_t seq;
 88	bool dead;
 89	int r;
 90
 91	if (txb) {
 92		debug_id = txb->debug_id;
 93		call_debug_id = txb->call_debug_id;
 94		seq = txb->seq;
 95		dead = __refcount_dec_and_test(&txb->ref, &r);
 96		trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what);
 97		if (dead)
 98			call_rcu(&txb->rcu, rxrpc_free_txbuf);
 99	}
100}
101
/*
 * Shrink the transmit buffer.
 *
 * Pop txbufs off the bottom of the call's Tx queue once their sequence
 * numbers have been hard-acked, advancing call->tx_bottom as we go and
 * waking any waiter once enough space has opened up.
 */
void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
{
	struct rxrpc_txbuf *txb;
	rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
	bool wake = false;

	_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);

	while ((txb = list_first_entry_or_null(&call->tx_buffer,
					       struct rxrpc_txbuf, call_link))) {
		/* Re-read the hard-ack point each time round; the acquire
		 * pairs with whoever release-stores acks_hard_ack.
		 */
		hard_ack = smp_load_acquire(&call->acks_hard_ack);
		if (before(hard_ack, txb->seq))
			break;	/* This txbuf hasn't been hard-acked yet. */

		/* The queue is expected to be in strict sequence order; log a
		 * trace marker before asserting if it isn't.
		 */
		if (txb->seq != call->tx_bottom + 1)
			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
		ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
		/* Publish the new bottom before unlinking the txbuf. */
		smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
		list_del_rcu(&txb->call_link);

		trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);

		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
		if (after(call->acks_hard_ack, call->tx_bottom + 128))
			wake = true;
	}

	if (wake)
		wake_up(&call->waitq);
}