Linux Audio

Check our new training course

Loading...
Note: File does not exist in v6.2.
   1/* RxRPC individual remote procedure call handling
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/slab.h>
  13#include <linux/module.h>
  14#include <linux/circ_buf.h>
  15#include <linux/hashtable.h>
  16#include <linux/spinlock_types.h>
  17#include <net/sock.h>
  18#include <net/af_rxrpc.h>
  19#include "ar-internal.h"
  20
  21/*
  22 * Maximum lifetime of a call (in jiffies).
  23 */
  24unsigned rxrpc_max_call_lifetime = 60 * HZ;
  25
  26/*
  27 * Time till dead call expires after last use (in jiffies).
  28 */
  29unsigned rxrpc_dead_call_expiry = 2 * HZ;
  30
  31const char *const rxrpc_call_states[] = {
  32	[RXRPC_CALL_CLIENT_SEND_REQUEST]	= "ClSndReq",
  33	[RXRPC_CALL_CLIENT_AWAIT_REPLY]		= "ClAwtRpl",
  34	[RXRPC_CALL_CLIENT_RECV_REPLY]		= "ClRcvRpl",
  35	[RXRPC_CALL_CLIENT_FINAL_ACK]		= "ClFnlACK",
  36	[RXRPC_CALL_SERVER_SECURING]		= "SvSecure",
  37	[RXRPC_CALL_SERVER_ACCEPTING]		= "SvAccept",
  38	[RXRPC_CALL_SERVER_RECV_REQUEST]	= "SvRcvReq",
  39	[RXRPC_CALL_SERVER_ACK_REQUEST]		= "SvAckReq",
  40	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
  41	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
  42	[RXRPC_CALL_COMPLETE]			= "Complete",
  43	[RXRPC_CALL_SERVER_BUSY]		= "SvBusy  ",
  44	[RXRPC_CALL_REMOTELY_ABORTED]		= "RmtAbort",
  45	[RXRPC_CALL_LOCALLY_ABORTED]		= "LocAbort",
  46	[RXRPC_CALL_NETWORK_ERROR]		= "NetError",
  47	[RXRPC_CALL_DEAD]			= "Dead    ",
  48};
  49
  50struct kmem_cache *rxrpc_call_jar;
  51LIST_HEAD(rxrpc_calls);
  52DEFINE_RWLOCK(rxrpc_call_lock);
  53
  54static void rxrpc_destroy_call(struct work_struct *work);
  55static void rxrpc_call_life_expired(unsigned long _call);
  56static void rxrpc_dead_call_expired(unsigned long _call);
  57static void rxrpc_ack_time_expired(unsigned long _call);
  58static void rxrpc_resend_time_expired(unsigned long _call);
  59
  60static DEFINE_SPINLOCK(rxrpc_call_hash_lock);
  61static DEFINE_HASHTABLE(rxrpc_call_hash, 10);
  62
  63/*
  64 * Hash function for rxrpc_call_hash
  65 */
  66static unsigned long rxrpc_call_hashfunc(
  67	u8		clientflag,
  68	__be32		cid,
  69	__be32		call_id,
  70	__be32		epoch,
  71	__be16		service_id,
  72	sa_family_t	proto,
  73	void		*localptr,
  74	unsigned int	addr_size,
  75	const u8	*peer_addr)
  76{
  77	const u16 *p;
  78	unsigned int i;
  79	unsigned long key;
  80	u32 hcid = ntohl(cid);
  81
  82	_enter("");
  83
  84	key = (unsigned long)localptr;
  85	/* We just want to add up the __be32 values, so forcing the
  86	 * cast should be okay.
  87	 */
  88	key += (__force u32)epoch;
  89	key += (__force u16)service_id;
  90	key += (__force u32)call_id;
  91	key += (hcid & RXRPC_CIDMASK) >> RXRPC_CIDSHIFT;
  92	key += hcid & RXRPC_CHANNELMASK;
  93	key += clientflag;
  94	key += proto;
  95	/* Step through the peer address in 16-bit portions for speed */
  96	for (i = 0, p = (const u16 *)peer_addr; i < addr_size >> 1; i++, p++)
  97		key += *p;
  98	_leave(" key = 0x%lx", key);
  99	return key;
 100}
 101
 102/*
 103 * Add a call to the hashtable
 104 */
 105static void rxrpc_call_hash_add(struct rxrpc_call *call)
 106{
 107	unsigned long key;
 108	unsigned int addr_size = 0;
 109
 110	_enter("");
 111	switch (call->proto) {
 112	case AF_INET:
 113		addr_size = sizeof(call->peer_ip.ipv4_addr);
 114		break;
 115	case AF_INET6:
 116		addr_size = sizeof(call->peer_ip.ipv6_addr);
 117		break;
 118	default:
 119		break;
 120	}
 121	key = rxrpc_call_hashfunc(call->in_clientflag, call->cid,
 122				  call->call_id, call->epoch,
 123				  call->service_id, call->proto,
 124				  call->conn->trans->local, addr_size,
 125				  call->peer_ip.ipv6_addr);
 126	/* Store the full key in the call */
 127	call->hash_key = key;
 128	spin_lock(&rxrpc_call_hash_lock);
 129	hash_add_rcu(rxrpc_call_hash, &call->hash_node, key);
 130	spin_unlock(&rxrpc_call_hash_lock);
 131	_leave("");
 132}
 133
 134/*
 135 * Remove a call from the hashtable
 136 */
 137static void rxrpc_call_hash_del(struct rxrpc_call *call)
 138{
 139	_enter("");
 140	spin_lock(&rxrpc_call_hash_lock);
 141	hash_del_rcu(&call->hash_node);
 142	spin_unlock(&rxrpc_call_hash_lock);
 143	_leave("");
 144}
 145
 146/*
 147 * Find a call in the hashtable and return it, or NULL if it
 148 * isn't there.
 149 */
 150struct rxrpc_call *rxrpc_find_call_hash(
 151	u8		clientflag,
 152	__be32		cid,
 153	__be32		call_id,
 154	__be32		epoch,
 155	__be16		service_id,
 156	void		*localptr,
 157	sa_family_t	proto,
 158	const u8	*peer_addr)
 159{
 160	unsigned long key;
 161	unsigned int addr_size = 0;
 162	struct rxrpc_call *call = NULL;
 163	struct rxrpc_call *ret = NULL;
 164
 165	_enter("");
 166	switch (proto) {
 167	case AF_INET:
 168		addr_size = sizeof(call->peer_ip.ipv4_addr);
 169		break;
 170	case AF_INET6:
 171		addr_size = sizeof(call->peer_ip.ipv6_addr);
 172		break;
 173	default:
 174		break;
 175	}
 176
 177	key = rxrpc_call_hashfunc(clientflag, cid, call_id, epoch,
 178				  service_id, proto, localptr, addr_size,
 179				  peer_addr);
 180	hash_for_each_possible_rcu(rxrpc_call_hash, call, hash_node, key) {
 181		if (call->hash_key == key &&
 182		    call->call_id == call_id &&
 183		    call->cid == cid &&
 184		    call->in_clientflag == clientflag &&
 185		    call->service_id == service_id &&
 186		    call->proto == proto &&
 187		    call->local == localptr &&
 188		    memcmp(call->peer_ip.ipv6_addr, peer_addr,
 189			      addr_size) == 0 &&
 190		    call->epoch == epoch) {
 191			ret = call;
 192			break;
 193		}
 194	}
 195	_leave(" = %p", ret);
 196	return ret;
 197}
 198
 199/*
 200 * allocate a new call
 201 */
 202static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
 203{
 204	struct rxrpc_call *call;
 205
 206	call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
 207	if (!call)
 208		return NULL;
 209
 210	call->acks_winsz = 16;
 211	call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
 212				    gfp);
 213	if (!call->acks_window) {
 214		kmem_cache_free(rxrpc_call_jar, call);
 215		return NULL;
 216	}
 217
 218	setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
 219		    (unsigned long) call);
 220	setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
 221		    (unsigned long) call);
 222	setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
 223		    (unsigned long) call);
 224	setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
 225		    (unsigned long) call);
 226	INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
 227	INIT_WORK(&call->processor, &rxrpc_process_call);
 228	INIT_LIST_HEAD(&call->accept_link);
 229	skb_queue_head_init(&call->rx_queue);
 230	skb_queue_head_init(&call->rx_oos_queue);
 231	init_waitqueue_head(&call->tx_waitq);
 232	spin_lock_init(&call->lock);
 233	rwlock_init(&call->state_lock);
 234	atomic_set(&call->usage, 1);
 235	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
 236	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
 237
 238	memset(&call->sock_node, 0xed, sizeof(call->sock_node));
 239
 240	call->rx_data_expect = 1;
 241	call->rx_data_eaten = 0;
 242	call->rx_first_oos = 0;
 243	call->ackr_win_top = call->rx_data_eaten + 1 + rxrpc_rx_window_size;
 244	call->creation_jif = jiffies;
 245	return call;
 246}
 247
 248/*
 249 * allocate a new client call and attempt to get a connection slot for it
 250 */
 251static struct rxrpc_call *rxrpc_alloc_client_call(
 252	struct rxrpc_sock *rx,
 253	struct rxrpc_transport *trans,
 254	struct rxrpc_conn_bundle *bundle,
 255	gfp_t gfp)
 256{
 257	struct rxrpc_call *call;
 258	int ret;
 259
 260	_enter("");
 261
 262	ASSERT(rx != NULL);
 263	ASSERT(trans != NULL);
 264	ASSERT(bundle != NULL);
 265
 266	call = rxrpc_alloc_call(gfp);
 267	if (!call)
 268		return ERR_PTR(-ENOMEM);
 269
 270	sock_hold(&rx->sk);
 271	call->socket = rx;
 272	call->rx_data_post = 1;
 273
 274	ret = rxrpc_connect_call(rx, trans, bundle, call, gfp);
 275	if (ret < 0) {
 276		kmem_cache_free(rxrpc_call_jar, call);
 277		return ERR_PTR(ret);
 278	}
 279
 280	/* Record copies of information for hashtable lookup */
 281	call->proto = rx->proto;
 282	call->local = trans->local;
 283	switch (call->proto) {
 284	case AF_INET:
 285		call->peer_ip.ipv4_addr =
 286			trans->peer->srx.transport.sin.sin_addr.s_addr;
 287		break;
 288	case AF_INET6:
 289		memcpy(call->peer_ip.ipv6_addr,
 290		       trans->peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
 291		       sizeof(call->peer_ip.ipv6_addr));
 292		break;
 293	}
 294	call->epoch = call->conn->epoch;
 295	call->service_id = call->conn->service_id;
 296	call->in_clientflag = call->conn->in_clientflag;
 297	/* Add the new call to the hashtable */
 298	rxrpc_call_hash_add(call);
 299
 300	spin_lock(&call->conn->trans->peer->lock);
 301	list_add(&call->error_link, &call->conn->trans->peer->error_targets);
 302	spin_unlock(&call->conn->trans->peer->lock);
 303
 304	call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
 305	add_timer(&call->lifetimer);
 306
 307	_leave(" = %p", call);
 308	return call;
 309}
 310
 311/*
 312 * set up a call for the given data
 313 * - called in process context with IRQs enabled
 314 */
 315struct rxrpc_call *rxrpc_get_client_call(struct rxrpc_sock *rx,
 316					 struct rxrpc_transport *trans,
 317					 struct rxrpc_conn_bundle *bundle,
 318					 unsigned long user_call_ID,
 319					 int create,
 320					 gfp_t gfp)
 321{
 322	struct rxrpc_call *call, *candidate;
 323	struct rb_node *p, *parent, **pp;
 324
 325	_enter("%p,%d,%d,%lx,%d",
 326	       rx, trans ? trans->debug_id : -1, bundle ? bundle->debug_id : -1,
 327	       user_call_ID, create);
 328
 329	/* search the extant calls first for one that matches the specified
 330	 * user ID */
 331	read_lock(&rx->call_lock);
 332
 333	p = rx->calls.rb_node;
 334	while (p) {
 335		call = rb_entry(p, struct rxrpc_call, sock_node);
 336
 337		if (user_call_ID < call->user_call_ID)
 338			p = p->rb_left;
 339		else if (user_call_ID > call->user_call_ID)
 340			p = p->rb_right;
 341		else
 342			goto found_extant_call;
 343	}
 344
 345	read_unlock(&rx->call_lock);
 346
 347	if (!create || !trans)
 348		return ERR_PTR(-EBADSLT);
 349
 350	/* not yet present - create a candidate for a new record and then
 351	 * redo the search */
 352	candidate = rxrpc_alloc_client_call(rx, trans, bundle, gfp);
 353	if (IS_ERR(candidate)) {
 354		_leave(" = %ld", PTR_ERR(candidate));
 355		return candidate;
 356	}
 357
 358	candidate->user_call_ID = user_call_ID;
 359	__set_bit(RXRPC_CALL_HAS_USERID, &candidate->flags);
 360
 361	write_lock(&rx->call_lock);
 362
 363	pp = &rx->calls.rb_node;
 364	parent = NULL;
 365	while (*pp) {
 366		parent = *pp;
 367		call = rb_entry(parent, struct rxrpc_call, sock_node);
 368
 369		if (user_call_ID < call->user_call_ID)
 370			pp = &(*pp)->rb_left;
 371		else if (user_call_ID > call->user_call_ID)
 372			pp = &(*pp)->rb_right;
 373		else
 374			goto found_extant_second;
 375	}
 376
 377	/* second search also failed; add the new call */
 378	call = candidate;
 379	candidate = NULL;
 380	rxrpc_get_call(call);
 381
 382	rb_link_node(&call->sock_node, parent, pp);
 383	rb_insert_color(&call->sock_node, &rx->calls);
 384	write_unlock(&rx->call_lock);
 385
 386	write_lock_bh(&rxrpc_call_lock);
 387	list_add_tail(&call->link, &rxrpc_calls);
 388	write_unlock_bh(&rxrpc_call_lock);
 389
 390	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
 391
 392	_leave(" = %p [new]", call);
 393	return call;
 394
 395	/* we found the call in the list immediately */
 396found_extant_call:
 397	rxrpc_get_call(call);
 398	read_unlock(&rx->call_lock);
 399	_leave(" = %p [extant %d]", call, atomic_read(&call->usage));
 400	return call;
 401
 402	/* we found the call on the second time through the list */
 403found_extant_second:
 404	rxrpc_get_call(call);
 405	write_unlock(&rx->call_lock);
 406	rxrpc_put_call(candidate);
 407	_leave(" = %p [second %d]", call, atomic_read(&call->usage));
 408	return call;
 409}
 410
 411/*
 412 * set up an incoming call
 413 * - called in process context with IRQs enabled
 414 */
 415struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 416				       struct rxrpc_connection *conn,
 417				       struct rxrpc_header *hdr,
 418				       gfp_t gfp)
 419{
 420	struct rxrpc_call *call, *candidate;
 421	struct rb_node **p, *parent;
 422	__be32 call_id;
 423
 424	_enter(",%d,,%x", conn->debug_id, gfp);
 425
 426	ASSERT(rx != NULL);
 427
 428	candidate = rxrpc_alloc_call(gfp);
 429	if (!candidate)
 430		return ERR_PTR(-EBUSY);
 431
 432	candidate->socket = rx;
 433	candidate->conn = conn;
 434	candidate->cid = hdr->cid;
 435	candidate->call_id = hdr->callNumber;
 436	candidate->channel = ntohl(hdr->cid) & RXRPC_CHANNELMASK;
 437	candidate->rx_data_post = 0;
 438	candidate->state = RXRPC_CALL_SERVER_ACCEPTING;
 439	if (conn->security_ix > 0)
 440		candidate->state = RXRPC_CALL_SERVER_SECURING;
 441
 442	write_lock_bh(&conn->lock);
 443
 444	/* set the channel for this call */
 445	call = conn->channels[candidate->channel];
 446	_debug("channel[%u] is %p", candidate->channel, call);
 447	if (call && call->call_id == hdr->callNumber) {
 448		/* already set; must've been a duplicate packet */
 449		_debug("extant call [%d]", call->state);
 450		ASSERTCMP(call->conn, ==, conn);
 451
 452		read_lock(&call->state_lock);
 453		switch (call->state) {
 454		case RXRPC_CALL_LOCALLY_ABORTED:
 455			if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
 456				rxrpc_queue_call(call);
 457		case RXRPC_CALL_REMOTELY_ABORTED:
 458			read_unlock(&call->state_lock);
 459			goto aborted_call;
 460		default:
 461			rxrpc_get_call(call);
 462			read_unlock(&call->state_lock);
 463			goto extant_call;
 464		}
 465	}
 466
 467	if (call) {
 468		/* it seems the channel is still in use from the previous call
 469		 * - ditch the old binding if its call is now complete */
 470		_debug("CALL: %u { %s }",
 471		       call->debug_id, rxrpc_call_states[call->state]);
 472
 473		if (call->state >= RXRPC_CALL_COMPLETE) {
 474			conn->channels[call->channel] = NULL;
 475		} else {
 476			write_unlock_bh(&conn->lock);
 477			kmem_cache_free(rxrpc_call_jar, candidate);
 478			_leave(" = -EBUSY");
 479			return ERR_PTR(-EBUSY);
 480		}
 481	}
 482
 483	/* check the call number isn't duplicate */
 484	_debug("check dup");
 485	call_id = hdr->callNumber;
 486	p = &conn->calls.rb_node;
 487	parent = NULL;
 488	while (*p) {
 489		parent = *p;
 490		call = rb_entry(parent, struct rxrpc_call, conn_node);
 491
 492		/* The tree is sorted in order of the __be32 value without
 493		 * turning it into host order.
 494		 */
 495		if ((__force u32)call_id < (__force u32)call->call_id)
 496			p = &(*p)->rb_left;
 497		else if ((__force u32)call_id > (__force u32)call->call_id)
 498			p = &(*p)->rb_right;
 499		else
 500			goto old_call;
 501	}
 502
 503	/* make the call available */
 504	_debug("new call");
 505	call = candidate;
 506	candidate = NULL;
 507	rb_link_node(&call->conn_node, parent, p);
 508	rb_insert_color(&call->conn_node, &conn->calls);
 509	conn->channels[call->channel] = call;
 510	sock_hold(&rx->sk);
 511	atomic_inc(&conn->usage);
 512	write_unlock_bh(&conn->lock);
 513
 514	spin_lock(&conn->trans->peer->lock);
 515	list_add(&call->error_link, &conn->trans->peer->error_targets);
 516	spin_unlock(&conn->trans->peer->lock);
 517
 518	write_lock_bh(&rxrpc_call_lock);
 519	list_add_tail(&call->link, &rxrpc_calls);
 520	write_unlock_bh(&rxrpc_call_lock);
 521
 522	/* Record copies of information for hashtable lookup */
 523	call->proto = rx->proto;
 524	call->local = conn->trans->local;
 525	switch (call->proto) {
 526	case AF_INET:
 527		call->peer_ip.ipv4_addr =
 528			conn->trans->peer->srx.transport.sin.sin_addr.s_addr;
 529		break;
 530	case AF_INET6:
 531		memcpy(call->peer_ip.ipv6_addr,
 532		       conn->trans->peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
 533		       sizeof(call->peer_ip.ipv6_addr));
 534		break;
 535	default:
 536		break;
 537	}
 538	call->epoch = conn->epoch;
 539	call->service_id = conn->service_id;
 540	call->in_clientflag = conn->in_clientflag;
 541	/* Add the new call to the hashtable */
 542	rxrpc_call_hash_add(call);
 543
 544	_net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
 545
 546	call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
 547	add_timer(&call->lifetimer);
 548	_leave(" = %p {%d} [new]", call, call->debug_id);
 549	return call;
 550
 551extant_call:
 552	write_unlock_bh(&conn->lock);
 553	kmem_cache_free(rxrpc_call_jar, candidate);
 554	_leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
 555	return call;
 556
 557aborted_call:
 558	write_unlock_bh(&conn->lock);
 559	kmem_cache_free(rxrpc_call_jar, candidate);
 560	_leave(" = -ECONNABORTED");
 561	return ERR_PTR(-ECONNABORTED);
 562
 563old_call:
 564	write_unlock_bh(&conn->lock);
 565	kmem_cache_free(rxrpc_call_jar, candidate);
 566	_leave(" = -ECONNRESET [old]");
 567	return ERR_PTR(-ECONNRESET);
 568}
 569
 570/*
 571 * find an extant server call
 572 * - called in process context with IRQs enabled
 573 */
 574struct rxrpc_call *rxrpc_find_server_call(struct rxrpc_sock *rx,
 575					  unsigned long user_call_ID)
 576{
 577	struct rxrpc_call *call;
 578	struct rb_node *p;
 579
 580	_enter("%p,%lx", rx, user_call_ID);
 581
 582	/* search the extant calls for one that matches the specified user
 583	 * ID */
 584	read_lock(&rx->call_lock);
 585
 586	p = rx->calls.rb_node;
 587	while (p) {
 588		call = rb_entry(p, struct rxrpc_call, sock_node);
 589
 590		if (user_call_ID < call->user_call_ID)
 591			p = p->rb_left;
 592		else if (user_call_ID > call->user_call_ID)
 593			p = p->rb_right;
 594		else
 595			goto found_extant_call;
 596	}
 597
 598	read_unlock(&rx->call_lock);
 599	_leave(" = NULL");
 600	return NULL;
 601
 602	/* we found the call in the list immediately */
 603found_extant_call:
 604	rxrpc_get_call(call);
 605	read_unlock(&rx->call_lock);
 606	_leave(" = %p [%d]", call, atomic_read(&call->usage));
 607	return call;
 608}
 609
 610/*
 611 * detach a call from a socket and set up for release
 612 */
 613void rxrpc_release_call(struct rxrpc_call *call)
 614{
 615	struct rxrpc_connection *conn = call->conn;
 616	struct rxrpc_sock *rx = call->socket;
 617
 618	_enter("{%d,%d,%d,%d}",
 619	       call->debug_id, atomic_read(&call->usage),
 620	       atomic_read(&call->ackr_not_idle),
 621	       call->rx_first_oos);
 622
 623	spin_lock_bh(&call->lock);
 624	if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
 625		BUG();
 626	spin_unlock_bh(&call->lock);
 627
 628	/* dissociate from the socket
 629	 * - the socket's ref on the call is passed to the death timer
 630	 */
 631	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
 632
 633	write_lock_bh(&rx->call_lock);
 634	if (!list_empty(&call->accept_link)) {
 635		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
 636		       call, call->events, call->flags);
 637		ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
 638		list_del_init(&call->accept_link);
 639		sk_acceptq_removed(&rx->sk);
 640	} else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 641		rb_erase(&call->sock_node, &rx->calls);
 642		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
 643		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
 644	}
 645	write_unlock_bh(&rx->call_lock);
 646
 647	/* free up the channel for reuse */
 648	spin_lock(&conn->trans->client_lock);
 649	write_lock_bh(&conn->lock);
 650	write_lock(&call->state_lock);
 651
 652	if (conn->channels[call->channel] == call)
 653		conn->channels[call->channel] = NULL;
 654
 655	if (conn->out_clientflag && conn->bundle) {
 656		conn->avail_calls++;
 657		switch (conn->avail_calls) {
 658		case 1:
 659			list_move_tail(&conn->bundle_link,
 660				       &conn->bundle->avail_conns);
 661		case 2 ... RXRPC_MAXCALLS - 1:
 662			ASSERT(conn->channels[0] == NULL ||
 663			       conn->channels[1] == NULL ||
 664			       conn->channels[2] == NULL ||
 665			       conn->channels[3] == NULL);
 666			break;
 667		case RXRPC_MAXCALLS:
 668			list_move_tail(&conn->bundle_link,
 669				       &conn->bundle->unused_conns);
 670			ASSERT(conn->channels[0] == NULL &&
 671			       conn->channels[1] == NULL &&
 672			       conn->channels[2] == NULL &&
 673			       conn->channels[3] == NULL);
 674			break;
 675		default:
 676			printk(KERN_ERR "RxRPC: conn->avail_calls=%d\n",
 677			       conn->avail_calls);
 678			BUG();
 679		}
 680	}
 681
 682	spin_unlock(&conn->trans->client_lock);
 683
 684	if (call->state < RXRPC_CALL_COMPLETE &&
 685	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
 686		_debug("+++ ABORTING STATE %d +++\n", call->state);
 687		call->state = RXRPC_CALL_LOCALLY_ABORTED;
 688		call->abort_code = RX_CALL_DEAD;
 689		set_bit(RXRPC_CALL_ABORT, &call->events);
 690		rxrpc_queue_call(call);
 691	}
 692	write_unlock(&call->state_lock);
 693	write_unlock_bh(&conn->lock);
 694
 695	/* clean up the Rx queue */
 696	if (!skb_queue_empty(&call->rx_queue) ||
 697	    !skb_queue_empty(&call->rx_oos_queue)) {
 698		struct rxrpc_skb_priv *sp;
 699		struct sk_buff *skb;
 700
 701		_debug("purge Rx queues");
 702
 703		spin_lock_bh(&call->lock);
 704		while ((skb = skb_dequeue(&call->rx_queue)) ||
 705		       (skb = skb_dequeue(&call->rx_oos_queue))) {
 706			sp = rxrpc_skb(skb);
 707			if (sp->call) {
 708				ASSERTCMP(sp->call, ==, call);
 709				rxrpc_put_call(call);
 710				sp->call = NULL;
 711			}
 712			skb->destructor = NULL;
 713			spin_unlock_bh(&call->lock);
 714
 715			_debug("- zap %s %%%u #%u",
 716			       rxrpc_pkts[sp->hdr.type],
 717			       ntohl(sp->hdr.serial),
 718			       ntohl(sp->hdr.seq));
 719			rxrpc_free_skb(skb);
 720			spin_lock_bh(&call->lock);
 721		}
 722		spin_unlock_bh(&call->lock);
 723
 724		ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
 725	}
 726
 727	del_timer_sync(&call->resend_timer);
 728	del_timer_sync(&call->ack_timer);
 729	del_timer_sync(&call->lifetimer);
 730	call->deadspan.expires = jiffies + rxrpc_dead_call_expiry;
 731	add_timer(&call->deadspan);
 732
 733	_leave("");
 734}
 735
 736/*
 737 * handle a dead call being ready for reaping
 738 */
 739static void rxrpc_dead_call_expired(unsigned long _call)
 740{
 741	struct rxrpc_call *call = (struct rxrpc_call *) _call;
 742
 743	_enter("{%d}", call->debug_id);
 744
 745	write_lock_bh(&call->state_lock);
 746	call->state = RXRPC_CALL_DEAD;
 747	write_unlock_bh(&call->state_lock);
 748	rxrpc_put_call(call);
 749}
 750
 751/*
 752 * mark a call as to be released, aborting it if it's still in progress
 753 * - called with softirqs disabled
 754 */
 755static void rxrpc_mark_call_released(struct rxrpc_call *call)
 756{
 757	bool sched;
 758
 759	write_lock(&call->state_lock);
 760	if (call->state < RXRPC_CALL_DEAD) {
 761		sched = false;
 762		if (call->state < RXRPC_CALL_COMPLETE) {
 763			_debug("abort call %p", call);
 764			call->state = RXRPC_CALL_LOCALLY_ABORTED;
 765			call->abort_code = RX_CALL_DEAD;
 766			if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
 767				sched = true;
 768		}
 769		if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
 770			sched = true;
 771		if (sched)
 772			rxrpc_queue_call(call);
 773	}
 774	write_unlock(&call->state_lock);
 775}
 776
 777/*
 778 * release all the calls associated with a socket
 779 */
 780void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
 781{
 782	struct rxrpc_call *call;
 783	struct rb_node *p;
 784
 785	_enter("%p", rx);
 786
 787	read_lock_bh(&rx->call_lock);
 788
 789	/* mark all the calls as no longer wanting incoming packets */
 790	for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
 791		call = rb_entry(p, struct rxrpc_call, sock_node);
 792		rxrpc_mark_call_released(call);
 793	}
 794
 795	/* kill the not-yet-accepted incoming calls */
 796	list_for_each_entry(call, &rx->secureq, accept_link) {
 797		rxrpc_mark_call_released(call);
 798	}
 799
 800	list_for_each_entry(call, &rx->acceptq, accept_link) {
 801		rxrpc_mark_call_released(call);
 802	}
 803
 804	read_unlock_bh(&rx->call_lock);
 805	_leave("");
 806}
 807
 808/*
 809 * release a call
 810 */
 811void __rxrpc_put_call(struct rxrpc_call *call)
 812{
 813	ASSERT(call != NULL);
 814
 815	_enter("%p{u=%d}", call, atomic_read(&call->usage));
 816
 817	ASSERTCMP(atomic_read(&call->usage), >, 0);
 818
 819	if (atomic_dec_and_test(&call->usage)) {
 820		_debug("call %d dead", call->debug_id);
 821		ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
 822		rxrpc_queue_work(&call->destroyer);
 823	}
 824	_leave("");
 825}
 826
 827/*
 828 * clean up a call
 829 */
 830static void rxrpc_cleanup_call(struct rxrpc_call *call)
 831{
 832	_net("DESTROY CALL %d", call->debug_id);
 833
 834	ASSERT(call->socket);
 835
 836	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 837
 838	del_timer_sync(&call->lifetimer);
 839	del_timer_sync(&call->deadspan);
 840	del_timer_sync(&call->ack_timer);
 841	del_timer_sync(&call->resend_timer);
 842
 843	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
 844	ASSERTCMP(call->events, ==, 0);
 845	if (work_pending(&call->processor)) {
 846		_debug("defer destroy");
 847		rxrpc_queue_work(&call->destroyer);
 848		return;
 849	}
 850
 851	if (call->conn) {
 852		spin_lock(&call->conn->trans->peer->lock);
 853		list_del(&call->error_link);
 854		spin_unlock(&call->conn->trans->peer->lock);
 855
 856		write_lock_bh(&call->conn->lock);
 857		rb_erase(&call->conn_node, &call->conn->calls);
 858		write_unlock_bh(&call->conn->lock);
 859		rxrpc_put_connection(call->conn);
 860	}
 861
 862	/* Remove the call from the hash */
 863	rxrpc_call_hash_del(call);
 864
 865	if (call->acks_window) {
 866		_debug("kill Tx window %d",
 867		       CIRC_CNT(call->acks_head, call->acks_tail,
 868				call->acks_winsz));
 869		smp_mb();
 870		while (CIRC_CNT(call->acks_head, call->acks_tail,
 871				call->acks_winsz) > 0) {
 872			struct rxrpc_skb_priv *sp;
 873			unsigned long _skb;
 874
 875			_skb = call->acks_window[call->acks_tail] & ~1;
 876			sp = rxrpc_skb((struct sk_buff *) _skb);
 877			_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 878			rxrpc_free_skb((struct sk_buff *) _skb);
 879			call->acks_tail =
 880				(call->acks_tail + 1) & (call->acks_winsz - 1);
 881		}
 882
 883		kfree(call->acks_window);
 884	}
 885
 886	rxrpc_free_skb(call->tx_pending);
 887
 888	rxrpc_purge_queue(&call->rx_queue);
 889	ASSERT(skb_queue_empty(&call->rx_oos_queue));
 890	sock_put(&call->socket->sk);
 891	kmem_cache_free(rxrpc_call_jar, call);
 892}
 893
 894/*
 895 * destroy a call
 896 */
 897static void rxrpc_destroy_call(struct work_struct *work)
 898{
 899	struct rxrpc_call *call =
 900		container_of(work, struct rxrpc_call, destroyer);
 901
 902	_enter("%p{%d,%d,%p}",
 903	       call, atomic_read(&call->usage), call->channel, call->conn);
 904
 905	ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
 906
 907	write_lock_bh(&rxrpc_call_lock);
 908	list_del_init(&call->link);
 909	write_unlock_bh(&rxrpc_call_lock);
 910
 911	rxrpc_cleanup_call(call);
 912	_leave("");
 913}
 914
 915/*
 916 * preemptively destroy all the call records from a transport endpoint rather
 917 * than waiting for them to time out
 918 */
 919void __exit rxrpc_destroy_all_calls(void)
 920{
 921	struct rxrpc_call *call;
 922
 923	_enter("");
 924	write_lock_bh(&rxrpc_call_lock);
 925
 926	while (!list_empty(&rxrpc_calls)) {
 927		call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
 928		_debug("Zapping call %p", call);
 929
 930		list_del_init(&call->link);
 931
 932		switch (atomic_read(&call->usage)) {
 933		case 0:
 934			ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
 935			break;
 936		case 1:
 937			if (del_timer_sync(&call->deadspan) != 0 &&
 938			    call->state != RXRPC_CALL_DEAD)
 939				rxrpc_dead_call_expired((unsigned long) call);
 940			if (call->state != RXRPC_CALL_DEAD)
 941				break;
 942		default:
 943			printk(KERN_ERR "RXRPC:"
 944			       " Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
 945			       call, atomic_read(&call->usage),
 946			       atomic_read(&call->ackr_not_idle),
 947			       rxrpc_call_states[call->state],
 948			       call->flags, call->events);
 949			if (!skb_queue_empty(&call->rx_queue))
 950				printk(KERN_ERR"RXRPC: Rx queue occupied\n");
 951			if (!skb_queue_empty(&call->rx_oos_queue))
 952				printk(KERN_ERR"RXRPC: OOS queue occupied\n");
 953			break;
 954		}
 955
 956		write_unlock_bh(&rxrpc_call_lock);
 957		cond_resched();
 958		write_lock_bh(&rxrpc_call_lock);
 959	}
 960
 961	write_unlock_bh(&rxrpc_call_lock);
 962	_leave("");
 963}
 964
 965/*
 966 * handle call lifetime being exceeded
 967 */
 968static void rxrpc_call_life_expired(unsigned long _call)
 969{
 970	struct rxrpc_call *call = (struct rxrpc_call *) _call;
 971
 972	if (call->state >= RXRPC_CALL_COMPLETE)
 973		return;
 974
 975	_enter("{%d}", call->debug_id);
 976	read_lock_bh(&call->state_lock);
 977	if (call->state < RXRPC_CALL_COMPLETE) {
 978		set_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
 979		rxrpc_queue_call(call);
 980	}
 981	read_unlock_bh(&call->state_lock);
 982}
 983
 984/*
 985 * handle resend timer expiry
 986 * - may not take call->state_lock as this can deadlock against del_timer_sync()
 987 */
 988static void rxrpc_resend_time_expired(unsigned long _call)
 989{
 990	struct rxrpc_call *call = (struct rxrpc_call *) _call;
 991
 992	_enter("{%d}", call->debug_id);
 993
 994	if (call->state >= RXRPC_CALL_COMPLETE)
 995		return;
 996
 997	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 998	if (!test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
 999		rxrpc_queue_call(call);
1000}
1001
1002/*
1003 * handle ACK timer expiry
1004 */
1005static void rxrpc_ack_time_expired(unsigned long _call)
1006{
1007	struct rxrpc_call *call = (struct rxrpc_call *) _call;
1008
1009	_enter("{%d}", call->debug_id);
1010
1011	if (call->state >= RXRPC_CALL_COMPLETE)
1012		return;
1013
1014	read_lock_bh(&call->state_lock);
1015	if (call->state < RXRPC_CALL_COMPLETE &&
1016	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
1017		rxrpc_queue_call(call);
1018	read_unlock_bh(&call->state_lock);
1019}