/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2004-2009 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) channel support.
 *
 *      This is the part of XPC that manages the channels and
 *      sends/receives messages across them to/from other partitions.
 *
 */

#include <linux/device.h>
#include "xpc.h"

/*
 * Process a connect message from a remote partition.
 *
 * Note: xpc_process_connect() expects to be called with ch->lock held
 * (via spin_lock_irqsave()) and will leave it locked upon return.
 */
static void
xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
{
        enum xp_retval ret;

        lockdep_assert_held(&ch->lock);

        if (!(ch->flags & XPC_C_OPENREQUEST) ||
            !(ch->flags & XPC_C_ROPENREQUEST)) {
                /* nothing more to do for now */
                return;
        }
        DBUG_ON(!(ch->flags & XPC_C_CONNECTING));

        if (!(ch->flags & XPC_C_SETUP)) {
                spin_unlock_irqrestore(&ch->lock, *irq_flags);
                ret = xpc_arch_ops.setup_msg_structures(ch);
                spin_lock_irqsave(&ch->lock, *irq_flags);

                if (ret != xpSuccess)
                        XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
                else
                        ch->flags |= XPC_C_SETUP;

                if (ch->flags & XPC_C_DISCONNECTING)
                        return;
        }

        if (!(ch->flags & XPC_C_OPENREPLY)) {
                ch->flags |= XPC_C_OPENREPLY;
                xpc_arch_ops.send_chctl_openreply(ch, irq_flags);
        }

        if (!(ch->flags & XPC_C_ROPENREPLY))
                return;

        if (!(ch->flags & XPC_C_OPENCOMPLETE)) {
                ch->flags |= (XPC_C_OPENCOMPLETE | XPC_C_CONNECTED);
                xpc_arch_ops.send_chctl_opencomplete(ch, irq_flags);
        }

        if (!(ch->flags & XPC_C_ROPENCOMPLETE))
                return;

        dev_info(xpc_chan, "channel %d to partition %d connected\n",
                 ch->number, ch->partid);

        ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);    /* clear all else */
}

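/*
 * Illustrative summary (added commentary, not from the original source):
 * the open handshake that xpc_process_connect() advances looks roughly
 * like
 *
 *      local OPENREQUEST  + remote OPENREQUEST  -> set up msg structures
 *      local OPENREPLY    + remote OPENREPLY    -> queue info exchanged
 *      local OPENCOMPLETE + remote OPENCOMPLETE -> channel connected
 *
 * with an early return at whichever step is still waiting on the remote
 * side's flag.
 */
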
/*
 * ch->lock is expected to be held (via spin_lock_irqsave()) on entry.
 */
static void
xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
{
        struct xpc_partition *part = &xpc_partitions[ch->partid];
        u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);

        lockdep_assert_held(&ch->lock);

        if (!(ch->flags & XPC_C_DISCONNECTING))
                return;

        DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

        /* make sure all activity has settled down first */

        if (atomic_read(&ch->kthreads_assigned) > 0 ||
            atomic_read(&ch->references) > 0) {
                return;
        }
        DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
                !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));

        if (part->act_state == XPC_P_AS_DEACTIVATING) {
                /* can't proceed until the other side disengages from us */
                if (xpc_arch_ops.partition_engaged(ch->partid))
                        return;

        } else {

                /* as long as the other side is up, do the full protocol */

                if (!(ch->flags & XPC_C_RCLOSEREQUEST))
                        return;

                if (!(ch->flags & XPC_C_CLOSEREPLY)) {
                        ch->flags |= XPC_C_CLOSEREPLY;
                        xpc_arch_ops.send_chctl_closereply(ch, irq_flags);
                }

                if (!(ch->flags & XPC_C_RCLOSEREPLY))
                        return;
        }

        /* wake those waiting for notify completion */
        if (atomic_read(&ch->n_to_notify) > 0) {
                /* we do callout while holding ch->lock, callout can't block */
                xpc_arch_ops.notify_senders_of_disconnect(ch);
        }

        /* both sides are disconnected now */

        if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
                spin_unlock_irqrestore(&ch->lock, *irq_flags);
                xpc_disconnect_callout(ch, xpDisconnected);
                spin_lock_irqsave(&ch->lock, *irq_flags);
        }

        DBUG_ON(atomic_read(&ch->n_to_notify) != 0);

        /* it's now safe to free the channel's message queues */
        xpc_arch_ops.teardown_msg_structures(ch);

        ch->func = NULL;
        ch->key = NULL;
        ch->entry_size = 0;
        ch->local_nentries = 0;
        ch->remote_nentries = 0;
        ch->kthreads_assigned_limit = 0;
        ch->kthreads_idle_limit = 0;

        /*
         * Mark the channel disconnected and clear all other flags, including
         * XPC_C_SETUP (because of call to
         * xpc_arch_ops.teardown_msg_structures()) but not including
         * XPC_C_WDISCONNECT (if it was set).
         */
        ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));

        atomic_dec(&part->nchannels_active);

        if (channel_was_connected) {
                dev_info(xpc_chan, "channel %d to partition %d disconnected, "
                         "reason=%d\n", ch->number, ch->partid, ch->reason);
        }

        if (ch->flags & XPC_C_WDISCONNECT) {
                /* we won't lose the CPU since we're holding ch->lock */
                complete(&ch->wdisconnect_wait);
        } else if (ch->delayed_chctl_flags) {
                if (part->act_state != XPC_P_AS_DEACTIVATING) {
                        /* time to take action on any delayed chctl flags */
                        spin_lock(&part->chctl_lock);
                        part->chctl.flags[ch->number] |=
                            ch->delayed_chctl_flags;
                        spin_unlock(&part->chctl_lock);
                }
                ch->delayed_chctl_flags = 0;
        }
}

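/*
 * Illustrative summary (added commentary, not from the original source):
 * unless the partition is deactivating, a disconnect only completes after
 * both sides have exchanged
 *
 *      local CLOSEREQUEST + remote CLOSEREQUEST
 *      local CLOSEREPLY   + remote CLOSEREPLY
 *
 * at which point xpc_process_disconnect() tears down the message queues
 * and marks the channel XPC_C_DISCONNECTED.
 */
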
/*
 * Process a change in the channel's remote connection state.
 */
static void
xpc_process_openclose_chctl_flags(struct xpc_partition *part, int ch_number,
                                  u8 chctl_flags)
{
        unsigned long irq_flags;
        struct xpc_openclose_args *args =
            &part->remote_openclose_args[ch_number];
        struct xpc_channel *ch = &part->channels[ch_number];
        enum xp_retval reason;
        enum xp_retval ret;
        int create_kthread = 0;

        spin_lock_irqsave(&ch->lock, irq_flags);

again:

        if ((ch->flags & XPC_C_DISCONNECTED) &&
            (ch->flags & XPC_C_WDISCONNECT)) {
                /*
                 * Delay processing chctl flags until the thread waiting for
                 * disconnect has had a chance to see that the channel is
                 * disconnected.
                 */
                ch->delayed_chctl_flags |= chctl_flags;
                goto out;
        }

        if (chctl_flags & XPC_CHCTL_CLOSEREQUEST) {

                dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREQUEST (reason=%d) received "
                        "from partid=%d, channel=%d\n", args->reason,
                        ch->partid, ch->number);

                /*
                 * If RCLOSEREQUEST is already set, we're probably waiting for
                 * RCLOSEREPLY; we should find a CLOSEREPLY (and possibly an
                 * OPENREQUEST) packed with this CLOSEREQUEST in chctl_flags.
                 */

                if (ch->flags & XPC_C_RCLOSEREQUEST) {
                        DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
                        DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
                        DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
                        DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);

                        DBUG_ON(!(chctl_flags & XPC_CHCTL_CLOSEREPLY));
                        chctl_flags &= ~XPC_CHCTL_CLOSEREPLY;
                        ch->flags |= XPC_C_RCLOSEREPLY;

                        /* both sides have finished disconnecting */
                        xpc_process_disconnect(ch, &irq_flags);
                        DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
                        goto again;
                }

                if (ch->flags & XPC_C_DISCONNECTED) {
                        if (!(chctl_flags & XPC_CHCTL_OPENREQUEST)) {
                                if (part->chctl.flags[ch_number] &
                                    XPC_CHCTL_OPENREQUEST) {

                                        DBUG_ON(ch->delayed_chctl_flags != 0);
                                        spin_lock(&part->chctl_lock);
                                        part->chctl.flags[ch_number] |=
                                            XPC_CHCTL_CLOSEREQUEST;
                                        spin_unlock(&part->chctl_lock);
                                }
                                goto out;
                        }

                        XPC_SET_REASON(ch, 0, 0);
                        ch->flags &= ~XPC_C_DISCONNECTED;

                        atomic_inc(&part->nchannels_active);
                        ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
                }

                chctl_flags &= ~(XPC_CHCTL_OPENREQUEST | XPC_CHCTL_OPENREPLY |
                    XPC_CHCTL_OPENCOMPLETE);

                /*
                 * The meaningful CLOSEREQUEST connection state fields are:
                 *      reason = reason connection is to be closed
                 */

                ch->flags |= XPC_C_RCLOSEREQUEST;

                if (!(ch->flags & XPC_C_DISCONNECTING)) {
                        reason = args->reason;
                        if (reason <= xpSuccess || reason > xpUnknownReason)
                                reason = xpUnknownReason;
                        else if (reason == xpUnregistering)
                                reason = xpOtherUnregistering;

                        XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

                        DBUG_ON(chctl_flags & XPC_CHCTL_CLOSEREPLY);
                        goto out;
                }

                xpc_process_disconnect(ch, &irq_flags);
        }

        if (chctl_flags & XPC_CHCTL_CLOSEREPLY) {

                dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREPLY received from partid="
                        "%d, channel=%d\n", ch->partid, ch->number);

                if (ch->flags & XPC_C_DISCONNECTED) {
                        DBUG_ON(part->act_state != XPC_P_AS_DEACTIVATING);
                        goto out;
                }

                DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

                if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
                        if (part->chctl.flags[ch_number] &
                            XPC_CHCTL_CLOSEREQUEST) {

                                DBUG_ON(ch->delayed_chctl_flags != 0);
                                spin_lock(&part->chctl_lock);
                                part->chctl.flags[ch_number] |=
                                    XPC_CHCTL_CLOSEREPLY;
                                spin_unlock(&part->chctl_lock);
                        }
                        goto out;
                }

                ch->flags |= XPC_C_RCLOSEREPLY;

                if (ch->flags & XPC_C_CLOSEREPLY) {
                        /* both sides have finished disconnecting */
                        xpc_process_disconnect(ch, &irq_flags);
                }
        }

        if (chctl_flags & XPC_CHCTL_OPENREQUEST) {

                dev_dbg(xpc_chan, "XPC_CHCTL_OPENREQUEST (entry_size=%d, "
                        "local_nentries=%d) received from partid=%d, "
                        "channel=%d\n", args->entry_size, args->local_nentries,
                        ch->partid, ch->number);

                if (part->act_state == XPC_P_AS_DEACTIVATING ||
                    (ch->flags & XPC_C_ROPENREQUEST)) {
                        goto out;
                }

                if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
                        ch->delayed_chctl_flags |= XPC_CHCTL_OPENREQUEST;
                        goto out;
                }
                DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
                                       XPC_C_OPENREQUEST)));
                DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
                                     XPC_C_OPENREPLY | XPC_C_CONNECTED));

                /*
                 * The meaningful OPENREQUEST connection state fields are:
                 *      entry_size = size of channel's messages in bytes
                 *      local_nentries = remote partition's local_nentries
                 */
                if (args->entry_size == 0 || args->local_nentries == 0) {
                        /* assume OPENREQUEST was delayed by mistake */
                        goto out;
                }

                ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
                ch->remote_nentries = args->local_nentries;

                if (ch->flags & XPC_C_OPENREQUEST) {
                        if (args->entry_size != ch->entry_size) {
                                XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
                                                       &irq_flags);
                                goto out;
                        }
                } else {
                        ch->entry_size = args->entry_size;

                        XPC_SET_REASON(ch, 0, 0);
                        ch->flags &= ~XPC_C_DISCONNECTED;

                        atomic_inc(&part->nchannels_active);
                }

                xpc_process_connect(ch, &irq_flags);
        }

        if (chctl_flags & XPC_CHCTL_OPENREPLY) {

                dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY (local_msgqueue_pa="
                        "0x%lx, local_nentries=%d, remote_nentries=%d) "
                        "received from partid=%d, channel=%d\n",
                        args->local_msgqueue_pa, args->local_nentries,
                        args->remote_nentries, ch->partid, ch->number);

                if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
                        goto out;

                if (!(ch->flags & XPC_C_OPENREQUEST)) {
                        XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
                                               &irq_flags);
                        goto out;
                }

                DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
                DBUG_ON(ch->flags & XPC_C_CONNECTED);

                /*
                 * The meaningful OPENREPLY connection state fields are:
                 *      local_msgqueue_pa = physical address of remote
                 *                          partition's local_msgqueue
                 *      local_nentries = remote partition's local_nentries
                 *      remote_nentries = remote partition's remote_nentries
                 */
                DBUG_ON(args->local_msgqueue_pa == 0);
                DBUG_ON(args->local_nentries == 0);
                DBUG_ON(args->remote_nentries == 0);

                ret = xpc_arch_ops.save_remote_msgqueue_pa(ch,
                                                      args->local_msgqueue_pa);
                if (ret != xpSuccess) {
                        XPC_DISCONNECT_CHANNEL(ch, ret, &irq_flags);
                        goto out;
                }
                ch->flags |= XPC_C_ROPENREPLY;

                if (args->local_nentries < ch->remote_nentries) {
                        dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
                                "remote_nentries=%d, old remote_nentries=%d, "
                                "partid=%d, channel=%d\n",
                                args->local_nentries, ch->remote_nentries,
                                ch->partid, ch->number);

                        ch->remote_nentries = args->local_nentries;
                }
                if (args->remote_nentries < ch->local_nentries) {
                        dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
                                "local_nentries=%d, old local_nentries=%d, "
                                "partid=%d, channel=%d\n",
                                args->remote_nentries, ch->local_nentries,
                                ch->partid, ch->number);

                        ch->local_nentries = args->remote_nentries;
                }

                xpc_process_connect(ch, &irq_flags);
        }

        if (chctl_flags & XPC_CHCTL_OPENCOMPLETE) {

                dev_dbg(xpc_chan, "XPC_CHCTL_OPENCOMPLETE received from "
                        "partid=%d, channel=%d\n", ch->partid, ch->number);

                if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
                        goto out;

                if (!(ch->flags & XPC_C_OPENREQUEST) ||
                    !(ch->flags & XPC_C_OPENREPLY)) {
                        XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
                                               &irq_flags);
                        goto out;
                }

                DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
                DBUG_ON(!(ch->flags & XPC_C_ROPENREPLY));
                DBUG_ON(!(ch->flags & XPC_C_CONNECTED));

                ch->flags |= XPC_C_ROPENCOMPLETE;

                xpc_process_connect(ch, &irq_flags);
                create_kthread = 1;
        }

out:
        spin_unlock_irqrestore(&ch->lock, irq_flags);

        if (create_kthread)
                xpc_create_kthreads(ch, 1, 0);
}

/*
 * Attempt to establish a channel connection to a remote partition.
 */
static enum xp_retval
xpc_connect_channel(struct xpc_channel *ch)
{
        unsigned long irq_flags;
        struct xpc_registration *registration = &xpc_registrations[ch->number];

        if (mutex_trylock(&registration->mutex) == 0)
                return xpRetry;

        if (!XPC_CHANNEL_REGISTERED(ch->number)) {
                mutex_unlock(&registration->mutex);
                return xpUnregistered;
        }

        spin_lock_irqsave(&ch->lock, irq_flags);

        DBUG_ON(ch->flags & XPC_C_CONNECTED);
        DBUG_ON(ch->flags & XPC_C_OPENREQUEST);

        if (ch->flags & XPC_C_DISCONNECTING) {
                spin_unlock_irqrestore(&ch->lock, irq_flags);
                mutex_unlock(&registration->mutex);
                return ch->reason;
        }

        /* add info from the channel connect registration to the channel */

        ch->kthreads_assigned_limit = registration->assigned_limit;
        ch->kthreads_idle_limit = registration->idle_limit;
        DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
        DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
        DBUG_ON(atomic_read(&ch->kthreads_active) != 0);

        ch->func = registration->func;
        DBUG_ON(registration->func == NULL);
        ch->key = registration->key;

        ch->local_nentries = registration->nentries;

        if (ch->flags & XPC_C_ROPENREQUEST) {
                if (registration->entry_size != ch->entry_size) {
                        /* the local and remote sides aren't the same */

                        /*
                         * Because XPC_DISCONNECT_CHANNEL() can block, we're
                         * forced to drop the registration mutex before we
                         * unlock the channel lock. That's okay here because
                         * we're done with the part that required the
                         * registration mutex. XPC_DISCONNECT_CHANNEL()
                         * requires that the channel lock be held and will
                         * unlock and relock it as needed.
                         */
                        mutex_unlock(&registration->mutex);
                        XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
                                               &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return xpUnequalMsgSizes;
                }
        } else {
                ch->entry_size = registration->entry_size;

                XPC_SET_REASON(ch, 0, 0);
                ch->flags &= ~XPC_C_DISCONNECTED;

                atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
        }

        mutex_unlock(&registration->mutex);

        /* initiate the connection */

        ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
        xpc_arch_ops.send_chctl_openrequest(ch, &irq_flags);

        xpc_process_connect(ch, &irq_flags);

        spin_unlock_irqrestore(&ch->lock, irq_flags);

        return xpSuccess;
}

void
xpc_process_sent_chctl_flags(struct xpc_partition *part)
{
        unsigned long irq_flags;
        union xpc_channel_ctl_flags chctl;
        struct xpc_channel *ch;
        int ch_number;
        u32 ch_flags;

        chctl.all_flags = xpc_arch_ops.get_chctl_all_flags(part);

        /*
         * Initiate channel connections for registered channels.
         *
         * For each connected channel that has pending messages, activate
         * idle kthreads and/or create new kthreads as needed.
         */

        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch = &part->channels[ch_number];

                /*
                 * Process any open or close related chctl flags, and then deal
                 * with connecting or disconnecting the channel as required.
                 */

                if (chctl.flags[ch_number] & XPC_OPENCLOSE_CHCTL_FLAGS) {
                        xpc_process_openclose_chctl_flags(part, ch_number,
                                                        chctl.flags[ch_number]);
                }

                ch_flags = ch->flags;   /* need an atomic snapshot of flags */

                if (ch_flags & XPC_C_DISCONNECTING) {
                        spin_lock_irqsave(&ch->lock, irq_flags);
                        xpc_process_disconnect(ch, &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        continue;
                }

                if (part->act_state == XPC_P_AS_DEACTIVATING)
                        continue;

                if (!(ch_flags & XPC_C_CONNECTED)) {
                        if (!(ch_flags & XPC_C_OPENREQUEST)) {
                                DBUG_ON(ch_flags & XPC_C_SETUP);
                                (void)xpc_connect_channel(ch);
                        }
                        continue;
                }

                /*
                 * Process any message-related chctl flags; this may involve
                 * activating kthreads to deliver pending messages sent from
                 * the other partition.
                 */

                if (chctl.flags[ch_number] & XPC_MSG_CHCTL_FLAGS)
                        xpc_arch_ops.process_msg_chctl_flags(part, ch_number);
        }
}

/*
 * XPC's heartbeat code calls this function to inform XPC that a partition is
 * going down.  XPC responds by tearing down the XPartition Communication
 * infrastructure used for the just-downed partition.
 *
 * XPC's heartbeat code will never call this function and xpc_partition_up()
 * at the same time. Nor will it ever make multiple calls to either function
 * at the same time.
 */
void
xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
{
        unsigned long irq_flags;
        int ch_number;
        struct xpc_channel *ch;

        dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
                XPC_PARTID(part), reason);

        if (!xpc_part_ref(part)) {
                /* infrastructure for this partition isn't currently set up */
                return;
        }

        /* disconnect channels associated with the partition going down */

        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch = &part->channels[ch_number];

                xpc_msgqueue_ref(ch);
                spin_lock_irqsave(&ch->lock, irq_flags);

                XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

                spin_unlock_irqrestore(&ch->lock, irq_flags);
                xpc_msgqueue_deref(ch);
        }

        xpc_wakeup_channel_mgr(part);

        xpc_part_deref(part);
}

/*
 * Called by XP at the time of channel connection registration to cause
 * XPC to establish connections to all currently active partitions.
 */
void
xpc_initiate_connect(int ch_number)
{
        short partid;
        struct xpc_partition *part;

        DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);

        for (partid = 0; partid < xp_max_npartitions; partid++) {
                part = &xpc_partitions[partid];

                if (xpc_part_ref(part)) {
                        /*
                         * Initiate the establishment of a connection on the
                         * newly registered channel to the remote partition.
                         */
                        xpc_wakeup_channel_mgr(part);
                        xpc_part_deref(part);
                }
        }
}

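/*
 * Registration sketch (added commentary; "my_channel_func" and the numeric
 * limits are hypothetical).  This assumes the xpc_connect() wrapper and the
 * xpc_channel_func typedef from xp.h, which record the registration that
 * ultimately drives xpc_initiate_connect():
 *
 *      static void
 *      my_channel_func(enum xp_retval reason, short partid, int ch_number,
 *                      void *data, void *key)
 *      {
 *              if (reason == xpConnected)
 *                      pr_debug("channel %d to partition %d is up\n",
 *                               ch_number, partid);
 *      }
 *
 *      ret = xpc_connect(ch_number, my_channel_func, NULL, 128, 64, 4, 2);
 *      if (ret != xpSuccess)
 *              pr_err("xpc_connect() failed, ret=%d\n", ret);
 */
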
void
xpc_connected_callout(struct xpc_channel *ch)
{
        /* let the registerer know that a connection has been established */

        if (ch->func != NULL) {
                dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
                        "partid=%d, channel=%d\n", ch->partid, ch->number);

                ch->func(xpConnected, ch->partid, ch->number,
                         (void *)(u64)ch->local_nentries, ch->key);

                dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
                        "partid=%d, channel=%d\n", ch->partid, ch->number);
        }
}

/*
 * Called by XP at the time of channel connection unregistration to cause
 * XPC to tear down all current connections for the specified channel.
 *
 * Before returning, xpc_initiate_disconnect() will wait until all connections
 * on the specified channel have been closed and torn down. So the caller can
 * be assured that they will not be receiving any more callouts from XPC to
 * the function they registered via xpc_connect().
 *
 * Arguments:
 *
 *      ch_number - channel # to unregister.
 */
void
xpc_initiate_disconnect(int ch_number)
{
        unsigned long irq_flags;
        short partid;
        struct xpc_partition *part;
        struct xpc_channel *ch;

        DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);

        /* initiate the channel disconnect for every active partition */
        for (partid = 0; partid < xp_max_npartitions; partid++) {
                part = &xpc_partitions[partid];

                if (xpc_part_ref(part)) {
                        ch = &part->channels[ch_number];
                        xpc_msgqueue_ref(ch);

                        spin_lock_irqsave(&ch->lock, irq_flags);

                        if (!(ch->flags & XPC_C_DISCONNECTED)) {
                                ch->flags |= XPC_C_WDISCONNECT;

                                XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
                                                       &irq_flags);
                        }

                        spin_unlock_irqrestore(&ch->lock, irq_flags);

                        xpc_msgqueue_deref(ch);
                        xpc_part_deref(part);
                }
        }

        xpc_disconnect_wait(ch_number);
}

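/*
 * Unregistration sketch (added commentary; assumes the xpc_disconnect()
 * wrapper from xp.h, which reaches this routine through xpc_interface):
 *
 *      xpc_disconnect(ch_number);
 *
 * On return, no further callouts will be made to the function that was
 * registered for this channel via xpc_connect().
 */
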
/*
 * Disconnect a channel and reflect the disconnect back to all who may be
 * waiting.
 *
 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
 * xpc_disconnect_wait().
 *
 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
 */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
                       enum xp_retval reason, unsigned long *irq_flags)
{
        u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);

        lockdep_assert_held(&ch->lock);

        if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
                return;

        DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

        dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
                reason, line, ch->partid, ch->number);

        XPC_SET_REASON(ch, reason, line);

        ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
        /* some of these may not have been set */
        ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
                       XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
                       XPC_C_CONNECTING | XPC_C_CONNECTED);

        xpc_arch_ops.send_chctl_closerequest(ch, irq_flags);

        if (channel_was_connected)
                ch->flags |= XPC_C_WASCONNECTED;

        spin_unlock_irqrestore(&ch->lock, *irq_flags);

        /* wake all idle kthreads so they can exit */
        if (atomic_read(&ch->kthreads_idle) > 0) {
                wake_up_all(&ch->idle_wq);

        } else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
                   !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
                /* start a kthread that will do the xpDisconnecting callout */
                xpc_create_kthreads(ch, 1, 1);
        }

        /* wake those waiting to allocate an entry from the local msg queue */
        if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
                wake_up(&ch->msg_allocate_wq);

        spin_lock_irqsave(&ch->lock, *irq_flags);
}

void
xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
{
        /*
         * Let the channel's registerer know that the channel is being
         * disconnected. We don't want to do this if the registerer was never
         * informed of a connection being made.
         */

        if (ch->func != NULL) {
                dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
                        "channel=%d\n", reason, ch->partid, ch->number);

                ch->func(reason, ch->partid, ch->number, NULL, ch->key);

                dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
                        "channel=%d\n", reason, ch->partid, ch->number);
        }
}

/*
 * Wait for a message entry to become available for the specified channel,
 * but don't wait any longer than 1 jiffy.
 */
enum xp_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
        enum xp_retval ret;
        DEFINE_WAIT(wait);

        if (ch->flags & XPC_C_DISCONNECTING) {
                DBUG_ON(ch->reason == xpInterrupted);
                return ch->reason;
        }

        atomic_inc(&ch->n_on_msg_allocate_wq);
        prepare_to_wait(&ch->msg_allocate_wq, &wait, TASK_INTERRUPTIBLE);
        ret = schedule_timeout(1);
        finish_wait(&ch->msg_allocate_wq, &wait);
        atomic_dec(&ch->n_on_msg_allocate_wq);

        if (ch->flags & XPC_C_DISCONNECTING) {
                ret = ch->reason;
                DBUG_ON(ch->reason == xpInterrupted);
        } else if (ret == 0) {
                ret = xpTimeout;
        } else {
                ret = xpInterrupted;
        }

        return ret;
}

/*
 * Send a message that contains the user's payload on the specified channel
 * connected to the specified partition.
 *
 * NOTE that this routine can sleep waiting for a message entry to become
 * available. To not sleep, pass in the XPC_NOWAIT flag.
 *
 * Once sent, this routine will not wait for the message to be received, nor
 * will notification be given when it does happen.
 *
 * Arguments:
 *
 *      partid - ID of partition to which the channel is connected.
 *      ch_number - channel # to send message on.
 *      flags - see xp.h for valid flags.
 *      payload - pointer to the payload which is to be sent.
 *      payload_size - size of the payload in bytes.
 */
enum xp_retval
xpc_initiate_send(short partid, int ch_number, u32 flags, void *payload,
                  u16 payload_size)
{
        struct xpc_partition *part = &xpc_partitions[partid];
        enum xp_retval ret = xpUnknownReason;

        dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
                partid, ch_number);

        DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
        DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
        DBUG_ON(payload == NULL);

        if (xpc_part_ref(part)) {
                ret = xpc_arch_ops.send_payload(&part->channels[ch_number],
                                  flags, payload, payload_size, 0, NULL, NULL);
                xpc_part_deref(part);
        }

        return ret;
}

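/*
 * Usage sketch (added commentary; "struct my_msg" is hypothetical).  This
 * assumes the xpc_send() wrapper from xp.h, which reaches this routine
 * through xpc_interface:
 *
 *      struct my_msg msg = { .cmd = MY_CMD };
 *
 *      ret = xpc_send(partid, ch_number, XPC_NOWAIT, &msg, sizeof(msg));
 *      if (ret != xpSuccess)
 *              pr_warn("xpc_send() failed, ret=%d\n", ret);
 */
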
/*
 * Send a message that contains the user's payload on the specified channel
 * connected to the specified partition.
 *
 * NOTE that this routine can sleep waiting for a message entry to become
 * available. To not sleep, pass in the XPC_NOWAIT flag.
 *
 * This routine will not wait for the message to be sent or received.
 *
 * Once the remote end of the channel has received the message, the function
 * passed as an argument to xpc_initiate_send_notify() will be called. This
 * allows the sender to free up or re-use any buffers referenced by the
 * message, but does NOT mean the message has been processed at the remote
 * end by a receiver.
 *
 * If this routine returns an error, the caller's function will NOT be called.
 *
 * Arguments:
 *
 *      partid - ID of partition to which the channel is connected.
 *      ch_number - channel # to send message on.
 *      flags - see xp.h for valid flags.
 *      payload - pointer to the payload which is to be sent.
 *      payload_size - size of the payload in bytes.
 *      func - function to call with asynchronous notification of message
 *                receipt. THIS FUNCTION MUST BE NON-BLOCKING.
 *      key - user-defined key to be passed to the function when it's called.
 */
enum xp_retval
xpc_initiate_send_notify(short partid, int ch_number, u32 flags, void *payload,
                         u16 payload_size, xpc_notify_func func, void *key)
{
        struct xpc_partition *part = &xpc_partitions[partid];
        enum xp_retval ret = xpUnknownReason;

        dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
                partid, ch_number);

        DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
        DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
        DBUG_ON(payload == NULL);
        DBUG_ON(func == NULL);

        if (xpc_part_ref(part)) {
                ret = xpc_arch_ops.send_payload(&part->channels[ch_number],
                          flags, payload, payload_size, XPC_N_CALL, func, key);
                xpc_part_deref(part);
        }
        return ret;
}

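/*
 * Usage sketch (added commentary; names are hypothetical).  This assumes
 * the xpc_send_notify() wrapper and the xpc_notify_func typedef from xp.h.
 * The notify function must not block:
 *
 *      static void
 *      my_notify_func(enum xp_retval reason, short partid, int ch_number,
 *                     void *key)
 *      {
 *              if (reason == xpMsgDelivered)
 *                      complete((struct completion *)key);
 *      }
 *
 *      ret = xpc_send_notify(partid, ch_number, XPC_WAIT, &msg, sizeof(msg),
 *                            my_notify_func, &done);
 */
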
/*
 * Deliver a message's payload to its intended recipient.
 */
void
xpc_deliver_payload(struct xpc_channel *ch)
{
        void *payload;

        payload = xpc_arch_ops.get_deliverable_payload(ch);
        if (payload != NULL) {

                /*
                 * This ref is taken to protect the payload itself from being
                 * freed before the user is finished with it, which the user
                 * indicates by calling xpc_initiate_received().
                 */
                xpc_msgqueue_ref(ch);

                atomic_inc(&ch->kthreads_active);

                if (ch->func != NULL) {
                        dev_dbg(xpc_chan, "ch->func() called, payload=0x%p "
                                "partid=%d channel=%d\n", payload, ch->partid,
                                ch->number);

                        /* deliver the message to its intended recipient */
                        ch->func(xpMsgReceived, ch->partid, ch->number, payload,
                                 ch->key);

                        dev_dbg(xpc_chan, "ch->func() returned, payload=0x%p "
                                "partid=%d channel=%d\n", payload, ch->partid,
                                ch->number);
                }

                atomic_dec(&ch->kthreads_active);
        }
}

/*
 * Acknowledge receipt of a delivered message's payload.
 *
 * This function, although called by users, does not call xpc_part_ref() to
 * ensure that the partition infrastructure is in place. It relies on the
 * fact that we called xpc_msgqueue_ref() in xpc_deliver_payload().
 *
 * Arguments:
 *
 *      partid - ID of partition to which the channel is connected.
 *      ch_number - channel # message received on.
 *      payload - pointer to the payload area allocated via
 *                      xpc_initiate_send() or xpc_initiate_send_notify().
 */
void
xpc_initiate_received(short partid, int ch_number, void *payload)
{
        struct xpc_partition *part = &xpc_partitions[partid];
        struct xpc_channel *ch;

        DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
        DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);

        ch = &part->channels[ch_number];
        xpc_arch_ops.received_payload(ch, payload);

        /* the call to xpc_msgqueue_ref() was done by xpc_deliver_payload() */
        xpc_msgqueue_deref(ch);
}
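
/*
 * Receive-side sketch (added commentary; "process_msg" is hypothetical).
 * This assumes the xpc_received() wrapper from xp.h.  A registered channel
 * function acknowledges a delivered payload once it is done with it:
 *
 *      static void
 *      my_channel_func(enum xp_retval reason, short partid, int ch_number,
 *                      void *data, void *key)
 *      {
 *              if (reason == xpMsgReceived) {
 *                      process_msg(data);
 *                      xpc_received(partid, ch_number, data);
 *              }
 *      }
 */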