Linux Audio

Check our new training course

In-person Linux kernel drivers training

Jun 16-20, 2025
Register
Loading...
   1/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/module.h>
  13#include <linux/circ_buf.h>
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/slab.h>
  17#include <linux/udp.h>
  18#include <net/sock.h>
  19#include <net/af_rxrpc.h>
  20#include "ar-internal.h"
  21
  22/*
  23 * How long to wait before scheduling ACK generation after seeing a
  24 * packet with RXRPC_REQUEST_ACK set (in jiffies).
  25 */
  26unsigned rxrpc_requested_ack_delay = 1;
  27
  28/*
  29 * How long to wait before scheduling an ACK with subtype DELAY (in jiffies).
  30 *
  31 * We use this when we've received new data packets.  If those packets aren't
  32 * all consumed within this time we will send a DELAY ACK if an ACK was not
  33 * requested to let the sender know it doesn't need to resend.
  34 */
  35unsigned rxrpc_soft_ack_delay = 1 * HZ;
  36
  37/*
  38 * How long to wait before scheduling an ACK with subtype IDLE (in jiffies).
  39 *
  40 * We use this when we've consumed some previously soft-ACK'd packets when
  41 * further packets aren't immediately received to decide when to send an IDLE
  42 * ACK let the other end know that it can free up its Tx buffer space.
  43 */
  44unsigned rxrpc_idle_ack_delay = 0.5 * HZ;
  45
  46/*
  47 * Receive window size in packets.  This indicates the maximum number of
  48 * unconsumed received packets we're willing to retain in memory.  Once this
  49 * limit is hit, we should generate an EXCEEDS_WINDOW ACK and discard further
  50 * packets.
  51 */
  52unsigned rxrpc_rx_window_size = 32;
  53
  54/*
  55 * Maximum Rx MTU size.  This indicates to the sender the size of jumbo packet
  56 * made by gluing normal packets together that we're willing to handle.
  57 */
  58unsigned rxrpc_rx_mtu = 5692;
  59
  60/*
  61 * The maximum number of fragments in a received jumbo packet that we tell the
  62 * sender that we're willing to handle.
  63 */
  64unsigned rxrpc_rx_jumbo_max = 4;
  65
  66static const char *rxrpc_acks(u8 reason)
  67{
  68	static const char *const str[] = {
  69		"---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY",
  70		"IDL", "-?-"
  71	};
  72
  73	if (reason >= ARRAY_SIZE(str))
  74		reason = ARRAY_SIZE(str) - 1;
  75	return str[reason];
  76}
  77
  78static const s8 rxrpc_ack_priority[] = {
  79	[0]				= 0,
  80	[RXRPC_ACK_DELAY]		= 1,
  81	[RXRPC_ACK_REQUESTED]		= 2,
  82	[RXRPC_ACK_IDLE]		= 3,
  83	[RXRPC_ACK_PING_RESPONSE]	= 4,
  84	[RXRPC_ACK_DUPLICATE]		= 5,
  85	[RXRPC_ACK_OUT_OF_SEQUENCE]	= 6,
  86	[RXRPC_ACK_EXCEEDS_WINDOW]	= 7,
  87	[RXRPC_ACK_NOSPACE]		= 8,
  88};
  89
  90/*
  91 * propose an ACK be sent
  92 */
  93void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
  94			 __be32 serial, bool immediate)
  95{
  96	unsigned long expiry;
  97	s8 prior = rxrpc_ack_priority[ack_reason];
  98
  99	ASSERTCMP(prior, >, 0);
 100
 101	_enter("{%d},%s,%%%x,%u",
 102	       call->debug_id, rxrpc_acks(ack_reason), ntohl(serial),
 103	       immediate);
 104
 105	if (prior < rxrpc_ack_priority[call->ackr_reason]) {
 106		if (immediate)
 107			goto cancel_timer;
 108		return;
 109	}
 110
 111	/* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
 112	 * numbers */
 113	if (prior == rxrpc_ack_priority[call->ackr_reason]) {
 114		if (prior <= 4)
 115			call->ackr_serial = serial;
 116		if (immediate)
 117			goto cancel_timer;
 118		return;
 119	}
 120
 121	call->ackr_reason = ack_reason;
 122	call->ackr_serial = serial;
 123
 124	switch (ack_reason) {
 125	case RXRPC_ACK_DELAY:
 126		_debug("run delay timer");
 127		expiry = rxrpc_soft_ack_delay;
 128		goto run_timer;
 129
 130	case RXRPC_ACK_IDLE:
 131		if (!immediate) {
 132			_debug("run defer timer");
 133			expiry = rxrpc_idle_ack_delay;
 134			goto run_timer;
 135		}
 136		goto cancel_timer;
 137
 138	case RXRPC_ACK_REQUESTED:
 139		expiry = rxrpc_requested_ack_delay;
 140		if (!expiry)
 141			goto cancel_timer;
 142		if (!immediate || serial == cpu_to_be32(1)) {
 143			_debug("run defer timer");
 144			goto run_timer;
 145		}
 146
 147	default:
 148		_debug("immediate ACK");
 149		goto cancel_timer;
 150	}
 151
 152run_timer:
 153	expiry += jiffies;
 154	if (!timer_pending(&call->ack_timer) ||
 155	    time_after(call->ack_timer.expires, expiry))
 156		mod_timer(&call->ack_timer, expiry);
 157	return;
 158
 159cancel_timer:
 160	_debug("cancel timer %%%u", ntohl(serial));
 161	try_to_del_timer_sync(&call->ack_timer);
 162	read_lock_bh(&call->state_lock);
 163	if (call->state <= RXRPC_CALL_COMPLETE &&
 164	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
 165		rxrpc_queue_call(call);
 166	read_unlock_bh(&call->state_lock);
 167}
 168
 169/*
 170 * propose an ACK be sent, locking the call structure
 171 */
 172void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 173		       __be32 serial, bool immediate)
 174{
 175	s8 prior = rxrpc_ack_priority[ack_reason];
 176
 177	if (prior > rxrpc_ack_priority[call->ackr_reason]) {
 178		spin_lock_bh(&call->lock);
 179		__rxrpc_propose_ACK(call, ack_reason, serial, immediate);
 180		spin_unlock_bh(&call->lock);
 181	}
 182}
 183
 184/*
 185 * set the resend timer
 186 */
 187static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
 188			     unsigned long resend_at)
 189{
 190	read_lock_bh(&call->state_lock);
 191	if (call->state >= RXRPC_CALL_COMPLETE)
 192		resend = 0;
 193
 194	if (resend & 1) {
 195		_debug("SET RESEND");
 196		set_bit(RXRPC_CALL_RESEND, &call->events);
 197	}
 198
 199	if (resend & 2) {
 200		_debug("MODIFY RESEND TIMER");
 201		set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 202		mod_timer(&call->resend_timer, resend_at);
 203	} else {
 204		_debug("KILL RESEND TIMER");
 205		del_timer_sync(&call->resend_timer);
 206		clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 207		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 208	}
 209	read_unlock_bh(&call->state_lock);
 210}
 211
 212/*
 213 * resend packets
 214 */
 215static void rxrpc_resend(struct rxrpc_call *call)
 216{
 217	struct rxrpc_skb_priv *sp;
 218	struct rxrpc_header *hdr;
 219	struct sk_buff *txb;
 220	unsigned long *p_txb, resend_at;
 221	int loop, stop;
 222	u8 resend;
 223
 224	_enter("{%d,%d,%d,%d},",
 225	       call->acks_hard, call->acks_unacked,
 226	       atomic_read(&call->sequence),
 227	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 228
 229	stop = 0;
 230	resend = 0;
 231	resend_at = 0;
 232
 233	for (loop = call->acks_tail;
 234	     loop != call->acks_head || stop;
 235	     loop = (loop + 1) &  (call->acks_winsz - 1)
 236	     ) {
 237		p_txb = call->acks_window + loop;
 238		smp_read_barrier_depends();
 239		if (*p_txb & 1)
 240			continue;
 241
 242		txb = (struct sk_buff *) *p_txb;
 243		sp = rxrpc_skb(txb);
 244
 245		if (sp->need_resend) {
 246			sp->need_resend = false;
 247
 248			/* each Tx packet has a new serial number */
 249			sp->hdr.serial =
 250				htonl(atomic_inc_return(&call->conn->serial));
 251
 252			hdr = (struct rxrpc_header *) txb->head;
 253			hdr->serial = sp->hdr.serial;
 254
 255			_proto("Tx DATA %%%u { #%d }",
 256			       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 257			if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
 258				stop = 0;
 259				sp->resend_at = jiffies + 3;
 260			} else {
 261				sp->resend_at =
 262					jiffies + rxrpc_resend_timeout * HZ;
 263			}
 264		}
 265
 266		if (time_after_eq(jiffies + 1, sp->resend_at)) {
 267			sp->need_resend = true;
 268			resend |= 1;
 269		} else if (resend & 2) {
 270			if (time_before(sp->resend_at, resend_at))
 271				resend_at = sp->resend_at;
 272		} else {
 273			resend_at = sp->resend_at;
 274			resend |= 2;
 275		}
 276	}
 277
 278	rxrpc_set_resend(call, resend, resend_at);
 279	_leave("");
 280}
 281
 282/*
 283 * handle resend timer expiry
 284 */
 285static void rxrpc_resend_timer(struct rxrpc_call *call)
 286{
 287	struct rxrpc_skb_priv *sp;
 288	struct sk_buff *txb;
 289	unsigned long *p_txb, resend_at;
 290	int loop;
 291	u8 resend;
 292
 293	_enter("%d,%d,%d",
 294	       call->acks_tail, call->acks_unacked, call->acks_head);
 295
 296	if (call->state >= RXRPC_CALL_COMPLETE)
 297		return;
 298
 299	resend = 0;
 300	resend_at = 0;
 301
 302	for (loop = call->acks_unacked;
 303	     loop != call->acks_head;
 304	     loop = (loop + 1) &  (call->acks_winsz - 1)
 305	     ) {
 306		p_txb = call->acks_window + loop;
 307		smp_read_barrier_depends();
 308		txb = (struct sk_buff *) (*p_txb & ~1);
 309		sp = rxrpc_skb(txb);
 310
 311		ASSERT(!(*p_txb & 1));
 312
 313		if (sp->need_resend) {
 314			;
 315		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 316			sp->need_resend = true;
 317			resend |= 1;
 318		} else if (resend & 2) {
 319			if (time_before(sp->resend_at, resend_at))
 320				resend_at = sp->resend_at;
 321		} else {
 322			resend_at = sp->resend_at;
 323			resend |= 2;
 324		}
 325	}
 326
 327	rxrpc_set_resend(call, resend, resend_at);
 328	_leave("");
 329}
 330
 331/*
 332 * process soft ACKs of our transmitted packets
 333 * - these indicate packets the peer has or has not received, but hasn't yet
 334 *   given to the consumer, and so can still be discarded and re-requested
 335 */
 336static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
 337				   struct rxrpc_ackpacket *ack,
 338				   struct sk_buff *skb)
 339{
 340	struct rxrpc_skb_priv *sp;
 341	struct sk_buff *txb;
 342	unsigned long *p_txb, resend_at;
 343	int loop;
 344	u8 sacks[RXRPC_MAXACKS], resend;
 345
 346	_enter("{%d,%d},{%d},",
 347	       call->acks_hard,
 348	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
 349	       ack->nAcks);
 350
 351	if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
 352		goto protocol_error;
 353
 354	resend = 0;
 355	resend_at = 0;
 356	for (loop = 0; loop < ack->nAcks; loop++) {
 357		p_txb = call->acks_window;
 358		p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
 359		smp_read_barrier_depends();
 360		txb = (struct sk_buff *) (*p_txb & ~1);
 361		sp = rxrpc_skb(txb);
 362
 363		switch (sacks[loop]) {
 364		case RXRPC_ACK_TYPE_ACK:
 365			sp->need_resend = false;
 366			*p_txb |= 1;
 367			break;
 368		case RXRPC_ACK_TYPE_NACK:
 369			sp->need_resend = true;
 370			*p_txb &= ~1;
 371			resend = 1;
 372			break;
 373		default:
 374			_debug("Unsupported ACK type %d", sacks[loop]);
 375			goto protocol_error;
 376		}
 377	}
 378
 379	smp_mb();
 380	call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);
 381
 382	/* anything not explicitly ACK'd is implicitly NACK'd, but may just not
 383	 * have been received or processed yet by the far end */
 384	for (loop = call->acks_unacked;
 385	     loop != call->acks_head;
 386	     loop = (loop + 1) &  (call->acks_winsz - 1)
 387	     ) {
 388		p_txb = call->acks_window + loop;
 389		smp_read_barrier_depends();
 390		txb = (struct sk_buff *) (*p_txb & ~1);
 391		sp = rxrpc_skb(txb);
 392
 393		if (*p_txb & 1) {
 394			/* packet must have been discarded */
 395			sp->need_resend = true;
 396			*p_txb &= ~1;
 397			resend |= 1;
 398		} else if (sp->need_resend) {
 399			;
 400		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 401			sp->need_resend = true;
 402			resend |= 1;
 403		} else if (resend & 2) {
 404			if (time_before(sp->resend_at, resend_at))
 405				resend_at = sp->resend_at;
 406		} else {
 407			resend_at = sp->resend_at;
 408			resend |= 2;
 409		}
 410	}
 411
 412	rxrpc_set_resend(call, resend, resend_at);
 413	_leave(" = 0");
 414	return 0;
 415
 416protocol_error:
 417	_leave(" = -EPROTO");
 418	return -EPROTO;
 419}
 420
 421/*
 422 * discard hard-ACK'd packets from the Tx window
 423 */
 424static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
 425{
 426	unsigned long _skb;
 427	int tail = call->acks_tail, old_tail;
 428	int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);
 429
 430	_enter("{%u,%u},%u", call->acks_hard, win, hard);
 431
 432	ASSERTCMP(hard - call->acks_hard, <=, win);
 433
 434	while (call->acks_hard < hard) {
 435		smp_read_barrier_depends();
 436		_skb = call->acks_window[tail] & ~1;
 437		rxrpc_free_skb((struct sk_buff *) _skb);
 438		old_tail = tail;
 439		tail = (tail + 1) & (call->acks_winsz - 1);
 440		call->acks_tail = tail;
 441		if (call->acks_unacked == old_tail)
 442			call->acks_unacked = tail;
 443		call->acks_hard++;
 444	}
 445
 446	wake_up(&call->tx_waitq);
 447}
 448
 449/*
 450 * clear the Tx window in the event of a failure
 451 */
 452static void rxrpc_clear_tx_window(struct rxrpc_call *call)
 453{
 454	rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
 455}
 456
 457/*
 458 * drain the out of sequence received packet queue into the packet Rx queue
 459 */
 460static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 461{
 462	struct rxrpc_skb_priv *sp;
 463	struct sk_buff *skb;
 464	bool terminal;
 465	int ret;
 466
 467	_enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);
 468
 469	spin_lock_bh(&call->lock);
 470
 471	ret = -ECONNRESET;
 472	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 473		goto socket_unavailable;
 474
 475	skb = skb_dequeue(&call->rx_oos_queue);
 476	if (skb) {
 477		sp = rxrpc_skb(skb);
 478
 479		_debug("drain OOS packet %d [%d]",
 480		       ntohl(sp->hdr.seq), call->rx_first_oos);
 481
 482		if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
 483			skb_queue_head(&call->rx_oos_queue, skb);
 484			call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
 485			_debug("requeue %p {%u}", skb, call->rx_first_oos);
 486		} else {
 487			skb->mark = RXRPC_SKB_MARK_DATA;
 488			terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
 489				!(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
 490			ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
 491			BUG_ON(ret < 0);
 492			_debug("drain #%u", call->rx_data_post);
 493			call->rx_data_post++;
 494
 495			/* find out what the next packet is */
 496			skb = skb_peek(&call->rx_oos_queue);
 497			if (skb)
 498				call->rx_first_oos =
 499					ntohl(rxrpc_skb(skb)->hdr.seq);
 500			else
 501				call->rx_first_oos = 0;
 502			_debug("peek %p {%u}", skb, call->rx_first_oos);
 503		}
 504	}
 505
 506	ret = 0;
 507socket_unavailable:
 508	spin_unlock_bh(&call->lock);
 509	_leave(" = %d", ret);
 510	return ret;
 511}
 512
 513/*
 514 * insert an out of sequence packet into the buffer
 515 */
 516static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
 517				    struct sk_buff *skb)
 518{
 519	struct rxrpc_skb_priv *sp, *psp;
 520	struct sk_buff *p;
 521	u32 seq;
 522
 523	sp = rxrpc_skb(skb);
 524	seq = ntohl(sp->hdr.seq);
 525	_enter(",,{%u}", seq);
 526
 527	skb->destructor = rxrpc_packet_destructor;
 528	ASSERTCMP(sp->call, ==, NULL);
 529	sp->call = call;
 530	rxrpc_get_call(call);
 531
 532	/* insert into the buffer in sequence order */
 533	spin_lock_bh(&call->lock);
 534
 535	skb_queue_walk(&call->rx_oos_queue, p) {
 536		psp = rxrpc_skb(p);
 537		if (ntohl(psp->hdr.seq) > seq) {
 538			_debug("insert oos #%u before #%u",
 539			       seq, ntohl(psp->hdr.seq));
 540			skb_insert(p, skb, &call->rx_oos_queue);
 541			goto inserted;
 542		}
 543	}
 544
 545	_debug("append oos #%u", seq);
 546	skb_queue_tail(&call->rx_oos_queue, skb);
 547inserted:
 548
 549	/* we might now have a new front to the queue */
 550	if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
 551		call->rx_first_oos = seq;
 552
 553	read_lock(&call->state_lock);
 554	if (call->state < RXRPC_CALL_COMPLETE &&
 555	    call->rx_data_post == call->rx_first_oos) {
 556		_debug("drain rx oos now");
 557		set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
 558	}
 559	read_unlock(&call->state_lock);
 560
 561	spin_unlock_bh(&call->lock);
 562	_leave(" [stored #%u]", call->rx_first_oos);
 563}
 564
 565/*
 566 * clear the Tx window on final ACK reception
 567 */
 568static void rxrpc_zap_tx_window(struct rxrpc_call *call)
 569{
 570	struct rxrpc_skb_priv *sp;
 571	struct sk_buff *skb;
 572	unsigned long _skb, *acks_window;
 573	u8 winsz = call->acks_winsz;
 574	int tail;
 575
 576	acks_window = call->acks_window;
 577	call->acks_window = NULL;
 578
 579	while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
 580		tail = call->acks_tail;
 581		smp_read_barrier_depends();
 582		_skb = acks_window[tail] & ~1;
 583		smp_mb();
 584		call->acks_tail = (call->acks_tail + 1) & (winsz - 1);
 585
 586		skb = (struct sk_buff *) _skb;
 587		sp = rxrpc_skb(skb);
 588		_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 589		rxrpc_free_skb(skb);
 590	}
 591
 592	kfree(acks_window);
 593}
 594
 595/*
 596 * process the extra information that may be appended to an ACK packet
 597 */
 598static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
 599				  unsigned int latest, int nAcks)
 600{
 601	struct rxrpc_ackinfo ackinfo;
 602	struct rxrpc_peer *peer;
 603	unsigned int mtu;
 604
 605	if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
 606		_leave(" [no ackinfo]");
 607		return;
 608	}
 609
 610	_proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
 611	       latest,
 612	       ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
 613	       ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));
 614
 615	mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
 616
 617	peer = call->conn->trans->peer;
 618	if (mtu < peer->maxdata) {
 619		spin_lock_bh(&peer->lock);
 620		peer->maxdata = mtu;
 621		peer->mtu = mtu + peer->hdrsize;
 622		spin_unlock_bh(&peer->lock);
 623		_net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
 624	}
 625}
 626
 627/*
 628 * process packets in the reception queue
 629 */
 630static int rxrpc_process_rx_queue(struct rxrpc_call *call,
 631				  u32 *_abort_code)
 632{
 633	struct rxrpc_ackpacket ack;
 634	struct rxrpc_skb_priv *sp;
 635	struct sk_buff *skb;
 636	bool post_ACK;
 637	int latest;
 638	u32 hard, tx;
 639
 640	_enter("");
 641
 642process_further:
 643	skb = skb_dequeue(&call->rx_queue);
 644	if (!skb)
 645		return -EAGAIN;
 646
 647	_net("deferred skb %p", skb);
 648
 649	sp = rxrpc_skb(skb);
 650
 651	_debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);
 652
 653	post_ACK = false;
 654
 655	switch (sp->hdr.type) {
 656		/* data packets that wind up here have been received out of
 657		 * order, need security processing or are jumbo packets */
 658	case RXRPC_PACKET_TYPE_DATA:
 659		_proto("OOSQ DATA %%%u { #%u }",
 660		       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 661
 662		/* secured packets must be verified and possibly decrypted */
 663		if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
 664			goto protocol_error;
 665
 666		rxrpc_insert_oos_packet(call, skb);
 667		goto process_further;
 668
 669		/* partial ACK to process */
 670	case RXRPC_PACKET_TYPE_ACK:
 671		if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
 672			_debug("extraction failure");
 673			goto protocol_error;
 674		}
 675		if (!skb_pull(skb, sizeof(ack)))
 676			BUG();
 677
 678		latest = ntohl(sp->hdr.serial);
 679		hard = ntohl(ack.firstPacket);
 680		tx = atomic_read(&call->sequence);
 681
 682		_proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 683		       latest,
 684		       ntohs(ack.maxSkew),
 685		       hard,
 686		       ntohl(ack.previousPacket),
 687		       ntohl(ack.serial),
 688		       rxrpc_acks(ack.reason),
 689		       ack.nAcks);
 690
 691		rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);
 692
 693		if (ack.reason == RXRPC_ACK_PING) {
 694			_proto("Rx ACK %%%u PING Request", latest);
 695			rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
 696					  sp->hdr.serial, true);
 697		}
 698
 699		/* discard any out-of-order or duplicate ACKs */
 700		if (latest - call->acks_latest <= 0) {
 701			_debug("discard ACK %d <= %d",
 702			       latest, call->acks_latest);
 703			goto discard;
 704		}
 705		call->acks_latest = latest;
 706
 707		if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
 708		    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
 709		    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
 710		    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
 711			goto discard;
 712
 713		_debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);
 714
 715		if (hard > 0) {
 716			if (hard - 1 > tx) {
 717				_debug("hard-ACK'd packet %d not transmitted"
 718				       " (%d top)",
 719				       hard - 1, tx);
 720				goto protocol_error;
 721			}
 722
 723			if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
 724			     call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
 725			    hard > tx)
 726				goto all_acked;
 727
 728			smp_rmb();
 729			rxrpc_rotate_tx_window(call, hard - 1);
 730		}
 731
 732		if (ack.nAcks > 0) {
 733			if (hard - 1 + ack.nAcks > tx) {
 734				_debug("soft-ACK'd packet %d+%d not"
 735				       " transmitted (%d top)",
 736				       hard - 1, ack.nAcks, tx);
 737				goto protocol_error;
 738			}
 739
 740			if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
 741				goto protocol_error;
 742		}
 743		goto discard;
 744
 745		/* complete ACK to process */
 746	case RXRPC_PACKET_TYPE_ACKALL:
 747		goto all_acked;
 748
 749		/* abort and busy are handled elsewhere */
 750	case RXRPC_PACKET_TYPE_BUSY:
 751	case RXRPC_PACKET_TYPE_ABORT:
 752		BUG();
 753
 754		/* connection level events - also handled elsewhere */
 755	case RXRPC_PACKET_TYPE_CHALLENGE:
 756	case RXRPC_PACKET_TYPE_RESPONSE:
 757	case RXRPC_PACKET_TYPE_DEBUG:
 758		BUG();
 759	}
 760
 761	/* if we've had a hard ACK that covers all the packets we've sent, then
 762	 * that ends that phase of the operation */
 763all_acked:
 764	write_lock_bh(&call->state_lock);
 765	_debug("ack all %d", call->state);
 766
 767	switch (call->state) {
 768	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 769		call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
 770		break;
 771	case RXRPC_CALL_SERVER_AWAIT_ACK:
 772		_debug("srv complete");
 773		call->state = RXRPC_CALL_COMPLETE;
 774		post_ACK = true;
 775		break;
 776	case RXRPC_CALL_CLIENT_SEND_REQUEST:
 777	case RXRPC_CALL_SERVER_RECV_REQUEST:
 778		goto protocol_error_unlock; /* can't occur yet */
 779	default:
 780		write_unlock_bh(&call->state_lock);
 781		goto discard; /* assume packet left over from earlier phase */
 782	}
 783
 784	write_unlock_bh(&call->state_lock);
 785
 786	/* if all the packets we sent are hard-ACK'd, then we can discard
 787	 * whatever we've got left */
 788	_debug("clear Tx %d",
 789	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 790
 791	del_timer_sync(&call->resend_timer);
 792	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 793	clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 794
 795	if (call->acks_window)
 796		rxrpc_zap_tx_window(call);
 797
 798	if (post_ACK) {
 799		/* post the final ACK message for userspace to pick up */
 800		_debug("post ACK");
 801		skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
 802		sp->call = call;
 803		rxrpc_get_call(call);
 804		spin_lock_bh(&call->lock);
 805		if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
 806			BUG();
 807		spin_unlock_bh(&call->lock);
 808		goto process_further;
 809	}
 810
 811discard:
 812	rxrpc_free_skb(skb);
 813	goto process_further;
 814
 815protocol_error_unlock:
 816	write_unlock_bh(&call->state_lock);
 817protocol_error:
 818	rxrpc_free_skb(skb);
 819	_leave(" = -EPROTO");
 820	return -EPROTO;
 821}
 822
 823/*
 824 * post a message to the socket Rx queue for recvmsg() to pick up
 825 */
 826static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 827			      bool fatal)
 828{
 829	struct rxrpc_skb_priv *sp;
 830	struct sk_buff *skb;
 831	int ret;
 832
 833	_enter("{%d,%lx},%u,%u,%d",
 834	       call->debug_id, call->flags, mark, error, fatal);
 835
 836	/* remove timers and things for fatal messages */
 837	if (fatal) {
 838		del_timer_sync(&call->resend_timer);
 839		del_timer_sync(&call->ack_timer);
 840		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 841	}
 842
 843	if (mark != RXRPC_SKB_MARK_NEW_CALL &&
 844	    !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 845		_leave("[no userid]");
 846		return 0;
 847	}
 848
 849	if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
 850		skb = alloc_skb(0, GFP_NOFS);
 851		if (!skb)
 852			return -ENOMEM;
 853
 854		rxrpc_new_skb(skb);
 855
 856		skb->mark = mark;
 857
 858		sp = rxrpc_skb(skb);
 859		memset(sp, 0, sizeof(*sp));
 860		sp->error = error;
 861		sp->call = call;
 862		rxrpc_get_call(call);
 863
 864		spin_lock_bh(&call->lock);
 865		ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
 866		spin_unlock_bh(&call->lock);
 867		BUG_ON(ret < 0);
 868	}
 869
 870	return 0;
 871}
 872
 873/*
 874 * handle background processing of incoming call packets and ACK / abort
 875 * generation
 876 */
 877void rxrpc_process_call(struct work_struct *work)
 878{
 879	struct rxrpc_call *call =
 880		container_of(work, struct rxrpc_call, processor);
 881	struct rxrpc_ackpacket ack;
 882	struct rxrpc_ackinfo ackinfo;
 883	struct rxrpc_header hdr;
 884	struct msghdr msg;
 885	struct kvec iov[5];
 886	unsigned long bits;
 887	__be32 data, pad;
 888	size_t len;
 889	int genbit, loop, nbit, ioc, ret, mtu;
 890	u32 abort_code = RX_PROTOCOL_ERROR;
 891	u8 *acks = NULL;
 892
 893	//printk("\n--------------------\n");
 894	_enter("{%d,%s,%lx} [%lu]",
 895	       call->debug_id, rxrpc_call_states[call->state], call->events,
 896	       (jiffies - call->creation_jif) / (HZ / 10));
 897
 898	if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
 899		_debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
 900		return;
 901	}
 902
 903	/* there's a good chance we're going to have to send a message, so set
 904	 * one up in advance */
 905	msg.msg_name	= &call->conn->trans->peer->srx.transport.sin;
 906	msg.msg_namelen	= sizeof(call->conn->trans->peer->srx.transport.sin);
 907	msg.msg_control	= NULL;
 908	msg.msg_controllen = 0;
 909	msg.msg_flags	= 0;
 910
 911	hdr.epoch	= call->conn->epoch;
 912	hdr.cid		= call->cid;
 913	hdr.callNumber	= call->call_id;
 914	hdr.seq		= 0;
 915	hdr.type	= RXRPC_PACKET_TYPE_ACK;
 916	hdr.flags	= call->conn->out_clientflag;
 917	hdr.userStatus	= 0;
 918	hdr.securityIndex = call->conn->security_ix;
 919	hdr._rsvd	= 0;
 920	hdr.serviceId	= call->conn->service_id;
 921
 922	memset(iov, 0, sizeof(iov));
 923	iov[0].iov_base	= &hdr;
 924	iov[0].iov_len	= sizeof(hdr);
 925
 926	/* deal with events of a final nature */
 927	if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
 928		rxrpc_release_call(call);
 929		clear_bit(RXRPC_CALL_RELEASE, &call->events);
 930	}
 931
 932	if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
 933		int error;
 934
 935		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 936		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 937		clear_bit(RXRPC_CALL_ABORT, &call->events);
 938
 939		error = call->conn->trans->peer->net_error;
 940		_debug("post net error %d", error);
 941
 942		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
 943				       error, true) < 0)
 944			goto no_mem;
 945		clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
 946		goto kill_ACKs;
 947	}
 948
 949	if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
 950		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 951
 952		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 953		clear_bit(RXRPC_CALL_ABORT, &call->events);
 954
 955		_debug("post conn abort");
 956
 957		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 958				       call->conn->error, true) < 0)
 959			goto no_mem;
 960		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 961		goto kill_ACKs;
 962	}
 963
 964	if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
 965		hdr.type = RXRPC_PACKET_TYPE_BUSY;
 966		genbit = RXRPC_CALL_REJECT_BUSY;
 967		goto send_message;
 968	}
 969
 970	if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
 971		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 972
 973		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 974				       ECONNABORTED, true) < 0)
 975			goto no_mem;
 976		hdr.type = RXRPC_PACKET_TYPE_ABORT;
 977		data = htonl(call->abort_code);
 978		iov[1].iov_base = &data;
 979		iov[1].iov_len = sizeof(data);
 980		genbit = RXRPC_CALL_ABORT;
 981		goto send_message;
 982	}
 983
 984	if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
 985		genbit = RXRPC_CALL_ACK_FINAL;
 986
 987		ack.bufferSpace	= htons(8);
 988		ack.maxSkew	= 0;
 989		ack.serial	= 0;
 990		ack.reason	= RXRPC_ACK_IDLE;
 991		ack.nAcks	= 0;
 992		call->ackr_reason = 0;
 993
 994		spin_lock_bh(&call->lock);
 995		ack.serial = call->ackr_serial;
 996		ack.previousPacket = call->ackr_prev_seq;
 997		ack.firstPacket = htonl(call->rx_data_eaten + 1);
 998		spin_unlock_bh(&call->lock);
 999
1000		pad = 0;
1001
1002		iov[1].iov_base = &ack;
1003		iov[1].iov_len	= sizeof(ack);
1004		iov[2].iov_base = &pad;
1005		iov[2].iov_len	= 3;
1006		iov[3].iov_base = &ackinfo;
1007		iov[3].iov_len	= sizeof(ackinfo);
1008		goto send_ACK;
1009	}
1010
1011	if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
1012			    (1 << RXRPC_CALL_RCVD_ABORT))
1013	    ) {
1014		u32 mark;
1015
1016		if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
1017			mark = RXRPC_SKB_MARK_REMOTE_ABORT;
1018		else
1019			mark = RXRPC_SKB_MARK_BUSY;
1020
1021		_debug("post abort/busy");
1022		rxrpc_clear_tx_window(call);
1023		if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
1024			goto no_mem;
1025
1026		clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
1027		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
1028		goto kill_ACKs;
1029	}
1030
1031	if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
1032		_debug("do implicit ackall");
1033		rxrpc_clear_tx_window(call);
1034	}
1035
1036	if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
1037		write_lock_bh(&call->state_lock);
1038		if (call->state <= RXRPC_CALL_COMPLETE) {
1039			call->state = RXRPC_CALL_LOCALLY_ABORTED;
1040			call->abort_code = RX_CALL_TIMEOUT;
1041			set_bit(RXRPC_CALL_ABORT, &call->events);
1042		}
1043		write_unlock_bh(&call->state_lock);
1044
1045		_debug("post timeout");
1046		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
1047				       ETIME, true) < 0)
1048			goto no_mem;
1049
1050		clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
1051		goto kill_ACKs;
1052	}
1053
1054	/* deal with assorted inbound messages */
1055	if (!skb_queue_empty(&call->rx_queue)) {
1056		switch (rxrpc_process_rx_queue(call, &abort_code)) {
1057		case 0:
1058		case -EAGAIN:
1059			break;
1060		case -ENOMEM:
1061			goto no_mem;
1062		case -EKEYEXPIRED:
1063		case -EKEYREJECTED:
1064		case -EPROTO:
1065			rxrpc_abort_call(call, abort_code);
1066			goto kill_ACKs;
1067		}
1068	}
1069
1070	/* handle resending */
1071	if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
1072		rxrpc_resend_timer(call);
1073	if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
1074		rxrpc_resend(call);
1075
1076	/* consider sending an ordinary ACK */
1077	if (test_bit(RXRPC_CALL_ACK, &call->events)) {
1078		_debug("send ACK: window: %d - %d { %lx }",
1079		       call->rx_data_eaten, call->ackr_win_top,
1080		       call->ackr_window[0]);
1081
1082		if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
1083		    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
1084			/* ACK by sending reply DATA packet in this state */
1085			clear_bit(RXRPC_CALL_ACK, &call->events);
1086			goto maybe_reschedule;
1087		}
1088
1089		genbit = RXRPC_CALL_ACK;
1090
1091		acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
1092			       GFP_NOFS);
1093		if (!acks)
1094			goto no_mem;
1095
1096		//hdr.flags	= RXRPC_SLOW_START_OK;
1097		ack.bufferSpace	= htons(8);
1098		ack.maxSkew	= 0;
1099		ack.serial	= 0;
1100		ack.reason	= 0;
1101
1102		spin_lock_bh(&call->lock);
1103		ack.reason = call->ackr_reason;
1104		ack.serial = call->ackr_serial;
1105		ack.previousPacket = call->ackr_prev_seq;
1106		ack.firstPacket = htonl(call->rx_data_eaten + 1);
1107
1108		ack.nAcks = 0;
1109		for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
1110			nbit = loop * BITS_PER_LONG;
1111			for (bits = call->ackr_window[loop]; bits; bits >>= 1
1112			     ) {
1113				_debug("- l=%d n=%d b=%lx", loop, nbit, bits);
1114				if (bits & 1) {
1115					acks[nbit] = RXRPC_ACK_TYPE_ACK;
1116					ack.nAcks = nbit + 1;
1117				}
1118				nbit++;
1119			}
1120		}
1121		call->ackr_reason = 0;
1122		spin_unlock_bh(&call->lock);
1123
1124		pad = 0;
1125
1126		iov[1].iov_base = &ack;
1127		iov[1].iov_len	= sizeof(ack);
1128		iov[2].iov_base = acks;
1129		iov[2].iov_len	= ack.nAcks;
1130		iov[3].iov_base = &pad;
1131		iov[3].iov_len	= 3;
1132		iov[4].iov_base = &ackinfo;
1133		iov[4].iov_len	= sizeof(ackinfo);
1134
1135		switch (ack.reason) {
1136		case RXRPC_ACK_REQUESTED:
1137		case RXRPC_ACK_DUPLICATE:
1138		case RXRPC_ACK_OUT_OF_SEQUENCE:
1139		case RXRPC_ACK_EXCEEDS_WINDOW:
1140		case RXRPC_ACK_NOSPACE:
1141		case RXRPC_ACK_PING:
1142		case RXRPC_ACK_PING_RESPONSE:
1143			goto send_ACK_with_skew;
1144		case RXRPC_ACK_DELAY:
1145		case RXRPC_ACK_IDLE:
1146			goto send_ACK;
1147		}
1148	}
1149
1150	/* handle completion of security negotiations on an incoming
1151	 * connection */
1152	if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
1153		_debug("secured");
1154		spin_lock_bh(&call->lock);
1155
1156		if (call->state == RXRPC_CALL_SERVER_SECURING) {
1157			_debug("securing");
1158			write_lock(&call->conn->lock);
1159			if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1160			    !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
1161				_debug("not released");
1162				call->state = RXRPC_CALL_SERVER_ACCEPTING;
1163				list_move_tail(&call->accept_link,
1164					       &call->socket->acceptq);
1165			}
1166			write_unlock(&call->conn->lock);
1167			read_lock(&call->state_lock);
1168			if (call->state < RXRPC_CALL_COMPLETE)
1169				set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1170			read_unlock(&call->state_lock);
1171		}
1172
1173		spin_unlock_bh(&call->lock);
1174		if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
1175			goto maybe_reschedule;
1176	}
1177
1178	/* post a notification of an acceptable connection to the app */
1179	if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
1180		_debug("post accept");
1181		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
1182				       0, false) < 0)
1183			goto no_mem;
1184		clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1185		goto maybe_reschedule;
1186	}
1187
1188	/* handle incoming call acceptance */
1189	if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
1190		_debug("accepted");
1191		ASSERTCMP(call->rx_data_post, ==, 0);
1192		call->rx_data_post = 1;
1193		read_lock_bh(&call->state_lock);
1194		if (call->state < RXRPC_CALL_COMPLETE)
1195			set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
1196		read_unlock_bh(&call->state_lock);
1197	}
1198
1199	/* drain the out of sequence received packet queue into the packet Rx
1200	 * queue */
1201	if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
1202		while (call->rx_data_post == call->rx_first_oos)
1203			if (rxrpc_drain_rx_oos_queue(call) < 0)
1204				break;
1205		goto maybe_reschedule;
1206	}
1207
1208	/* other events may have been raised since we started checking */
1209	goto maybe_reschedule;
1210
1211send_ACK_with_skew:
1212	ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
1213			    ntohl(ack.serial));
1214send_ACK:
1215	mtu = call->conn->trans->peer->if_mtu;
1216	mtu -= call->conn->trans->peer->hdrsize;
1217	ackinfo.maxMTU	= htonl(mtu);
1218	ackinfo.rwind	= htonl(rxrpc_rx_window_size);
1219
1220	/* permit the peer to send us jumbo packets if it wants to */
1221	ackinfo.rxMTU	= htonl(rxrpc_rx_mtu);
1222	ackinfo.jumbo_max = htonl(rxrpc_rx_jumbo_max);
1223
1224	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1225	_proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
1226	       ntohl(hdr.serial),
1227	       ntohs(ack.maxSkew),
1228	       ntohl(ack.firstPacket),
1229	       ntohl(ack.previousPacket),
1230	       ntohl(ack.serial),
1231	       rxrpc_acks(ack.reason),
1232	       ack.nAcks);
1233
1234	del_timer_sync(&call->ack_timer);
1235	if (ack.nAcks > 0)
1236		set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
1237	goto send_message_2;
1238
1239send_message:
1240	_debug("send message");
1241
1242	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1243	_proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
1244send_message_2:
1245
1246	len = iov[0].iov_len;
1247	ioc = 1;
1248	if (iov[4].iov_len) {
1249		ioc = 5;
1250		len += iov[4].iov_len;
1251		len += iov[3].iov_len;
1252		len += iov[2].iov_len;
1253		len += iov[1].iov_len;
1254	} else if (iov[3].iov_len) {
1255		ioc = 4;
1256		len += iov[3].iov_len;
1257		len += iov[2].iov_len;
1258		len += iov[1].iov_len;
1259	} else if (iov[2].iov_len) {
1260		ioc = 3;
1261		len += iov[2].iov_len;
1262		len += iov[1].iov_len;
1263	} else if (iov[1].iov_len) {
1264		ioc = 2;
1265		len += iov[1].iov_len;
1266	}
1267
1268	ret = kernel_sendmsg(call->conn->trans->local->socket,
1269			     &msg, iov, ioc, len);
1270	if (ret < 0) {
1271		_debug("sendmsg failed: %d", ret);
1272		read_lock_bh(&call->state_lock);
1273		if (call->state < RXRPC_CALL_DEAD)
1274			rxrpc_queue_call(call);
1275		read_unlock_bh(&call->state_lock);
1276		goto error;
1277	}
1278
1279	switch (genbit) {
1280	case RXRPC_CALL_ABORT:
1281		clear_bit(genbit, &call->events);
1282		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
1283		goto kill_ACKs;
1284
1285	case RXRPC_CALL_ACK_FINAL:
1286		write_lock_bh(&call->state_lock);
1287		if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
1288			call->state = RXRPC_CALL_COMPLETE;
1289		write_unlock_bh(&call->state_lock);
1290		goto kill_ACKs;
1291
1292	default:
1293		clear_bit(genbit, &call->events);
1294		switch (call->state) {
1295		case RXRPC_CALL_CLIENT_AWAIT_REPLY:
1296		case RXRPC_CALL_CLIENT_RECV_REPLY:
1297		case RXRPC_CALL_SERVER_RECV_REQUEST:
1298		case RXRPC_CALL_SERVER_ACK_REQUEST:
1299			_debug("start ACK timer");
1300			rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
1301					  call->ackr_serial, false);
1302		default:
1303			break;
1304		}
1305		goto maybe_reschedule;
1306	}
1307
1308kill_ACKs:
1309	del_timer_sync(&call->ack_timer);
1310	if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
1311		rxrpc_put_call(call);
1312	clear_bit(RXRPC_CALL_ACK, &call->events);
1313
1314maybe_reschedule:
1315	if (call->events || !skb_queue_empty(&call->rx_queue)) {
1316		read_lock_bh(&call->state_lock);
1317		if (call->state < RXRPC_CALL_DEAD)
1318			rxrpc_queue_call(call);
1319		read_unlock_bh(&call->state_lock);
1320	}
1321
1322	/* don't leave aborted connections on the accept queue */
1323	if (call->state >= RXRPC_CALL_COMPLETE &&
1324	    !list_empty(&call->accept_link)) {
1325		_debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
1326		       call, call->events, call->flags,
1327		       ntohl(call->conn->cid));
1328
1329		read_lock_bh(&call->state_lock);
1330		if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1331		    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
1332			rxrpc_queue_call(call);
1333		read_unlock_bh(&call->state_lock);
1334	}
1335
1336error:
1337	clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
1338	kfree(acks);
1339
1340	/* because we don't want two CPUs both processing the work item for one
1341	 * call at the same time, we use a flag to note when it's busy; however
1342	 * this means there's a race between clearing the flag and setting the
1343	 * work pending bit and the work item being processed again */
1344	if (call->events && !work_pending(&call->processor)) {
1345		_debug("jumpstart %x", ntohl(call->conn->cid));
1346		rxrpc_queue_call(call);
1347	}
1348
1349	_leave("");
1350	return;
1351
1352no_mem:
1353	_debug("out of memory");
1354	goto maybe_reschedule;
1355}
   1/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/module.h>
  13#include <linux/circ_buf.h>
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/slab.h>
  17#include <linux/udp.h>
  18#include <net/sock.h>
  19#include <net/af_rxrpc.h>
  20#include "ar-internal.h"
  21
  22/*
  23 * How long to wait before scheduling ACK generation after seeing a
  24 * packet with RXRPC_REQUEST_ACK set (in jiffies).
  25 */
  26unsigned rxrpc_requested_ack_delay = 1;
  27
  28/*
  29 * How long to wait before scheduling an ACK with subtype DELAY (in jiffies).
  30 *
  31 * We use this when we've received new data packets.  If those packets aren't
  32 * all consumed within this time we will send a DELAY ACK if an ACK was not
  33 * requested to let the sender know it doesn't need to resend.
  34 */
  35unsigned rxrpc_soft_ack_delay = 1 * HZ;
  36
  37/*
  38 * How long to wait before scheduling an ACK with subtype IDLE (in jiffies).
  39 *
  40 * We use this when we've consumed some previously soft-ACK'd packets when
  41 * further packets aren't immediately received to decide when to send an IDLE
  42 * ACK let the other end know that it can free up its Tx buffer space.
  43 */
  44unsigned rxrpc_idle_ack_delay = 0.5 * HZ;
  45
  46/*
  47 * Receive window size in packets.  This indicates the maximum number of
  48 * unconsumed received packets we're willing to retain in memory.  Once this
  49 * limit is hit, we should generate an EXCEEDS_WINDOW ACK and discard further
  50 * packets.
  51 */
  52unsigned rxrpc_rx_window_size = 32;
  53
  54/*
  55 * Maximum Rx MTU size.  This indicates to the sender the size of jumbo packet
  56 * made by gluing normal packets together that we're willing to handle.
  57 */
  58unsigned rxrpc_rx_mtu = 5692;
  59
  60/*
  61 * The maximum number of fragments in a received jumbo packet that we tell the
  62 * sender that we're willing to handle.
  63 */
  64unsigned rxrpc_rx_jumbo_max = 4;
  65
  66static const char *rxrpc_acks(u8 reason)
  67{
  68	static const char *const str[] = {
  69		"---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY",
  70		"IDL", "-?-"
  71	};
  72
  73	if (reason >= ARRAY_SIZE(str))
  74		reason = ARRAY_SIZE(str) - 1;
  75	return str[reason];
  76}
  77
  78static const s8 rxrpc_ack_priority[] = {
  79	[0]				= 0,
  80	[RXRPC_ACK_DELAY]		= 1,
  81	[RXRPC_ACK_REQUESTED]		= 2,
  82	[RXRPC_ACK_IDLE]		= 3,
  83	[RXRPC_ACK_PING_RESPONSE]	= 4,
  84	[RXRPC_ACK_DUPLICATE]		= 5,
  85	[RXRPC_ACK_OUT_OF_SEQUENCE]	= 6,
  86	[RXRPC_ACK_EXCEEDS_WINDOW]	= 7,
  87	[RXRPC_ACK_NOSPACE]		= 8,
  88};
  89
  90/*
  91 * propose an ACK be sent
  92 */
  93void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
  94			 __be32 serial, bool immediate)
  95{
  96	unsigned long expiry;
  97	s8 prior = rxrpc_ack_priority[ack_reason];
  98
  99	ASSERTCMP(prior, >, 0);
 100
 101	_enter("{%d},%s,%%%x,%u",
 102	       call->debug_id, rxrpc_acks(ack_reason), ntohl(serial),
 103	       immediate);
 104
 105	if (prior < rxrpc_ack_priority[call->ackr_reason]) {
 106		if (immediate)
 107			goto cancel_timer;
 108		return;
 109	}
 110
 111	/* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
 112	 * numbers */
 113	if (prior == rxrpc_ack_priority[call->ackr_reason]) {
 114		if (prior <= 4)
 115			call->ackr_serial = serial;
 116		if (immediate)
 117			goto cancel_timer;
 118		return;
 119	}
 120
 121	call->ackr_reason = ack_reason;
 122	call->ackr_serial = serial;
 123
 124	switch (ack_reason) {
 125	case RXRPC_ACK_DELAY:
 126		_debug("run delay timer");
 127		expiry = rxrpc_soft_ack_delay;
 128		goto run_timer;
 129
 130	case RXRPC_ACK_IDLE:
 131		if (!immediate) {
 132			_debug("run defer timer");
 133			expiry = rxrpc_idle_ack_delay;
 134			goto run_timer;
 135		}
 136		goto cancel_timer;
 137
 138	case RXRPC_ACK_REQUESTED:
 139		expiry = rxrpc_requested_ack_delay;
 140		if (!expiry)
 141			goto cancel_timer;
 142		if (!immediate || serial == cpu_to_be32(1)) {
 143			_debug("run defer timer");
 144			goto run_timer;
 145		}
 146
 147	default:
 148		_debug("immediate ACK");
 149		goto cancel_timer;
 150	}
 151
 152run_timer:
 153	expiry += jiffies;
 154	if (!timer_pending(&call->ack_timer) ||
 155	    time_after(call->ack_timer.expires, expiry))
 156		mod_timer(&call->ack_timer, expiry);
 157	return;
 158
 159cancel_timer:
 160	_debug("cancel timer %%%u", ntohl(serial));
 161	try_to_del_timer_sync(&call->ack_timer);
 162	read_lock_bh(&call->state_lock);
 163	if (call->state <= RXRPC_CALL_COMPLETE &&
 164	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
 165		rxrpc_queue_call(call);
 166	read_unlock_bh(&call->state_lock);
 167}
 168
 169/*
 170 * propose an ACK be sent, locking the call structure
 171 */
 172void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 173		       __be32 serial, bool immediate)
 174{
 175	s8 prior = rxrpc_ack_priority[ack_reason];
 176
 177	if (prior > rxrpc_ack_priority[call->ackr_reason]) {
 178		spin_lock_bh(&call->lock);
 179		__rxrpc_propose_ACK(call, ack_reason, serial, immediate);
 180		spin_unlock_bh(&call->lock);
 181	}
 182}
 183
 184/*
 185 * set the resend timer
 186 */
 187static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
 188			     unsigned long resend_at)
 189{
 190	read_lock_bh(&call->state_lock);
 191	if (call->state >= RXRPC_CALL_COMPLETE)
 192		resend = 0;
 193
 194	if (resend & 1) {
 195		_debug("SET RESEND");
 196		set_bit(RXRPC_CALL_RESEND, &call->events);
 197	}
 198
 199	if (resend & 2) {
 200		_debug("MODIFY RESEND TIMER");
 201		set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 202		mod_timer(&call->resend_timer, resend_at);
 203	} else {
 204		_debug("KILL RESEND TIMER");
 205		del_timer_sync(&call->resend_timer);
 206		clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 207		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 208	}
 209	read_unlock_bh(&call->state_lock);
 210}
 211
 212/*
 213 * resend packets
 214 */
 215static void rxrpc_resend(struct rxrpc_call *call)
 216{
 217	struct rxrpc_skb_priv *sp;
 218	struct rxrpc_header *hdr;
 219	struct sk_buff *txb;
 220	unsigned long *p_txb, resend_at;
 221	int loop, stop;
 222	u8 resend;
 223
 224	_enter("{%d,%d,%d,%d},",
 225	       call->acks_hard, call->acks_unacked,
 226	       atomic_read(&call->sequence),
 227	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 228
 229	stop = 0;
 230	resend = 0;
 231	resend_at = 0;
 232
 233	for (loop = call->acks_tail;
 234	     loop != call->acks_head || stop;
 235	     loop = (loop + 1) &  (call->acks_winsz - 1)
 236	     ) {
 237		p_txb = call->acks_window + loop;
 238		smp_read_barrier_depends();
 239		if (*p_txb & 1)
 240			continue;
 241
 242		txb = (struct sk_buff *) *p_txb;
 243		sp = rxrpc_skb(txb);
 244
 245		if (sp->need_resend) {
 246			sp->need_resend = false;
 247
 248			/* each Tx packet has a new serial number */
 249			sp->hdr.serial =
 250				htonl(atomic_inc_return(&call->conn->serial));
 251
 252			hdr = (struct rxrpc_header *) txb->head;
 253			hdr->serial = sp->hdr.serial;
 254
 255			_proto("Tx DATA %%%u { #%d }",
 256			       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 257			if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
 258				stop = 0;
 259				sp->resend_at = jiffies + 3;
 260			} else {
 261				sp->resend_at =
 262					jiffies + rxrpc_resend_timeout * HZ;
 263			}
 264		}
 265
 266		if (time_after_eq(jiffies + 1, sp->resend_at)) {
 267			sp->need_resend = true;
 268			resend |= 1;
 269		} else if (resend & 2) {
 270			if (time_before(sp->resend_at, resend_at))
 271				resend_at = sp->resend_at;
 272		} else {
 273			resend_at = sp->resend_at;
 274			resend |= 2;
 275		}
 276	}
 277
 278	rxrpc_set_resend(call, resend, resend_at);
 279	_leave("");
 280}
 281
 282/*
 283 * handle resend timer expiry
 284 */
 285static void rxrpc_resend_timer(struct rxrpc_call *call)
 286{
 287	struct rxrpc_skb_priv *sp;
 288	struct sk_buff *txb;
 289	unsigned long *p_txb, resend_at;
 290	int loop;
 291	u8 resend;
 292
 293	_enter("%d,%d,%d",
 294	       call->acks_tail, call->acks_unacked, call->acks_head);
 295
 296	if (call->state >= RXRPC_CALL_COMPLETE)
 297		return;
 298
 299	resend = 0;
 300	resend_at = 0;
 301
 302	for (loop = call->acks_unacked;
 303	     loop != call->acks_head;
 304	     loop = (loop + 1) &  (call->acks_winsz - 1)
 305	     ) {
 306		p_txb = call->acks_window + loop;
 307		smp_read_barrier_depends();
 308		txb = (struct sk_buff *) (*p_txb & ~1);
 309		sp = rxrpc_skb(txb);
 310
 311		ASSERT(!(*p_txb & 1));
 312
 313		if (sp->need_resend) {
 314			;
 315		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 316			sp->need_resend = true;
 317			resend |= 1;
 318		} else if (resend & 2) {
 319			if (time_before(sp->resend_at, resend_at))
 320				resend_at = sp->resend_at;
 321		} else {
 322			resend_at = sp->resend_at;
 323			resend |= 2;
 324		}
 325	}
 326
 327	rxrpc_set_resend(call, resend, resend_at);
 328	_leave("");
 329}
 330
 331/*
 332 * process soft ACKs of our transmitted packets
 333 * - these indicate packets the peer has or has not received, but hasn't yet
 334 *   given to the consumer, and so can still be discarded and re-requested
 335 */
 336static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
 337				   struct rxrpc_ackpacket *ack,
 338				   struct sk_buff *skb)
 339{
 340	struct rxrpc_skb_priv *sp;
 341	struct sk_buff *txb;
 342	unsigned long *p_txb, resend_at;
 343	int loop;
 344	u8 sacks[RXRPC_MAXACKS], resend;
 345
 346	_enter("{%d,%d},{%d},",
 347	       call->acks_hard,
 348	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
 349	       ack->nAcks);
 350
 351	if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
 352		goto protocol_error;
 353
 354	resend = 0;
 355	resend_at = 0;
 356	for (loop = 0; loop < ack->nAcks; loop++) {
 357		p_txb = call->acks_window;
 358		p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
 359		smp_read_barrier_depends();
 360		txb = (struct sk_buff *) (*p_txb & ~1);
 361		sp = rxrpc_skb(txb);
 362
 363		switch (sacks[loop]) {
 364		case RXRPC_ACK_TYPE_ACK:
 365			sp->need_resend = false;
 366			*p_txb |= 1;
 367			break;
 368		case RXRPC_ACK_TYPE_NACK:
 369			sp->need_resend = true;
 370			*p_txb &= ~1;
 371			resend = 1;
 372			break;
 373		default:
 374			_debug("Unsupported ACK type %d", sacks[loop]);
 375			goto protocol_error;
 376		}
 377	}
 378
 379	smp_mb();
 380	call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);
 381
 382	/* anything not explicitly ACK'd is implicitly NACK'd, but may just not
 383	 * have been received or processed yet by the far end */
 384	for (loop = call->acks_unacked;
 385	     loop != call->acks_head;
 386	     loop = (loop + 1) &  (call->acks_winsz - 1)
 387	     ) {
 388		p_txb = call->acks_window + loop;
 389		smp_read_barrier_depends();
 390		txb = (struct sk_buff *) (*p_txb & ~1);
 391		sp = rxrpc_skb(txb);
 392
 393		if (*p_txb & 1) {
 394			/* packet must have been discarded */
 395			sp->need_resend = true;
 396			*p_txb &= ~1;
 397			resend |= 1;
 398		} else if (sp->need_resend) {
 399			;
 400		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 401			sp->need_resend = true;
 402			resend |= 1;
 403		} else if (resend & 2) {
 404			if (time_before(sp->resend_at, resend_at))
 405				resend_at = sp->resend_at;
 406		} else {
 407			resend_at = sp->resend_at;
 408			resend |= 2;
 409		}
 410	}
 411
 412	rxrpc_set_resend(call, resend, resend_at);
 413	_leave(" = 0");
 414	return 0;
 415
 416protocol_error:
 417	_leave(" = -EPROTO");
 418	return -EPROTO;
 419}
 420
 421/*
 422 * discard hard-ACK'd packets from the Tx window
 423 */
 424static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
 425{
 426	unsigned long _skb;
 427	int tail = call->acks_tail, old_tail;
 428	int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);
 429
 430	_enter("{%u,%u},%u", call->acks_hard, win, hard);
 431
 432	ASSERTCMP(hard - call->acks_hard, <=, win);
 433
 434	while (call->acks_hard < hard) {
 435		smp_read_barrier_depends();
 436		_skb = call->acks_window[tail] & ~1;
 437		rxrpc_free_skb((struct sk_buff *) _skb);
 438		old_tail = tail;
 439		tail = (tail + 1) & (call->acks_winsz - 1);
 440		call->acks_tail = tail;
 441		if (call->acks_unacked == old_tail)
 442			call->acks_unacked = tail;
 443		call->acks_hard++;
 444	}
 445
 446	wake_up(&call->tx_waitq);
 447}
 448
 449/*
 450 * clear the Tx window in the event of a failure
 451 */
 452static void rxrpc_clear_tx_window(struct rxrpc_call *call)
 453{
 454	rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
 455}
 456
 457/*
 458 * drain the out of sequence received packet queue into the packet Rx queue
 459 */
 460static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 461{
 462	struct rxrpc_skb_priv *sp;
 463	struct sk_buff *skb;
 464	bool terminal;
 465	int ret;
 466
 467	_enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);
 468
 469	spin_lock_bh(&call->lock);
 470
 471	ret = -ECONNRESET;
 472	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 473		goto socket_unavailable;
 474
 475	skb = skb_dequeue(&call->rx_oos_queue);
 476	if (skb) {
 477		sp = rxrpc_skb(skb);
 478
 479		_debug("drain OOS packet %d [%d]",
 480		       ntohl(sp->hdr.seq), call->rx_first_oos);
 481
 482		if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
 483			skb_queue_head(&call->rx_oos_queue, skb);
 484			call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
 485			_debug("requeue %p {%u}", skb, call->rx_first_oos);
 486		} else {
 487			skb->mark = RXRPC_SKB_MARK_DATA;
 488			terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
 489				!(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
 490			ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
 491			BUG_ON(ret < 0);
 492			_debug("drain #%u", call->rx_data_post);
 493			call->rx_data_post++;
 494
 495			/* find out what the next packet is */
 496			skb = skb_peek(&call->rx_oos_queue);
 497			if (skb)
 498				call->rx_first_oos =
 499					ntohl(rxrpc_skb(skb)->hdr.seq);
 500			else
 501				call->rx_first_oos = 0;
 502			_debug("peek %p {%u}", skb, call->rx_first_oos);
 503		}
 504	}
 505
 506	ret = 0;
 507socket_unavailable:
 508	spin_unlock_bh(&call->lock);
 509	_leave(" = %d", ret);
 510	return ret;
 511}
 512
 513/*
 514 * insert an out of sequence packet into the buffer
 515 */
 516static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
 517				    struct sk_buff *skb)
 518{
 519	struct rxrpc_skb_priv *sp, *psp;
 520	struct sk_buff *p;
 521	u32 seq;
 522
 523	sp = rxrpc_skb(skb);
 524	seq = ntohl(sp->hdr.seq);
 525	_enter(",,{%u}", seq);
 526
 527	skb->destructor = rxrpc_packet_destructor;
 528	ASSERTCMP(sp->call, ==, NULL);
 529	sp->call = call;
 530	rxrpc_get_call(call);
 531
 532	/* insert into the buffer in sequence order */
 533	spin_lock_bh(&call->lock);
 534
 535	skb_queue_walk(&call->rx_oos_queue, p) {
 536		psp = rxrpc_skb(p);
 537		if (ntohl(psp->hdr.seq) > seq) {
 538			_debug("insert oos #%u before #%u",
 539			       seq, ntohl(psp->hdr.seq));
 540			skb_insert(p, skb, &call->rx_oos_queue);
 541			goto inserted;
 542		}
 543	}
 544
 545	_debug("append oos #%u", seq);
 546	skb_queue_tail(&call->rx_oos_queue, skb);
 547inserted:
 548
 549	/* we might now have a new front to the queue */
 550	if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
 551		call->rx_first_oos = seq;
 552
 553	read_lock(&call->state_lock);
 554	if (call->state < RXRPC_CALL_COMPLETE &&
 555	    call->rx_data_post == call->rx_first_oos) {
 556		_debug("drain rx oos now");
 557		set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
 558	}
 559	read_unlock(&call->state_lock);
 560
 561	spin_unlock_bh(&call->lock);
 562	_leave(" [stored #%u]", call->rx_first_oos);
 563}
 564
 565/*
 566 * clear the Tx window on final ACK reception
 567 */
 568static void rxrpc_zap_tx_window(struct rxrpc_call *call)
 569{
 570	struct rxrpc_skb_priv *sp;
 571	struct sk_buff *skb;
 572	unsigned long _skb, *acks_window;
 573	u8 winsz = call->acks_winsz;
 574	int tail;
 575
 576	acks_window = call->acks_window;
 577	call->acks_window = NULL;
 578
 579	while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
 580		tail = call->acks_tail;
 581		smp_read_barrier_depends();
 582		_skb = acks_window[tail] & ~1;
 583		smp_mb();
 584		call->acks_tail = (call->acks_tail + 1) & (winsz - 1);
 585
 586		skb = (struct sk_buff *) _skb;
 587		sp = rxrpc_skb(skb);
 588		_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 589		rxrpc_free_skb(skb);
 590	}
 591
 592	kfree(acks_window);
 593}
 594
 595/*
 596 * process the extra information that may be appended to an ACK packet
 597 */
 598static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
 599				  unsigned int latest, int nAcks)
 600{
 601	struct rxrpc_ackinfo ackinfo;
 602	struct rxrpc_peer *peer;
 603	unsigned int mtu;
 604
 605	if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
 606		_leave(" [no ackinfo]");
 607		return;
 608	}
 609
 610	_proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
 611	       latest,
 612	       ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
 613	       ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));
 614
 615	mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
 616
 617	peer = call->conn->trans->peer;
 618	if (mtu < peer->maxdata) {
 619		spin_lock_bh(&peer->lock);
 620		peer->maxdata = mtu;
 621		peer->mtu = mtu + peer->hdrsize;
 622		spin_unlock_bh(&peer->lock);
 623		_net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
 624	}
 625}
 626
 627/*
 628 * process packets in the reception queue
 629 */
 630static int rxrpc_process_rx_queue(struct rxrpc_call *call,
 631				  u32 *_abort_code)
 632{
 633	struct rxrpc_ackpacket ack;
 634	struct rxrpc_skb_priv *sp;
 635	struct sk_buff *skb;
 636	bool post_ACK;
 637	int latest;
 638	u32 hard, tx;
 639
 640	_enter("");
 641
 642process_further:
 643	skb = skb_dequeue(&call->rx_queue);
 644	if (!skb)
 645		return -EAGAIN;
 646
 647	_net("deferred skb %p", skb);
 648
 649	sp = rxrpc_skb(skb);
 650
 651	_debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);
 652
 653	post_ACK = false;
 654
 655	switch (sp->hdr.type) {
 656		/* data packets that wind up here have been received out of
 657		 * order, need security processing or are jumbo packets */
 658	case RXRPC_PACKET_TYPE_DATA:
 659		_proto("OOSQ DATA %%%u { #%u }",
 660		       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 661
 662		/* secured packets must be verified and possibly decrypted */
 663		if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
 664			goto protocol_error;
 665
 666		rxrpc_insert_oos_packet(call, skb);
 667		goto process_further;
 668
 669		/* partial ACK to process */
 670	case RXRPC_PACKET_TYPE_ACK:
 671		if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
 672			_debug("extraction failure");
 673			goto protocol_error;
 674		}
 675		if (!skb_pull(skb, sizeof(ack)))
 676			BUG();
 677
 678		latest = ntohl(sp->hdr.serial);
 679		hard = ntohl(ack.firstPacket);
 680		tx = atomic_read(&call->sequence);
 681
 682		_proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 683		       latest,
 684		       ntohs(ack.maxSkew),
 685		       hard,
 686		       ntohl(ack.previousPacket),
 687		       ntohl(ack.serial),
 688		       rxrpc_acks(ack.reason),
 689		       ack.nAcks);
 690
 691		rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);
 692
 693		if (ack.reason == RXRPC_ACK_PING) {
 694			_proto("Rx ACK %%%u PING Request", latest);
 695			rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
 696					  sp->hdr.serial, true);
 697		}
 698
 699		/* discard any out-of-order or duplicate ACKs */
 700		if (latest - call->acks_latest <= 0) {
 701			_debug("discard ACK %d <= %d",
 702			       latest, call->acks_latest);
 703			goto discard;
 704		}
 705		call->acks_latest = latest;
 706
 707		if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
 708		    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
 709		    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
 710		    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
 711			goto discard;
 712
 713		_debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);
 714
 715		if (hard > 0) {
 716			if (hard - 1 > tx) {
 717				_debug("hard-ACK'd packet %d not transmitted"
 718				       " (%d top)",
 719				       hard - 1, tx);
 720				goto protocol_error;
 721			}
 722
 723			if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
 724			     call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
 725			    hard > tx)
 726				goto all_acked;
 727
 728			smp_rmb();
 729			rxrpc_rotate_tx_window(call, hard - 1);
 730		}
 731
 732		if (ack.nAcks > 0) {
 733			if (hard - 1 + ack.nAcks > tx) {
 734				_debug("soft-ACK'd packet %d+%d not"
 735				       " transmitted (%d top)",
 736				       hard - 1, ack.nAcks, tx);
 737				goto protocol_error;
 738			}
 739
 740			if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
 741				goto protocol_error;
 742		}
 743		goto discard;
 744
 745		/* complete ACK to process */
 746	case RXRPC_PACKET_TYPE_ACKALL:
 747		goto all_acked;
 748
 749		/* abort and busy are handled elsewhere */
 750	case RXRPC_PACKET_TYPE_BUSY:
 751	case RXRPC_PACKET_TYPE_ABORT:
 752		BUG();
 753
 754		/* connection level events - also handled elsewhere */
 755	case RXRPC_PACKET_TYPE_CHALLENGE:
 756	case RXRPC_PACKET_TYPE_RESPONSE:
 757	case RXRPC_PACKET_TYPE_DEBUG:
 758		BUG();
 759	}
 760
 761	/* if we've had a hard ACK that covers all the packets we've sent, then
 762	 * that ends that phase of the operation */
 763all_acked:
 764	write_lock_bh(&call->state_lock);
 765	_debug("ack all %d", call->state);
 766
 767	switch (call->state) {
 768	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 769		call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
 770		break;
 771	case RXRPC_CALL_SERVER_AWAIT_ACK:
 772		_debug("srv complete");
 773		call->state = RXRPC_CALL_COMPLETE;
 774		post_ACK = true;
 775		break;
 776	case RXRPC_CALL_CLIENT_SEND_REQUEST:
 777	case RXRPC_CALL_SERVER_RECV_REQUEST:
 778		goto protocol_error_unlock; /* can't occur yet */
 779	default:
 780		write_unlock_bh(&call->state_lock);
 781		goto discard; /* assume packet left over from earlier phase */
 782	}
 783
 784	write_unlock_bh(&call->state_lock);
 785
 786	/* if all the packets we sent are hard-ACK'd, then we can discard
 787	 * whatever we've got left */
 788	_debug("clear Tx %d",
 789	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 790
 791	del_timer_sync(&call->resend_timer);
 792	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 793	clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 794
 795	if (call->acks_window)
 796		rxrpc_zap_tx_window(call);
 797
 798	if (post_ACK) {
 799		/* post the final ACK message for userspace to pick up */
 800		_debug("post ACK");
 801		skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
 802		sp->call = call;
 803		rxrpc_get_call(call);
 804		spin_lock_bh(&call->lock);
 805		if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
 806			BUG();
 807		spin_unlock_bh(&call->lock);
 808		goto process_further;
 809	}
 810
 811discard:
 812	rxrpc_free_skb(skb);
 813	goto process_further;
 814
 815protocol_error_unlock:
 816	write_unlock_bh(&call->state_lock);
 817protocol_error:
 818	rxrpc_free_skb(skb);
 819	_leave(" = -EPROTO");
 820	return -EPROTO;
 821}
 822
 823/*
 824 * post a message to the socket Rx queue for recvmsg() to pick up
 825 */
 826static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 827			      bool fatal)
 828{
 829	struct rxrpc_skb_priv *sp;
 830	struct sk_buff *skb;
 831	int ret;
 832
 833	_enter("{%d,%lx},%u,%u,%d",
 834	       call->debug_id, call->flags, mark, error, fatal);
 835
 836	/* remove timers and things for fatal messages */
 837	if (fatal) {
 838		del_timer_sync(&call->resend_timer);
 839		del_timer_sync(&call->ack_timer);
 840		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 841	}
 842
 843	if (mark != RXRPC_SKB_MARK_NEW_CALL &&
 844	    !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 845		_leave("[no userid]");
 846		return 0;
 847	}
 848
 849	if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
 850		skb = alloc_skb(0, GFP_NOFS);
 851		if (!skb)
 852			return -ENOMEM;
 853
 854		rxrpc_new_skb(skb);
 855
 856		skb->mark = mark;
 857
 858		sp = rxrpc_skb(skb);
 859		memset(sp, 0, sizeof(*sp));
 860		sp->error = error;
 861		sp->call = call;
 862		rxrpc_get_call(call);
 863
 864		spin_lock_bh(&call->lock);
 865		ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
 866		spin_unlock_bh(&call->lock);
 867		BUG_ON(ret < 0);
 868	}
 869
 870	return 0;
 871}
 872
 873/*
 874 * handle background processing of incoming call packets and ACK / abort
 875 * generation
 876 */
 877void rxrpc_process_call(struct work_struct *work)
 878{
 879	struct rxrpc_call *call =
 880		container_of(work, struct rxrpc_call, processor);
 881	struct rxrpc_ackpacket ack;
 882	struct rxrpc_ackinfo ackinfo;
 883	struct rxrpc_header hdr;
 884	struct msghdr msg;
 885	struct kvec iov[5];
 886	unsigned long bits;
 887	__be32 data, pad;
 888	size_t len;
 889	int genbit, loop, nbit, ioc, ret, mtu;
 890	u32 abort_code = RX_PROTOCOL_ERROR;
 891	u8 *acks = NULL;
 892
 893	//printk("\n--------------------\n");
 894	_enter("{%d,%s,%lx} [%lu]",
 895	       call->debug_id, rxrpc_call_states[call->state], call->events,
 896	       (jiffies - call->creation_jif) / (HZ / 10));
 897
 898	if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
 899		_debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
 900		return;
 901	}
 902
 903	/* there's a good chance we're going to have to send a message, so set
 904	 * one up in advance */
 905	msg.msg_name	= &call->conn->trans->peer->srx.transport.sin;
 906	msg.msg_namelen	= sizeof(call->conn->trans->peer->srx.transport.sin);
 907	msg.msg_control	= NULL;
 908	msg.msg_controllen = 0;
 909	msg.msg_flags	= 0;
 910
 911	hdr.epoch	= call->conn->epoch;
 912	hdr.cid		= call->cid;
 913	hdr.callNumber	= call->call_id;
 914	hdr.seq		= 0;
 915	hdr.type	= RXRPC_PACKET_TYPE_ACK;
 916	hdr.flags	= call->conn->out_clientflag;
 917	hdr.userStatus	= 0;
 918	hdr.securityIndex = call->conn->security_ix;
 919	hdr._rsvd	= 0;
 920	hdr.serviceId	= call->conn->service_id;
 921
 922	memset(iov, 0, sizeof(iov));
 923	iov[0].iov_base	= &hdr;
 924	iov[0].iov_len	= sizeof(hdr);
 925
 926	/* deal with events of a final nature */
 927	if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
 928		rxrpc_release_call(call);
 929		clear_bit(RXRPC_CALL_RELEASE, &call->events);
 930	}
 931
 932	if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
 933		int error;
 934
 935		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 936		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 937		clear_bit(RXRPC_CALL_ABORT, &call->events);
 938
 939		error = call->conn->trans->peer->net_error;
 940		_debug("post net error %d", error);
 941
 942		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
 943				       error, true) < 0)
 944			goto no_mem;
 945		clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
 946		goto kill_ACKs;
 947	}
 948
 949	if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
 950		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 951
 952		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 953		clear_bit(RXRPC_CALL_ABORT, &call->events);
 954
 955		_debug("post conn abort");
 956
 957		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 958				       call->conn->error, true) < 0)
 959			goto no_mem;
 960		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 961		goto kill_ACKs;
 962	}
 963
 964	if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
 965		hdr.type = RXRPC_PACKET_TYPE_BUSY;
 966		genbit = RXRPC_CALL_REJECT_BUSY;
 967		goto send_message;
 968	}
 969
 970	if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
 971		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 972
 973		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 974				       ECONNABORTED, true) < 0)
 975			goto no_mem;
 976		hdr.type = RXRPC_PACKET_TYPE_ABORT;
 977		data = htonl(call->abort_code);
 978		iov[1].iov_base = &data;
 979		iov[1].iov_len = sizeof(data);
 980		genbit = RXRPC_CALL_ABORT;
 981		goto send_message;
 982	}
 983
 984	if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
 985		genbit = RXRPC_CALL_ACK_FINAL;
 986
 987		ack.bufferSpace	= htons(8);
 988		ack.maxSkew	= 0;
 989		ack.serial	= 0;
 990		ack.reason	= RXRPC_ACK_IDLE;
 991		ack.nAcks	= 0;
 992		call->ackr_reason = 0;
 993
 994		spin_lock_bh(&call->lock);
 995		ack.serial = call->ackr_serial;
 996		ack.previousPacket = call->ackr_prev_seq;
 997		ack.firstPacket = htonl(call->rx_data_eaten + 1);
 998		spin_unlock_bh(&call->lock);
 999
1000		pad = 0;
1001
1002		iov[1].iov_base = &ack;
1003		iov[1].iov_len	= sizeof(ack);
1004		iov[2].iov_base = &pad;
1005		iov[2].iov_len	= 3;
1006		iov[3].iov_base = &ackinfo;
1007		iov[3].iov_len	= sizeof(ackinfo);
1008		goto send_ACK;
1009	}
1010
1011	if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
1012			    (1 << RXRPC_CALL_RCVD_ABORT))
1013	    ) {
1014		u32 mark;
1015
1016		if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
1017			mark = RXRPC_SKB_MARK_REMOTE_ABORT;
1018		else
1019			mark = RXRPC_SKB_MARK_BUSY;
1020
1021		_debug("post abort/busy");
1022		rxrpc_clear_tx_window(call);
1023		if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
1024			goto no_mem;
1025
1026		clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
1027		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
1028		goto kill_ACKs;
1029	}
1030
1031	if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
1032		_debug("do implicit ackall");
1033		rxrpc_clear_tx_window(call);
1034	}
1035
1036	if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
1037		write_lock_bh(&call->state_lock);
1038		if (call->state <= RXRPC_CALL_COMPLETE) {
1039			call->state = RXRPC_CALL_LOCALLY_ABORTED;
1040			call->abort_code = RX_CALL_TIMEOUT;
1041			set_bit(RXRPC_CALL_ABORT, &call->events);
1042		}
1043		write_unlock_bh(&call->state_lock);
1044
1045		_debug("post timeout");
1046		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
1047				       ETIME, true) < 0)
1048			goto no_mem;
1049
1050		clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
1051		goto kill_ACKs;
1052	}
1053
1054	/* deal with assorted inbound messages */
1055	if (!skb_queue_empty(&call->rx_queue)) {
1056		switch (rxrpc_process_rx_queue(call, &abort_code)) {
1057		case 0:
1058		case -EAGAIN:
1059			break;
1060		case -ENOMEM:
1061			goto no_mem;
1062		case -EKEYEXPIRED:
1063		case -EKEYREJECTED:
1064		case -EPROTO:
1065			rxrpc_abort_call(call, abort_code);
1066			goto kill_ACKs;
1067		}
1068	}
1069
1070	/* handle resending */
1071	if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
1072		rxrpc_resend_timer(call);
1073	if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
1074		rxrpc_resend(call);
1075
1076	/* consider sending an ordinary ACK */
1077	if (test_bit(RXRPC_CALL_ACK, &call->events)) {
1078		_debug("send ACK: window: %d - %d { %lx }",
1079		       call->rx_data_eaten, call->ackr_win_top,
1080		       call->ackr_window[0]);
1081
1082		if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
1083		    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
1084			/* ACK by sending reply DATA packet in this state */
1085			clear_bit(RXRPC_CALL_ACK, &call->events);
1086			goto maybe_reschedule;
1087		}
1088
1089		genbit = RXRPC_CALL_ACK;
1090
1091		acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
1092			       GFP_NOFS);
1093		if (!acks)
1094			goto no_mem;
1095
1096		//hdr.flags	= RXRPC_SLOW_START_OK;
1097		ack.bufferSpace	= htons(8);
1098		ack.maxSkew	= 0;
1099		ack.serial	= 0;
1100		ack.reason	= 0;
1101
1102		spin_lock_bh(&call->lock);
1103		ack.reason = call->ackr_reason;
1104		ack.serial = call->ackr_serial;
1105		ack.previousPacket = call->ackr_prev_seq;
1106		ack.firstPacket = htonl(call->rx_data_eaten + 1);
1107
1108		ack.nAcks = 0;
1109		for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
1110			nbit = loop * BITS_PER_LONG;
1111			for (bits = call->ackr_window[loop]; bits; bits >>= 1
1112			     ) {
1113				_debug("- l=%d n=%d b=%lx", loop, nbit, bits);
1114				if (bits & 1) {
1115					acks[nbit] = RXRPC_ACK_TYPE_ACK;
1116					ack.nAcks = nbit + 1;
1117				}
1118				nbit++;
1119			}
1120		}
1121		call->ackr_reason = 0;
1122		spin_unlock_bh(&call->lock);
1123
1124		pad = 0;
1125
1126		iov[1].iov_base = &ack;
1127		iov[1].iov_len	= sizeof(ack);
1128		iov[2].iov_base = acks;
1129		iov[2].iov_len	= ack.nAcks;
1130		iov[3].iov_base = &pad;
1131		iov[3].iov_len	= 3;
1132		iov[4].iov_base = &ackinfo;
1133		iov[4].iov_len	= sizeof(ackinfo);
1134
1135		switch (ack.reason) {
1136		case RXRPC_ACK_REQUESTED:
1137		case RXRPC_ACK_DUPLICATE:
1138		case RXRPC_ACK_OUT_OF_SEQUENCE:
1139		case RXRPC_ACK_EXCEEDS_WINDOW:
1140		case RXRPC_ACK_NOSPACE:
1141		case RXRPC_ACK_PING:
1142		case RXRPC_ACK_PING_RESPONSE:
1143			goto send_ACK_with_skew;
1144		case RXRPC_ACK_DELAY:
1145		case RXRPC_ACK_IDLE:
1146			goto send_ACK;
1147		}
1148	}
1149
1150	/* handle completion of security negotiations on an incoming
1151	 * connection */
1152	if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
1153		_debug("secured");
1154		spin_lock_bh(&call->lock);
1155
1156		if (call->state == RXRPC_CALL_SERVER_SECURING) {
1157			_debug("securing");
1158			write_lock(&call->conn->lock);
1159			if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1160			    !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
1161				_debug("not released");
1162				call->state = RXRPC_CALL_SERVER_ACCEPTING;
1163				list_move_tail(&call->accept_link,
1164					       &call->socket->acceptq);
1165			}
1166			write_unlock(&call->conn->lock);
1167			read_lock(&call->state_lock);
1168			if (call->state < RXRPC_CALL_COMPLETE)
1169				set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1170			read_unlock(&call->state_lock);
1171		}
1172
1173		spin_unlock_bh(&call->lock);
1174		if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
1175			goto maybe_reschedule;
1176	}
1177
1178	/* post a notification of an acceptable connection to the app */
1179	if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
1180		_debug("post accept");
1181		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
1182				       0, false) < 0)
1183			goto no_mem;
1184		clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1185		goto maybe_reschedule;
1186	}
1187
1188	/* handle incoming call acceptance */
1189	if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
1190		_debug("accepted");
1191		ASSERTCMP(call->rx_data_post, ==, 0);
1192		call->rx_data_post = 1;
1193		read_lock_bh(&call->state_lock);
1194		if (call->state < RXRPC_CALL_COMPLETE)
1195			set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
1196		read_unlock_bh(&call->state_lock);
1197	}
1198
1199	/* drain the out of sequence received packet queue into the packet Rx
1200	 * queue */
1201	if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
1202		while (call->rx_data_post == call->rx_first_oos)
1203			if (rxrpc_drain_rx_oos_queue(call) < 0)
1204				break;
1205		goto maybe_reschedule;
1206	}
1207
1208	/* other events may have been raised since we started checking */
1209	goto maybe_reschedule;
1210
1211send_ACK_with_skew:
1212	ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
1213			    ntohl(ack.serial));
1214send_ACK:
1215	mtu = call->conn->trans->peer->if_mtu;
1216	mtu -= call->conn->trans->peer->hdrsize;
1217	ackinfo.maxMTU	= htonl(mtu);
1218	ackinfo.rwind	= htonl(rxrpc_rx_window_size);
1219
1220	/* permit the peer to send us jumbo packets if it wants to */
1221	ackinfo.rxMTU	= htonl(rxrpc_rx_mtu);
1222	ackinfo.jumbo_max = htonl(rxrpc_rx_jumbo_max);
1223
1224	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1225	_proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
1226	       ntohl(hdr.serial),
1227	       ntohs(ack.maxSkew),
1228	       ntohl(ack.firstPacket),
1229	       ntohl(ack.previousPacket),
1230	       ntohl(ack.serial),
1231	       rxrpc_acks(ack.reason),
1232	       ack.nAcks);
1233
1234	del_timer_sync(&call->ack_timer);
1235	if (ack.nAcks > 0)
1236		set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
1237	goto send_message_2;
1238
1239send_message:
1240	_debug("send message");
1241
1242	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1243	_proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
1244send_message_2:
1245
1246	len = iov[0].iov_len;
1247	ioc = 1;
1248	if (iov[4].iov_len) {
1249		ioc = 5;
1250		len += iov[4].iov_len;
1251		len += iov[3].iov_len;
1252		len += iov[2].iov_len;
1253		len += iov[1].iov_len;
1254	} else if (iov[3].iov_len) {
1255		ioc = 4;
1256		len += iov[3].iov_len;
1257		len += iov[2].iov_len;
1258		len += iov[1].iov_len;
1259	} else if (iov[2].iov_len) {
1260		ioc = 3;
1261		len += iov[2].iov_len;
1262		len += iov[1].iov_len;
1263	} else if (iov[1].iov_len) {
1264		ioc = 2;
1265		len += iov[1].iov_len;
1266	}
1267
1268	ret = kernel_sendmsg(call->conn->trans->local->socket,
1269			     &msg, iov, ioc, len);
1270	if (ret < 0) {
1271		_debug("sendmsg failed: %d", ret);
1272		read_lock_bh(&call->state_lock);
1273		if (call->state < RXRPC_CALL_DEAD)
1274			rxrpc_queue_call(call);
1275		read_unlock_bh(&call->state_lock);
1276		goto error;
1277	}
1278
1279	switch (genbit) {
1280	case RXRPC_CALL_ABORT:
1281		clear_bit(genbit, &call->events);
1282		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
1283		goto kill_ACKs;
1284
1285	case RXRPC_CALL_ACK_FINAL:
1286		write_lock_bh(&call->state_lock);
1287		if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
1288			call->state = RXRPC_CALL_COMPLETE;
1289		write_unlock_bh(&call->state_lock);
1290		goto kill_ACKs;
1291
1292	default:
1293		clear_bit(genbit, &call->events);
1294		switch (call->state) {
1295		case RXRPC_CALL_CLIENT_AWAIT_REPLY:
1296		case RXRPC_CALL_CLIENT_RECV_REPLY:
1297		case RXRPC_CALL_SERVER_RECV_REQUEST:
1298		case RXRPC_CALL_SERVER_ACK_REQUEST:
1299			_debug("start ACK timer");
1300			rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
1301					  call->ackr_serial, false);
1302		default:
1303			break;
1304		}
1305		goto maybe_reschedule;
1306	}
1307
1308kill_ACKs:
1309	del_timer_sync(&call->ack_timer);
1310	if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
1311		rxrpc_put_call(call);
1312	clear_bit(RXRPC_CALL_ACK, &call->events);
1313
1314maybe_reschedule:
1315	if (call->events || !skb_queue_empty(&call->rx_queue)) {
1316		read_lock_bh(&call->state_lock);
1317		if (call->state < RXRPC_CALL_DEAD)
1318			rxrpc_queue_call(call);
1319		read_unlock_bh(&call->state_lock);
1320	}
1321
1322	/* don't leave aborted connections on the accept queue */
1323	if (call->state >= RXRPC_CALL_COMPLETE &&
1324	    !list_empty(&call->accept_link)) {
1325		_debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
1326		       call, call->events, call->flags,
1327		       ntohl(call->conn->cid));
1328
1329		read_lock_bh(&call->state_lock);
1330		if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1331		    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
1332			rxrpc_queue_call(call);
1333		read_unlock_bh(&call->state_lock);
1334	}
1335
1336error:
1337	clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
1338	kfree(acks);
1339
1340	/* because we don't want two CPUs both processing the work item for one
1341	 * call at the same time, we use a flag to note when it's busy; however
1342	 * this means there's a race between clearing the flag and setting the
1343	 * work pending bit and the work item being processed again */
1344	if (call->events && !work_pending(&call->processor)) {
1345		_debug("jumpstart %x", ntohl(call->conn->cid));
1346		rxrpc_queue_call(call);
1347	}
1348
1349	_leave("");
1350	return;
1351
1352no_mem:
1353	_debug("out of memory");
1354	goto maybe_reschedule;
1355}