Linux Audio

Check our new training course

Embedded Linux training

Mar 31-Apr 8, 2025
Register
Loading...
Note: File does not exist in v4.6.
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 *   Copyright (C) 2017, Microsoft Corporation.
   4 *
   5 *   Author(s): Long Li <longli@microsoft.com>
   6 */
   7#include <linux/module.h>
   8#include <linux/highmem.h>
   9#include "smbdirect.h"
  10#include "cifs_debug.h"
  11#include "cifsproto.h"
  12#include "smb2proto.h"
  13
  14static struct smbd_response *get_empty_queue_buffer(
  15		struct smbd_connection *info);
  16static struct smbd_response *get_receive_buffer(
  17		struct smbd_connection *info);
  18static void put_receive_buffer(
  19		struct smbd_connection *info,
  20		struct smbd_response *response);
  21static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
  22static void destroy_receive_buffers(struct smbd_connection *info);
  23
  24static void put_empty_packet(
  25		struct smbd_connection *info, struct smbd_response *response);
  26static void enqueue_reassembly(
  27		struct smbd_connection *info,
  28		struct smbd_response *response, int data_length);
  29static struct smbd_response *_get_first_reassembly(
  30		struct smbd_connection *info);
  31
  32static int smbd_post_recv(
  33		struct smbd_connection *info,
  34		struct smbd_response *response);
  35
  36static int smbd_post_send_empty(struct smbd_connection *info);
  37static int smbd_post_send_data(
  38		struct smbd_connection *info,
  39		struct kvec *iov, int n_vec, int remaining_data_length);
  40static int smbd_post_send_page(struct smbd_connection *info,
  41		struct page *page, unsigned long offset,
  42		size_t size, int remaining_data_length);
  43
  44static void destroy_mr_list(struct smbd_connection *info);
  45static int allocate_mr_list(struct smbd_connection *info);
  46
  47/* SMBD version number */
  48#define SMBD_V1	0x0100
  49
  50/* Port numbers for SMBD transport */
  51#define SMB_PORT	445
  52#define SMBD_PORT	5445
  53
  54/* Address lookup and resolve timeout in ms */
  55#define RDMA_RESOLVE_TIMEOUT	5000
  56
  57/* SMBD negotiation timeout in seconds */
  58#define SMBD_NEGOTIATE_TIMEOUT	120
  59
  60/* SMBD minimum receive size and fragmented sized defined in [MS-SMBD] */
  61#define SMBD_MIN_RECEIVE_SIZE		128
  62#define SMBD_MIN_FRAGMENTED_SIZE	131072
  63
  64/*
  65 * Default maximum number of RDMA read/write outstanding on this connection
  66 * This value is possibly decreased during QP creation on hardware limit
  67 */
  68#define SMBD_CM_RESPONDER_RESOURCES	32
  69
  70/* Maximum number of retries on data transfer operations */
  71#define SMBD_CM_RETRY			6
  72/* No need to retry on Receiver Not Ready since SMBD manages credits */
  73#define SMBD_CM_RNR_RETRY		0
  74
  75/*
  76 * User configurable initial values per SMBD transport connection
  77 * as defined in [MS-SMBD] 3.1.1.1
  78 * Those may change after a SMBD negotiation
  79 */
  80/* The local peer's maximum number of credits to grant to the peer */
  81int smbd_receive_credit_max = 255;
  82
  83/* The remote peer's credit request of local peer */
  84int smbd_send_credit_target = 255;
  85
  86/* The maximum single message size can be sent to remote peer */
  87int smbd_max_send_size = 1364;
  88
  89/*  The maximum fragmented upper-layer payload receive size supported */
  90int smbd_max_fragmented_recv_size = 1024 * 1024;
  91
  92/*  The maximum single-message size which can be received */
  93int smbd_max_receive_size = 8192;
  94
  95/* The timeout to initiate send of a keepalive message on idle */
  96int smbd_keep_alive_interval = 120;
  97
  98/*
  99 * User configurable initial values for RDMA transport
 100 * The actual values used may be lower and are limited to hardware capabilities
 101 */
 102/* Default maximum number of SGEs in a RDMA write/read */
 103int smbd_max_frmr_depth = 2048;
 104
 105/* If payload is less than this byte, use RDMA send/recv not read/write */
 106int rdma_readwrite_threshold = 4096;
 107
 108/* Transport logging functions
 109 * Logging are defined as classes. They can be OR'ed to define the actual
 110 * logging level via module parameter smbd_logging_class
 111 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 112 * log_rdma_event()
 113 */
 114#define LOG_OUTGOING			0x1
 115#define LOG_INCOMING			0x2
 116#define LOG_READ			0x4
 117#define LOG_WRITE			0x8
 118#define LOG_RDMA_SEND			0x10
 119#define LOG_RDMA_RECV			0x20
 120#define LOG_KEEP_ALIVE			0x40
 121#define LOG_RDMA_EVENT			0x80
 122#define LOG_RDMA_MR			0x100
 123static unsigned int smbd_logging_class;
 124module_param(smbd_logging_class, uint, 0644);
 125MODULE_PARM_DESC(smbd_logging_class,
 126	"Logging class for SMBD transport 0x0 to 0x100");
 127
 128#define ERR		0x0
 129#define INFO		0x1
 130static unsigned int smbd_logging_level = ERR;
 131module_param(smbd_logging_level, uint, 0644);
 132MODULE_PARM_DESC(smbd_logging_level,
 133	"Logging level for SMBD transport, 0 (default): error, 1: info");
 134
 135#define log_rdma(level, class, fmt, args...)				\
 136do {									\
 137	if (level <= smbd_logging_level || class & smbd_logging_class)	\
 138		cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
 139} while (0)
 140
 141#define log_outgoing(level, fmt, args...) \
 142		log_rdma(level, LOG_OUTGOING, fmt, ##args)
 143#define log_incoming(level, fmt, args...) \
 144		log_rdma(level, LOG_INCOMING, fmt, ##args)
 145#define log_read(level, fmt, args...)	log_rdma(level, LOG_READ, fmt, ##args)
 146#define log_write(level, fmt, args...)	log_rdma(level, LOG_WRITE, fmt, ##args)
 147#define log_rdma_send(level, fmt, args...) \
 148		log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
 149#define log_rdma_recv(level, fmt, args...) \
 150		log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
 151#define log_keep_alive(level, fmt, args...) \
 152		log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
 153#define log_rdma_event(level, fmt, args...) \
 154		log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
 155#define log_rdma_mr(level, fmt, args...) \
 156		log_rdma(level, LOG_RDMA_MR, fmt, ##args)
 157
 158static void smbd_disconnect_rdma_work(struct work_struct *work)
 159{
 160	struct smbd_connection *info =
 161		container_of(work, struct smbd_connection, disconnect_work);
 162
 163	if (info->transport_status == SMBD_CONNECTED) {
 164		info->transport_status = SMBD_DISCONNECTING;
 165		rdma_disconnect(info->id);
 166	}
 167}
 168
 169static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
 170{
 171	queue_work(info->workqueue, &info->disconnect_work);
 172}
 173
 174/* Upcall from RDMA CM */
 175static int smbd_conn_upcall(
 176		struct rdma_cm_id *id, struct rdma_cm_event *event)
 177{
 178	struct smbd_connection *info = id->context;
 179
 180	log_rdma_event(INFO, "event=%d status=%d\n",
 181		event->event, event->status);
 182
 183	switch (event->event) {
 184	case RDMA_CM_EVENT_ADDR_RESOLVED:
 185	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 186		info->ri_rc = 0;
 187		complete(&info->ri_done);
 188		break;
 189
 190	case RDMA_CM_EVENT_ADDR_ERROR:
 191		info->ri_rc = -EHOSTUNREACH;
 192		complete(&info->ri_done);
 193		break;
 194
 195	case RDMA_CM_EVENT_ROUTE_ERROR:
 196		info->ri_rc = -ENETUNREACH;
 197		complete(&info->ri_done);
 198		break;
 199
 200	case RDMA_CM_EVENT_ESTABLISHED:
 201		log_rdma_event(INFO, "connected event=%d\n", event->event);
 202		info->transport_status = SMBD_CONNECTED;
 203		wake_up_interruptible(&info->conn_wait);
 204		break;
 205
 206	case RDMA_CM_EVENT_CONNECT_ERROR:
 207	case RDMA_CM_EVENT_UNREACHABLE:
 208	case RDMA_CM_EVENT_REJECTED:
 209		log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
 210		info->transport_status = SMBD_DISCONNECTED;
 211		wake_up_interruptible(&info->conn_wait);
 212		break;
 213
 214	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 215	case RDMA_CM_EVENT_DISCONNECTED:
 216		/* This happenes when we fail the negotiation */
 217		if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
 218			info->transport_status = SMBD_DISCONNECTED;
 219			wake_up(&info->conn_wait);
 220			break;
 221		}
 222
 223		info->transport_status = SMBD_DISCONNECTED;
 224		wake_up_interruptible(&info->disconn_wait);
 225		wake_up_interruptible(&info->wait_reassembly_queue);
 226		wake_up_interruptible_all(&info->wait_send_queue);
 227		break;
 228
 229	default:
 230		break;
 231	}
 232
 233	return 0;
 234}
 235
 236/* Upcall from RDMA QP */
 237static void
 238smbd_qp_async_error_upcall(struct ib_event *event, void *context)
 239{
 240	struct smbd_connection *info = context;
 241
 242	log_rdma_event(ERR, "%s on device %s info %p\n",
 243		ib_event_msg(event->event), event->device->name, info);
 244
 245	switch (event->event) {
 246	case IB_EVENT_CQ_ERR:
 247	case IB_EVENT_QP_FATAL:
 248		smbd_disconnect_rdma_connection(info);
 249
 250	default:
 251		break;
 252	}
 253}
 254
 255static inline void *smbd_request_payload(struct smbd_request *request)
 256{
 257	return (void *)request->packet;
 258}
 259
 260static inline void *smbd_response_payload(struct smbd_response *response)
 261{
 262	return (void *)response->packet;
 263}
 264
 265/* Called when a RDMA send is done */
 266static void send_done(struct ib_cq *cq, struct ib_wc *wc)
 267{
 268	int i;
 269	struct smbd_request *request =
 270		container_of(wc->wr_cqe, struct smbd_request, cqe);
 271
 272	log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
 273		request, wc->status);
 274
 275	if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
 276		log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
 277			wc->status, wc->opcode);
 278		smbd_disconnect_rdma_connection(request->info);
 279	}
 280
 281	for (i = 0; i < request->num_sge; i++)
 282		ib_dma_unmap_single(request->info->id->device,
 283			request->sge[i].addr,
 284			request->sge[i].length,
 285			DMA_TO_DEVICE);
 286
 287	if (atomic_dec_and_test(&request->info->send_pending))
 288		wake_up(&request->info->wait_send_pending);
 289
 290	wake_up(&request->info->wait_post_send);
 291
 292	mempool_free(request, request->info->request_mempool);
 293}
 294
 295static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
 296{
 297	log_rdma_event(INFO, "resp message min_version %u max_version %u negotiated_version %u credits_requested %u credits_granted %u status %u max_readwrite_size %u preferred_send_size %u max_receive_size %u max_fragmented_size %u\n",
 298		       resp->min_version, resp->max_version,
 299		       resp->negotiated_version, resp->credits_requested,
 300		       resp->credits_granted, resp->status,
 301		       resp->max_readwrite_size, resp->preferred_send_size,
 302		       resp->max_receive_size, resp->max_fragmented_size);
 303}
 304
 305/*
 306 * Process a negotiation response message, according to [MS-SMBD]3.1.5.7
 307 * response, packet_length: the negotiation response message
 308 * return value: true if negotiation is a success, false if failed
 309 */
 310static bool process_negotiation_response(
 311		struct smbd_response *response, int packet_length)
 312{
 313	struct smbd_connection *info = response->info;
 314	struct smbd_negotiate_resp *packet = smbd_response_payload(response);
 315
 316	if (packet_length < sizeof(struct smbd_negotiate_resp)) {
 317		log_rdma_event(ERR,
 318			"error: packet_length=%d\n", packet_length);
 319		return false;
 320	}
 321
 322	if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
 323		log_rdma_event(ERR, "error: negotiated_version=%x\n",
 324			le16_to_cpu(packet->negotiated_version));
 325		return false;
 326	}
 327	info->protocol = le16_to_cpu(packet->negotiated_version);
 328
 329	if (packet->credits_requested == 0) {
 330		log_rdma_event(ERR, "error: credits_requested==0\n");
 331		return false;
 332	}
 333	info->receive_credit_target = le16_to_cpu(packet->credits_requested);
 334
 335	if (packet->credits_granted == 0) {
 336		log_rdma_event(ERR, "error: credits_granted==0\n");
 337		return false;
 338	}
 339	atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
 340
 341	atomic_set(&info->receive_credits, 0);
 342
 343	if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
 344		log_rdma_event(ERR, "error: preferred_send_size=%d\n",
 345			le32_to_cpu(packet->preferred_send_size));
 346		return false;
 347	}
 348	info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
 349
 350	if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
 351		log_rdma_event(ERR, "error: max_receive_size=%d\n",
 352			le32_to_cpu(packet->max_receive_size));
 353		return false;
 354	}
 355	info->max_send_size = min_t(int, info->max_send_size,
 356					le32_to_cpu(packet->max_receive_size));
 357
 358	if (le32_to_cpu(packet->max_fragmented_size) <
 359			SMBD_MIN_FRAGMENTED_SIZE) {
 360		log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
 361			le32_to_cpu(packet->max_fragmented_size));
 362		return false;
 363	}
 364	info->max_fragmented_send_size =
 365		le32_to_cpu(packet->max_fragmented_size);
 366	info->rdma_readwrite_threshold =
 367		rdma_readwrite_threshold > info->max_fragmented_send_size ?
 368		info->max_fragmented_send_size :
 369		rdma_readwrite_threshold;
 370
 371
 372	info->max_readwrite_size = min_t(u32,
 373			le32_to_cpu(packet->max_readwrite_size),
 374			info->max_frmr_depth * PAGE_SIZE);
 375	info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 376
 377	return true;
 378}
 379
 380static void smbd_post_send_credits(struct work_struct *work)
 381{
 382	int ret = 0;
 383	int use_receive_queue = 1;
 384	int rc;
 385	struct smbd_response *response;
 386	struct smbd_connection *info =
 387		container_of(work, struct smbd_connection,
 388			post_send_credits_work);
 389
 390	if (info->transport_status != SMBD_CONNECTED) {
 391		wake_up(&info->wait_receive_queues);
 392		return;
 393	}
 394
 395	if (info->receive_credit_target >
 396		atomic_read(&info->receive_credits)) {
 397		while (true) {
 398			if (use_receive_queue)
 399				response = get_receive_buffer(info);
 400			else
 401				response = get_empty_queue_buffer(info);
 402			if (!response) {
 403				/* now switch to emtpy packet queue */
 404				if (use_receive_queue) {
 405					use_receive_queue = 0;
 406					continue;
 407				} else
 408					break;
 409			}
 410
 411			response->type = SMBD_TRANSFER_DATA;
 412			response->first_segment = false;
 413			rc = smbd_post_recv(info, response);
 414			if (rc) {
 415				log_rdma_recv(ERR,
 416					"post_recv failed rc=%d\n", rc);
 417				put_receive_buffer(info, response);
 418				break;
 419			}
 420
 421			ret++;
 422		}
 423	}
 424
 425	spin_lock(&info->lock_new_credits_offered);
 426	info->new_credits_offered += ret;
 427	spin_unlock(&info->lock_new_credits_offered);
 428
 429	/* Promptly send an immediate packet as defined in [MS-SMBD] 3.1.1.1 */
 430	info->send_immediate = true;
 431	if (atomic_read(&info->receive_credits) <
 432		info->receive_credit_target - 1) {
 433		if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
 434		    info->send_immediate) {
 435			log_keep_alive(INFO, "send an empty message\n");
 436			smbd_post_send_empty(info);
 437		}
 438	}
 439}
 440
 441/* Called from softirq, when recv is done */
 442static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
 443{
 444	struct smbd_data_transfer *data_transfer;
 445	struct smbd_response *response =
 446		container_of(wc->wr_cqe, struct smbd_response, cqe);
 447	struct smbd_connection *info = response->info;
 448	int data_length = 0;
 449
 450	log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d byte_len=%d pkey_index=%x\n",
 451		      response, response->type, wc->status, wc->opcode,
 452		      wc->byte_len, wc->pkey_index);
 453
 454	if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
 455		log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
 456			wc->status, wc->opcode);
 457		smbd_disconnect_rdma_connection(info);
 458		goto error;
 459	}
 460
 461	ib_dma_sync_single_for_cpu(
 462		wc->qp->device,
 463		response->sge.addr,
 464		response->sge.length,
 465		DMA_FROM_DEVICE);
 466
 467	switch (response->type) {
 468	/* SMBD negotiation response */
 469	case SMBD_NEGOTIATE_RESP:
 470		dump_smbd_negotiate_resp(smbd_response_payload(response));
 471		info->full_packet_received = true;
 472		info->negotiate_done =
 473			process_negotiation_response(response, wc->byte_len);
 474		complete(&info->negotiate_completion);
 475		break;
 476
 477	/* SMBD data transfer packet */
 478	case SMBD_TRANSFER_DATA:
 479		data_transfer = smbd_response_payload(response);
 480		data_length = le32_to_cpu(data_transfer->data_length);
 481
 482		/*
 483		 * If this is a packet with data playload place the data in
 484		 * reassembly queue and wake up the reading thread
 485		 */
 486		if (data_length) {
 487			if (info->full_packet_received)
 488				response->first_segment = true;
 489
 490			if (le32_to_cpu(data_transfer->remaining_data_length))
 491				info->full_packet_received = false;
 492			else
 493				info->full_packet_received = true;
 494
 495			enqueue_reassembly(
 496				info,
 497				response,
 498				data_length);
 499		} else
 500			put_empty_packet(info, response);
 501
 502		if (data_length)
 503			wake_up_interruptible(&info->wait_reassembly_queue);
 504
 505		atomic_dec(&info->receive_credits);
 506		info->receive_credit_target =
 507			le16_to_cpu(data_transfer->credits_requested);
 508		if (le16_to_cpu(data_transfer->credits_granted)) {
 509			atomic_add(le16_to_cpu(data_transfer->credits_granted),
 510				&info->send_credits);
 511			/*
 512			 * We have new send credits granted from remote peer
 513			 * If any sender is waiting for credits, unblock it
 514			 */
 515			wake_up_interruptible(&info->wait_send_queue);
 516		}
 517
 518		log_incoming(INFO, "data flags %d data_offset %d data_length %d remaining_data_length %d\n",
 519			     le16_to_cpu(data_transfer->flags),
 520			     le32_to_cpu(data_transfer->data_offset),
 521			     le32_to_cpu(data_transfer->data_length),
 522			     le32_to_cpu(data_transfer->remaining_data_length));
 523
 524		/* Send a KEEP_ALIVE response right away if requested */
 525		info->keep_alive_requested = KEEP_ALIVE_NONE;
 526		if (le16_to_cpu(data_transfer->flags) &
 527				SMB_DIRECT_RESPONSE_REQUESTED) {
 528			info->keep_alive_requested = KEEP_ALIVE_PENDING;
 529		}
 530
 531		return;
 532
 533	default:
 534		log_rdma_recv(ERR,
 535			"unexpected response type=%d\n", response->type);
 536	}
 537
 538error:
 539	put_receive_buffer(info, response);
 540}
 541
 542static struct rdma_cm_id *smbd_create_id(
 543		struct smbd_connection *info,
 544		struct sockaddr *dstaddr, int port)
 545{
 546	struct rdma_cm_id *id;
 547	int rc;
 548	__be16 *sport;
 549
 550	id = rdma_create_id(&init_net, smbd_conn_upcall, info,
 551		RDMA_PS_TCP, IB_QPT_RC);
 552	if (IS_ERR(id)) {
 553		rc = PTR_ERR(id);
 554		log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
 555		return id;
 556	}
 557
 558	if (dstaddr->sa_family == AF_INET6)
 559		sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
 560	else
 561		sport = &((struct sockaddr_in *)dstaddr)->sin_port;
 562
 563	*sport = htons(port);
 564
 565	init_completion(&info->ri_done);
 566	info->ri_rc = -ETIMEDOUT;
 567
 568	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
 569		RDMA_RESOLVE_TIMEOUT);
 570	if (rc) {
 571		log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
 572		goto out;
 573	}
 574	wait_for_completion_interruptible_timeout(
 575		&info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 576	rc = info->ri_rc;
 577	if (rc) {
 578		log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
 579		goto out;
 580	}
 581
 582	info->ri_rc = -ETIMEDOUT;
 583	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 584	if (rc) {
 585		log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
 586		goto out;
 587	}
 588	wait_for_completion_interruptible_timeout(
 589		&info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 590	rc = info->ri_rc;
 591	if (rc) {
 592		log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
 593		goto out;
 594	}
 595
 596	return id;
 597
 598out:
 599	rdma_destroy_id(id);
 600	return ERR_PTR(rc);
 601}
 602
 603/*
 604 * Test if FRWR (Fast Registration Work Requests) is supported on the device
 605 * This implementation requries FRWR on RDMA read/write
 606 * return value: true if it is supported
 607 */
 608static bool frwr_is_supported(struct ib_device_attr *attrs)
 609{
 610	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 611		return false;
 612	if (attrs->max_fast_reg_page_list_len == 0)
 613		return false;
 614	return true;
 615}
 616
 617static int smbd_ia_open(
 618		struct smbd_connection *info,
 619		struct sockaddr *dstaddr, int port)
 620{
 621	int rc;
 622
 623	info->id = smbd_create_id(info, dstaddr, port);
 624	if (IS_ERR(info->id)) {
 625		rc = PTR_ERR(info->id);
 626		goto out1;
 627	}
 628
 629	if (!frwr_is_supported(&info->id->device->attrs)) {
 630		log_rdma_event(ERR, "Fast Registration Work Requests (FRWR) is not supported\n");
 631		log_rdma_event(ERR, "Device capability flags = %llx max_fast_reg_page_list_len = %u\n",
 632			       info->id->device->attrs.device_cap_flags,
 633			       info->id->device->attrs.max_fast_reg_page_list_len);
 634		rc = -EPROTONOSUPPORT;
 635		goto out2;
 636	}
 637	info->max_frmr_depth = min_t(int,
 638		smbd_max_frmr_depth,
 639		info->id->device->attrs.max_fast_reg_page_list_len);
 640	info->mr_type = IB_MR_TYPE_MEM_REG;
 641	if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 642		info->mr_type = IB_MR_TYPE_SG_GAPS;
 643
 644	info->pd = ib_alloc_pd(info->id->device, 0);
 645	if (IS_ERR(info->pd)) {
 646		rc = PTR_ERR(info->pd);
 647		log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
 648		goto out2;
 649	}
 650
 651	return 0;
 652
 653out2:
 654	rdma_destroy_id(info->id);
 655	info->id = NULL;
 656
 657out1:
 658	return rc;
 659}
 660
 661/*
 662 * Send a negotiation request message to the peer
 663 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 664 * After negotiation, the transport is connected and ready for
 665 * carrying upper layer SMB payload
 666 */
 667static int smbd_post_send_negotiate_req(struct smbd_connection *info)
 668{
 669	struct ib_send_wr send_wr;
 670	int rc = -ENOMEM;
 671	struct smbd_request *request;
 672	struct smbd_negotiate_req *packet;
 673
 674	request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 675	if (!request)
 676		return rc;
 677
 678	request->info = info;
 679
 680	packet = smbd_request_payload(request);
 681	packet->min_version = cpu_to_le16(SMBD_V1);
 682	packet->max_version = cpu_to_le16(SMBD_V1);
 683	packet->reserved = 0;
 684	packet->credits_requested = cpu_to_le16(info->send_credit_target);
 685	packet->preferred_send_size = cpu_to_le32(info->max_send_size);
 686	packet->max_receive_size = cpu_to_le32(info->max_receive_size);
 687	packet->max_fragmented_size =
 688		cpu_to_le32(info->max_fragmented_recv_size);
 689
 690	request->num_sge = 1;
 691	request->sge[0].addr = ib_dma_map_single(
 692				info->id->device, (void *)packet,
 693				sizeof(*packet), DMA_TO_DEVICE);
 694	if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 695		rc = -EIO;
 696		goto dma_mapping_failed;
 697	}
 698
 699	request->sge[0].length = sizeof(*packet);
 700	request->sge[0].lkey = info->pd->local_dma_lkey;
 701
 702	ib_dma_sync_single_for_device(
 703		info->id->device, request->sge[0].addr,
 704		request->sge[0].length, DMA_TO_DEVICE);
 705
 706	request->cqe.done = send_done;
 707
 708	send_wr.next = NULL;
 709	send_wr.wr_cqe = &request->cqe;
 710	send_wr.sg_list = request->sge;
 711	send_wr.num_sge = request->num_sge;
 712	send_wr.opcode = IB_WR_SEND;
 713	send_wr.send_flags = IB_SEND_SIGNALED;
 714
 715	log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
 716		request->sge[0].addr,
 717		request->sge[0].length, request->sge[0].lkey);
 718
 719	atomic_inc(&info->send_pending);
 720	rc = ib_post_send(info->id->qp, &send_wr, NULL);
 721	if (!rc)
 722		return 0;
 723
 724	/* if we reach here, post send failed */
 725	log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 726	atomic_dec(&info->send_pending);
 727	ib_dma_unmap_single(info->id->device, request->sge[0].addr,
 728		request->sge[0].length, DMA_TO_DEVICE);
 729
 730	smbd_disconnect_rdma_connection(info);
 731
 732dma_mapping_failed:
 733	mempool_free(request, info->request_mempool);
 734	return rc;
 735}
 736
 737/*
 738 * Extend the credits to remote peer
 739 * This implements [MS-SMBD] 3.1.5.9
 740 * The idea is that we should extend credits to remote peer as quickly as
 741 * it's allowed, to maintain data flow. We allocate as much receive
 742 * buffer as possible, and extend the receive credits to remote peer
 743 * return value: the new credtis being granted.
 744 */
 745static int manage_credits_prior_sending(struct smbd_connection *info)
 746{
 747	int new_credits;
 748
 749	spin_lock(&info->lock_new_credits_offered);
 750	new_credits = info->new_credits_offered;
 751	info->new_credits_offered = 0;
 752	spin_unlock(&info->lock_new_credits_offered);
 753
 754	return new_credits;
 755}
 756
 757/*
 758 * Check if we need to send a KEEP_ALIVE message
 759 * The idle connection timer triggers a KEEP_ALIVE message when expires
 760 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flag to have peer send
 761 * back a response.
 762 * return value:
 763 * 1 if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 764 * 0: otherwise
 765 */
 766static int manage_keep_alive_before_sending(struct smbd_connection *info)
 767{
 768	if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
 769		info->keep_alive_requested = KEEP_ALIVE_SENT;
 770		return 1;
 771	}
 772	return 0;
 773}
 774
 775/* Post the send request */
 776static int smbd_post_send(struct smbd_connection *info,
 777		struct smbd_request *request)
 778{
 779	struct ib_send_wr send_wr;
 780	int rc, i;
 781
 782	for (i = 0; i < request->num_sge; i++) {
 783		log_rdma_send(INFO,
 784			"rdma_request sge[%d] addr=%llu length=%u\n",
 785			i, request->sge[i].addr, request->sge[i].length);
 786		ib_dma_sync_single_for_device(
 787			info->id->device,
 788			request->sge[i].addr,
 789			request->sge[i].length,
 790			DMA_TO_DEVICE);
 791	}
 792
 793	request->cqe.done = send_done;
 794
 795	send_wr.next = NULL;
 796	send_wr.wr_cqe = &request->cqe;
 797	send_wr.sg_list = request->sge;
 798	send_wr.num_sge = request->num_sge;
 799	send_wr.opcode = IB_WR_SEND;
 800	send_wr.send_flags = IB_SEND_SIGNALED;
 801
 802	rc = ib_post_send(info->id->qp, &send_wr, NULL);
 803	if (rc) {
 804		log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 805		smbd_disconnect_rdma_connection(info);
 806		rc = -EAGAIN;
 807	} else
 808		/* Reset timer for idle connection after packet is sent */
 809		mod_delayed_work(info->workqueue, &info->idle_timer_work,
 810			info->keep_alive_interval*HZ);
 811
 812	return rc;
 813}
 814
 815static int smbd_post_send_sgl(struct smbd_connection *info,
 816	struct scatterlist *sgl, int data_length, int remaining_data_length)
 817{
 818	int num_sgs;
 819	int i, rc;
 820	int header_length;
 821	struct smbd_request *request;
 822	struct smbd_data_transfer *packet;
 823	int new_credits;
 824	struct scatterlist *sg;
 825
 826wait_credit:
 827	/* Wait for send credits. A SMBD packet needs one credit */
 828	rc = wait_event_interruptible(info->wait_send_queue,
 829		atomic_read(&info->send_credits) > 0 ||
 830		info->transport_status != SMBD_CONNECTED);
 831	if (rc)
 832		goto err_wait_credit;
 833
 834	if (info->transport_status != SMBD_CONNECTED) {
 835		log_outgoing(ERR, "disconnected not sending on wait_credit\n");
 836		rc = -EAGAIN;
 837		goto err_wait_credit;
 838	}
 839	if (unlikely(atomic_dec_return(&info->send_credits) < 0)) {
 840		atomic_inc(&info->send_credits);
 841		goto wait_credit;
 842	}
 843
 844wait_send_queue:
 845	wait_event(info->wait_post_send,
 846		atomic_read(&info->send_pending) < info->send_credit_target ||
 847		info->transport_status != SMBD_CONNECTED);
 848
 849	if (info->transport_status != SMBD_CONNECTED) {
 850		log_outgoing(ERR, "disconnected not sending on wait_send_queue\n");
 851		rc = -EAGAIN;
 852		goto err_wait_send_queue;
 853	}
 854
 855	if (unlikely(atomic_inc_return(&info->send_pending) >
 856				info->send_credit_target)) {
 857		atomic_dec(&info->send_pending);
 858		goto wait_send_queue;
 859	}
 860
 861	request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 862	if (!request) {
 863		rc = -ENOMEM;
 864		goto err_alloc;
 865	}
 866
 867	request->info = info;
 868
 869	/* Fill in the packet header */
 870	packet = smbd_request_payload(request);
 871	packet->credits_requested = cpu_to_le16(info->send_credit_target);
 872
 873	new_credits = manage_credits_prior_sending(info);
 874	atomic_add(new_credits, &info->receive_credits);
 875	packet->credits_granted = cpu_to_le16(new_credits);
 876
 877	info->send_immediate = false;
 878
 879	packet->flags = 0;
 880	if (manage_keep_alive_before_sending(info))
 881		packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
 882
 883	packet->reserved = 0;
 884	if (!data_length)
 885		packet->data_offset = 0;
 886	else
 887		packet->data_offset = cpu_to_le32(24);
 888	packet->data_length = cpu_to_le32(data_length);
 889	packet->remaining_data_length = cpu_to_le32(remaining_data_length);
 890	packet->padding = 0;
 891
 892	log_outgoing(INFO, "credits_requested=%d credits_granted=%d data_offset=%d data_length=%d remaining_data_length=%d\n",
 893		     le16_to_cpu(packet->credits_requested),
 894		     le16_to_cpu(packet->credits_granted),
 895		     le32_to_cpu(packet->data_offset),
 896		     le32_to_cpu(packet->data_length),
 897		     le32_to_cpu(packet->remaining_data_length));
 898
 899	/* Map the packet to DMA */
 900	header_length = sizeof(struct smbd_data_transfer);
 901	/* If this is a packet without payload, don't send padding */
 902	if (!data_length)
 903		header_length = offsetof(struct smbd_data_transfer, padding);
 904
 905	request->num_sge = 1;
 906	request->sge[0].addr = ib_dma_map_single(info->id->device,
 907						 (void *)packet,
 908						 header_length,
 909						 DMA_TO_DEVICE);
 910	if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 911		rc = -EIO;
 912		request->sge[0].addr = 0;
 913		goto err_dma;
 914	}
 915
 916	request->sge[0].length = header_length;
 917	request->sge[0].lkey = info->pd->local_dma_lkey;
 918
 919	/* Fill in the packet data payload */
 920	num_sgs = sgl ? sg_nents(sgl) : 0;
 921	for_each_sg(sgl, sg, num_sgs, i) {
 922		request->sge[i+1].addr =
 923			ib_dma_map_page(info->id->device, sg_page(sg),
 924			       sg->offset, sg->length, DMA_TO_DEVICE);
 925		if (ib_dma_mapping_error(
 926				info->id->device, request->sge[i+1].addr)) {
 927			rc = -EIO;
 928			request->sge[i+1].addr = 0;
 929			goto err_dma;
 930		}
 931		request->sge[i+1].length = sg->length;
 932		request->sge[i+1].lkey = info->pd->local_dma_lkey;
 933		request->num_sge++;
 934	}
 935
 936	rc = smbd_post_send(info, request);
 937	if (!rc)
 938		return 0;
 939
 940err_dma:
 941	for (i = 0; i < request->num_sge; i++)
 942		if (request->sge[i].addr)
 943			ib_dma_unmap_single(info->id->device,
 944					    request->sge[i].addr,
 945					    request->sge[i].length,
 946					    DMA_TO_DEVICE);
 947	mempool_free(request, info->request_mempool);
 948
 949	/* roll back receive credits and credits to be offered */
 950	spin_lock(&info->lock_new_credits_offered);
 951	info->new_credits_offered += new_credits;
 952	spin_unlock(&info->lock_new_credits_offered);
 953	atomic_sub(new_credits, &info->receive_credits);
 954
 955err_alloc:
 956	if (atomic_dec_and_test(&info->send_pending))
 957		wake_up(&info->wait_send_pending);
 958
 959err_wait_send_queue:
 960	/* roll back send credits and pending */
 961	atomic_inc(&info->send_credits);
 962
 963err_wait_credit:
 964	return rc;
 965}
 966
 967/*
 968 * Send a page
 969 * page: the page to send
 970 * offset: offset in the page to send
 971 * size: length in the page to send
 972 * remaining_data_length: remaining data to send in this payload
 973 */
 974static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
 975		unsigned long offset, size_t size, int remaining_data_length)
 976{
 977	struct scatterlist sgl;
 978
 979	sg_init_table(&sgl, 1);
 980	sg_set_page(&sgl, page, size, offset);
 981
 982	return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
 983}
 984
 985/*
 986 * Send an empty message
 987 * Empty message is used to extend credits to peer to for keep live
 988 * while there is no upper layer payload to send at the time
 989 */
 990static int smbd_post_send_empty(struct smbd_connection *info)
 991{
 992	info->count_send_empty++;
 993	return smbd_post_send_sgl(info, NULL, 0, 0);
 994}
 995
 996/*
 997 * Send a data buffer
 998 * iov: the iov array describing the data buffers
 999 * n_vec: number of iov array
1000 * remaining_data_length: remaining data to send following this packet
1001 * in segmented SMBD packet
1002 */
1003static int smbd_post_send_data(
1004	struct smbd_connection *info, struct kvec *iov, int n_vec,
1005	int remaining_data_length)
1006{
1007	int i;
1008	u32 data_length = 0;
1009	struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1010
1011	if (n_vec > SMBDIRECT_MAX_SGE) {
1012		cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1013		return -EINVAL;
1014	}
1015
1016	sg_init_table(sgl, n_vec);
1017	for (i = 0; i < n_vec; i++) {
1018		data_length += iov[i].iov_len;
1019		sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1020	}
1021
1022	return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1023}
1024
1025/*
1026 * Post a receive request to the transport
1027 * The remote peer can only send data when a receive request is posted
1028 * The interaction is controlled by send/receive credit system
1029 */
1030static int smbd_post_recv(
1031		struct smbd_connection *info, struct smbd_response *response)
1032{
1033	struct ib_recv_wr recv_wr;
1034	int rc = -EIO;
1035
1036	response->sge.addr = ib_dma_map_single(
1037				info->id->device, response->packet,
1038				info->max_receive_size, DMA_FROM_DEVICE);
1039	if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1040		return rc;
1041
1042	response->sge.length = info->max_receive_size;
1043	response->sge.lkey = info->pd->local_dma_lkey;
1044
1045	response->cqe.done = recv_done;
1046
1047	recv_wr.wr_cqe = &response->cqe;
1048	recv_wr.next = NULL;
1049	recv_wr.sg_list = &response->sge;
1050	recv_wr.num_sge = 1;
1051
1052	rc = ib_post_recv(info->id->qp, &recv_wr, NULL);
1053	if (rc) {
1054		ib_dma_unmap_single(info->id->device, response->sge.addr,
1055				    response->sge.length, DMA_FROM_DEVICE);
1056		smbd_disconnect_rdma_connection(info);
1057		log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1058	}
1059
1060	return rc;
1061}
1062
1063/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1064static int smbd_negotiate(struct smbd_connection *info)
1065{
1066	int rc;
1067	struct smbd_response *response = get_receive_buffer(info);
1068
1069	response->type = SMBD_NEGOTIATE_RESP;
1070	rc = smbd_post_recv(info, response);
1071	log_rdma_event(INFO, "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x iov.lkey=%x\n",
1072		       rc, response->sge.addr,
1073		       response->sge.length, response->sge.lkey);
1074	if (rc)
1075		return rc;
1076
1077	init_completion(&info->negotiate_completion);
1078	info->negotiate_done = false;
1079	rc = smbd_post_send_negotiate_req(info);
1080	if (rc)
1081		return rc;
1082
1083	rc = wait_for_completion_interruptible_timeout(
1084		&info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1085	log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1086
1087	if (info->negotiate_done)
1088		return 0;
1089
1090	if (rc == 0)
1091		rc = -ETIMEDOUT;
1092	else if (rc == -ERESTARTSYS)
1093		rc = -EINTR;
1094	else
1095		rc = -ENOTCONN;
1096
1097	return rc;
1098}
1099
1100static void put_empty_packet(
1101		struct smbd_connection *info, struct smbd_response *response)
1102{
1103	spin_lock(&info->empty_packet_queue_lock);
1104	list_add_tail(&response->list, &info->empty_packet_queue);
1105	info->count_empty_packet_queue++;
1106	spin_unlock(&info->empty_packet_queue_lock);
1107
1108	queue_work(info->workqueue, &info->post_send_credits_work);
1109}
1110
1111/*
1112 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
1113 * This is a queue for reassembling upper layer payload and present to upper
1114 * layer. All the inncoming payload go to the reassembly queue, regardless of
1115 * if reassembly is required. The uuper layer code reads from the queue for all
1116 * incoming payloads.
1117 * Put a received packet to the reassembly queue
1118 * response: the packet received
1119 * data_length: the size of payload in this packet
1120 */
1121static void enqueue_reassembly(
1122	struct smbd_connection *info,
1123	struct smbd_response *response,
1124	int data_length)
1125{
1126	spin_lock(&info->reassembly_queue_lock);
1127	list_add_tail(&response->list, &info->reassembly_queue);
1128	info->reassembly_queue_length++;
1129	/*
1130	 * Make sure reassembly_data_length is updated after list and
1131	 * reassembly_queue_length are updated. On the dequeue side
1132	 * reassembly_data_length is checked without a lock to determine
1133	 * if reassembly_queue_length and list is up to date
1134	 */
1135	virt_wmb();
1136	info->reassembly_data_length += data_length;
1137	spin_unlock(&info->reassembly_queue_lock);
1138	info->count_reassembly_queue++;
1139	info->count_enqueue_reassembly_queue++;
1140}
1141
1142/*
1143 * Get the first entry at the front of reassembly queue
1144 * Caller is responsible for locking
1145 * return value: the first entry if any, NULL if queue is empty
1146 */
1147static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1148{
1149	struct smbd_response *ret = NULL;
1150
1151	if (!list_empty(&info->reassembly_queue)) {
1152		ret = list_first_entry(
1153			&info->reassembly_queue,
1154			struct smbd_response, list);
1155	}
1156	return ret;
1157}
1158
1159static struct smbd_response *get_empty_queue_buffer(
1160		struct smbd_connection *info)
1161{
1162	struct smbd_response *ret = NULL;
1163	unsigned long flags;
1164
1165	spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1166	if (!list_empty(&info->empty_packet_queue)) {
1167		ret = list_first_entry(
1168			&info->empty_packet_queue,
1169			struct smbd_response, list);
1170		list_del(&ret->list);
1171		info->count_empty_packet_queue--;
1172	}
1173	spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1174
1175	return ret;
1176}
1177
1178/*
1179 * Get a receive buffer
1180 * For each remote send, we need to post a receive. The receive buffers are
1181 * pre-allocated in advance.
1182 * return value: the receive buffer, NULL if none is available
1183 */
1184static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1185{
1186	struct smbd_response *ret = NULL;
1187	unsigned long flags;
1188
1189	spin_lock_irqsave(&info->receive_queue_lock, flags);
1190	if (!list_empty(&info->receive_queue)) {
1191		ret = list_first_entry(
1192			&info->receive_queue,
1193			struct smbd_response, list);
1194		list_del(&ret->list);
1195		info->count_receive_queue--;
1196		info->count_get_receive_buffer++;
1197	}
1198	spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1199
1200	return ret;
1201}
1202
1203/*
1204 * Return a receive buffer
1205 * Upon returning of a receive buffer, we can post new receive and extend
1206 * more receive credits to remote peer. This is done immediately after a
1207 * receive buffer is returned.
1208 */
1209static void put_receive_buffer(
1210	struct smbd_connection *info, struct smbd_response *response)
1211{
1212	unsigned long flags;
1213
1214	ib_dma_unmap_single(info->id->device, response->sge.addr,
1215		response->sge.length, DMA_FROM_DEVICE);
1216
1217	spin_lock_irqsave(&info->receive_queue_lock, flags);
1218	list_add_tail(&response->list, &info->receive_queue);
1219	info->count_receive_queue++;
1220	info->count_put_receive_buffer++;
1221	spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1222
1223	queue_work(info->workqueue, &info->post_send_credits_work);
1224}
1225
1226/* Preallocate all receive buffer on transport establishment */
1227static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1228{
1229	int i;
1230	struct smbd_response *response;
1231
1232	INIT_LIST_HEAD(&info->reassembly_queue);
1233	spin_lock_init(&info->reassembly_queue_lock);
1234	info->reassembly_data_length = 0;
1235	info->reassembly_queue_length = 0;
1236
1237	INIT_LIST_HEAD(&info->receive_queue);
1238	spin_lock_init(&info->receive_queue_lock);
1239	info->count_receive_queue = 0;
1240
1241	INIT_LIST_HEAD(&info->empty_packet_queue);
1242	spin_lock_init(&info->empty_packet_queue_lock);
1243	info->count_empty_packet_queue = 0;
1244
1245	init_waitqueue_head(&info->wait_receive_queues);
1246
1247	for (i = 0; i < num_buf; i++) {
1248		response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1249		if (!response)
1250			goto allocate_failed;
1251
1252		response->info = info;
1253		list_add_tail(&response->list, &info->receive_queue);
1254		info->count_receive_queue++;
1255	}
1256
1257	return 0;
1258
1259allocate_failed:
1260	while (!list_empty(&info->receive_queue)) {
1261		response = list_first_entry(
1262				&info->receive_queue,
1263				struct smbd_response, list);
1264		list_del(&response->list);
1265		info->count_receive_queue--;
1266
1267		mempool_free(response, info->response_mempool);
1268	}
1269	return -ENOMEM;
1270}
1271
1272static void destroy_receive_buffers(struct smbd_connection *info)
1273{
1274	struct smbd_response *response;
1275
1276	while ((response = get_receive_buffer(info)))
1277		mempool_free(response, info->response_mempool);
1278
1279	while ((response = get_empty_queue_buffer(info)))
1280		mempool_free(response, info->response_mempool);
1281}
1282
1283/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1284static void idle_connection_timer(struct work_struct *work)
1285{
1286	struct smbd_connection *info = container_of(
1287					work, struct smbd_connection,
1288					idle_timer_work.work);
1289
1290	if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1291		log_keep_alive(ERR,
1292			"error status info->keep_alive_requested=%d\n",
1293			info->keep_alive_requested);
1294		smbd_disconnect_rdma_connection(info);
1295		return;
1296	}
1297
1298	log_keep_alive(INFO, "about to send an empty idle message\n");
1299	smbd_post_send_empty(info);
1300
1301	/* Setup the next idle timeout work */
1302	queue_delayed_work(info->workqueue, &info->idle_timer_work,
1303			info->keep_alive_interval*HZ);
1304}
1305
1306/*
1307 * Destroy the transport and related RDMA and memory resources
1308 * Need to go through all the pending counters and make sure on one is using
1309 * the transport while it is destroyed
1310 */
1311void smbd_destroy(struct TCP_Server_Info *server)
1312{
1313	struct smbd_connection *info = server->smbd_conn;
1314	struct smbd_response *response;
1315	unsigned long flags;
1316
1317	if (!info) {
1318		log_rdma_event(INFO, "rdma session already destroyed\n");
1319		return;
1320	}
1321
1322	log_rdma_event(INFO, "destroying rdma session\n");
1323	if (info->transport_status != SMBD_DISCONNECTED) {
1324		rdma_disconnect(server->smbd_conn->id);
1325		log_rdma_event(INFO, "wait for transport being disconnected\n");
1326		wait_event_interruptible(
1327			info->disconn_wait,
1328			info->transport_status == SMBD_DISCONNECTED);
1329	}
1330
1331	log_rdma_event(INFO, "destroying qp\n");
1332	ib_drain_qp(info->id->qp);
1333	rdma_destroy_qp(info->id);
1334
1335	log_rdma_event(INFO, "cancelling idle timer\n");
1336	cancel_delayed_work_sync(&info->idle_timer_work);
1337
1338	log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
1339	wait_event(info->wait_send_pending,
1340		atomic_read(&info->send_pending) == 0);
1341
1342	/* It's not posssible for upper layer to get to reassembly */
1343	log_rdma_event(INFO, "drain the reassembly queue\n");
1344	do {
1345		spin_lock_irqsave(&info->reassembly_queue_lock, flags);
1346		response = _get_first_reassembly(info);
1347		if (response) {
1348			list_del(&response->list);
1349			spin_unlock_irqrestore(
1350				&info->reassembly_queue_lock, flags);
1351			put_receive_buffer(info, response);
1352		} else
1353			spin_unlock_irqrestore(
1354				&info->reassembly_queue_lock, flags);
1355	} while (response);
1356	info->reassembly_data_length = 0;
1357
1358	log_rdma_event(INFO, "free receive buffers\n");
1359	wait_event(info->wait_receive_queues,
1360		info->count_receive_queue + info->count_empty_packet_queue
1361			== info->receive_credit_max);
1362	destroy_receive_buffers(info);
1363
1364	/*
1365	 * For performance reasons, memory registration and deregistration
1366	 * are not locked by srv_mutex. It is possible some processes are
1367	 * blocked on transport srv_mutex while holding memory registration.
1368	 * Release the transport srv_mutex to allow them to hit the failure
1369	 * path when sending data, and then release memory registartions.
1370	 */
1371	log_rdma_event(INFO, "freeing mr list\n");
1372	wake_up_interruptible_all(&info->wait_mr);
1373	while (atomic_read(&info->mr_used_count)) {
1374		mutex_unlock(&server->srv_mutex);
1375		msleep(1000);
1376		mutex_lock(&server->srv_mutex);
1377	}
1378	destroy_mr_list(info);
1379
1380	ib_free_cq(info->send_cq);
1381	ib_free_cq(info->recv_cq);
1382	ib_dealloc_pd(info->pd);
1383	rdma_destroy_id(info->id);
1384
1385	/* free mempools */
1386	mempool_destroy(info->request_mempool);
1387	kmem_cache_destroy(info->request_cache);
1388
1389	mempool_destroy(info->response_mempool);
1390	kmem_cache_destroy(info->response_cache);
1391
1392	info->transport_status = SMBD_DESTROYED;
1393
1394	destroy_workqueue(info->workqueue);
1395	log_rdma_event(INFO,  "rdma session destroyed\n");
1396	kfree(info);
1397}
1398
1399/*
1400 * Reconnect this SMBD connection, called from upper layer
1401 * return value: 0 on success, or actual error code
1402 */
1403int smbd_reconnect(struct TCP_Server_Info *server)
1404{
1405	log_rdma_event(INFO, "reconnecting rdma session\n");
1406
1407	if (!server->smbd_conn) {
1408		log_rdma_event(INFO, "rdma session already destroyed\n");
1409		goto create_conn;
1410	}
1411
1412	/*
1413	 * This is possible if transport is disconnected and we haven't received
1414	 * notification from RDMA, but upper layer has detected timeout
1415	 */
1416	if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1417		log_rdma_event(INFO, "disconnecting transport\n");
1418		smbd_destroy(server);
1419	}
1420
1421create_conn:
1422	log_rdma_event(INFO, "creating rdma session\n");
1423	server->smbd_conn = smbd_get_connection(
1424		server, (struct sockaddr *) &server->dstaddr);
1425
1426	if (server->smbd_conn)
1427		cifs_dbg(VFS, "RDMA transport re-established\n");
1428
1429	return server->smbd_conn ? 0 : -ENOENT;
1430}
1431
1432static void destroy_caches_and_workqueue(struct smbd_connection *info)
1433{
1434	destroy_receive_buffers(info);
1435	destroy_workqueue(info->workqueue);
1436	mempool_destroy(info->response_mempool);
1437	kmem_cache_destroy(info->response_cache);
1438	mempool_destroy(info->request_mempool);
1439	kmem_cache_destroy(info->request_cache);
1440}
1441
1442#define MAX_NAME_LEN	80
1443static int allocate_caches_and_workqueue(struct smbd_connection *info)
1444{
1445	char name[MAX_NAME_LEN];
1446	int rc;
1447
1448	scnprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1449	info->request_cache =
1450		kmem_cache_create(
1451			name,
1452			sizeof(struct smbd_request) +
1453				sizeof(struct smbd_data_transfer),
1454			0, SLAB_HWCACHE_ALIGN, NULL);
1455	if (!info->request_cache)
1456		return -ENOMEM;
1457
1458	info->request_mempool =
1459		mempool_create(info->send_credit_target, mempool_alloc_slab,
1460			mempool_free_slab, info->request_cache);
1461	if (!info->request_mempool)
1462		goto out1;
1463
1464	scnprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1465	info->response_cache =
1466		kmem_cache_create(
1467			name,
1468			sizeof(struct smbd_response) +
1469				info->max_receive_size,
1470			0, SLAB_HWCACHE_ALIGN, NULL);
1471	if (!info->response_cache)
1472		goto out2;
1473
1474	info->response_mempool =
1475		mempool_create(info->receive_credit_max, mempool_alloc_slab,
1476		       mempool_free_slab, info->response_cache);
1477	if (!info->response_mempool)
1478		goto out3;
1479
1480	scnprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1481	info->workqueue = create_workqueue(name);
1482	if (!info->workqueue)
1483		goto out4;
1484
1485	rc = allocate_receive_buffers(info, info->receive_credit_max);
1486	if (rc) {
1487		log_rdma_event(ERR, "failed to allocate receive buffers\n");
1488		goto out5;
1489	}
1490
1491	return 0;
1492
1493out5:
1494	destroy_workqueue(info->workqueue);
1495out4:
1496	mempool_destroy(info->response_mempool);
1497out3:
1498	kmem_cache_destroy(info->response_cache);
1499out2:
1500	mempool_destroy(info->request_mempool);
1501out1:
1502	kmem_cache_destroy(info->request_cache);
1503	return -ENOMEM;
1504}
1505
1506/* Create a SMBD connection, called by upper layer */
1507static struct smbd_connection *_smbd_get_connection(
1508	struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1509{
1510	int rc;
1511	struct smbd_connection *info;
1512	struct rdma_conn_param conn_param;
1513	struct ib_qp_init_attr qp_attr;
1514	struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1515	struct ib_port_immutable port_immutable;
1516	u32 ird_ord_hdr[2];
1517
1518	info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1519	if (!info)
1520		return NULL;
1521
1522	info->transport_status = SMBD_CONNECTING;
1523	rc = smbd_ia_open(info, dstaddr, port);
1524	if (rc) {
1525		log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1526		goto create_id_failed;
1527	}
1528
1529	if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1530	    smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1531		log_rdma_event(ERR, "consider lowering send_credit_target = %d. Possible CQE overrun, device reporting max_cpe %d max_qp_wr %d\n",
1532			       smbd_send_credit_target,
1533			       info->id->device->attrs.max_cqe,
1534			       info->id->device->attrs.max_qp_wr);
1535		goto config_failed;
1536	}
1537
1538	if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1539	    smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1540		log_rdma_event(ERR, "consider lowering receive_credit_max = %d. Possible CQE overrun, device reporting max_cpe %d max_qp_wr %d\n",
1541			       smbd_receive_credit_max,
1542			       info->id->device->attrs.max_cqe,
1543			       info->id->device->attrs.max_qp_wr);
1544		goto config_failed;
1545	}
1546
1547	info->receive_credit_max = smbd_receive_credit_max;
1548	info->send_credit_target = smbd_send_credit_target;
1549	info->max_send_size = smbd_max_send_size;
1550	info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1551	info->max_receive_size = smbd_max_receive_size;
1552	info->keep_alive_interval = smbd_keep_alive_interval;
1553
1554	if (info->id->device->attrs.max_send_sge < SMBDIRECT_MAX_SGE) {
1555		log_rdma_event(ERR,
1556			"warning: device max_send_sge = %d too small\n",
1557			info->id->device->attrs.max_send_sge);
1558		log_rdma_event(ERR, "Queue Pair creation may fail\n");
1559	}
1560	if (info->id->device->attrs.max_recv_sge < SMBDIRECT_MAX_SGE) {
1561		log_rdma_event(ERR,
1562			"warning: device max_recv_sge = %d too small\n",
1563			info->id->device->attrs.max_recv_sge);
1564		log_rdma_event(ERR, "Queue Pair creation may fail\n");
1565	}
1566
1567	info->send_cq = NULL;
1568	info->recv_cq = NULL;
1569	info->send_cq =
1570		ib_alloc_cq_any(info->id->device, info,
1571				info->send_credit_target, IB_POLL_SOFTIRQ);
1572	if (IS_ERR(info->send_cq)) {
1573		info->send_cq = NULL;
1574		goto alloc_cq_failed;
1575	}
1576
1577	info->recv_cq =
1578		ib_alloc_cq_any(info->id->device, info,
1579				info->receive_credit_max, IB_POLL_SOFTIRQ);
1580	if (IS_ERR(info->recv_cq)) {
1581		info->recv_cq = NULL;
1582		goto alloc_cq_failed;
1583	}
1584
1585	memset(&qp_attr, 0, sizeof(qp_attr));
1586	qp_attr.event_handler = smbd_qp_async_error_upcall;
1587	qp_attr.qp_context = info;
1588	qp_attr.cap.max_send_wr = info->send_credit_target;
1589	qp_attr.cap.max_recv_wr = info->receive_credit_max;
1590	qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1591	qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1592	qp_attr.cap.max_inline_data = 0;
1593	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1594	qp_attr.qp_type = IB_QPT_RC;
1595	qp_attr.send_cq = info->send_cq;
1596	qp_attr.recv_cq = info->recv_cq;
1597	qp_attr.port_num = ~0;
1598
1599	rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1600	if (rc) {
1601		log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1602		goto create_qp_failed;
1603	}
1604
1605	memset(&conn_param, 0, sizeof(conn_param));
1606	conn_param.initiator_depth = 0;
1607
1608	conn_param.responder_resources =
1609		info->id->device->attrs.max_qp_rd_atom
1610			< SMBD_CM_RESPONDER_RESOURCES ?
1611		info->id->device->attrs.max_qp_rd_atom :
1612		SMBD_CM_RESPONDER_RESOURCES;
1613	info->responder_resources = conn_param.responder_resources;
1614	log_rdma_mr(INFO, "responder_resources=%d\n",
1615		info->responder_resources);
1616
1617	/* Need to send IRD/ORD in private data for iWARP */
1618	info->id->device->ops.get_port_immutable(
1619		info->id->device, info->id->port_num, &port_immutable);
1620	if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1621		ird_ord_hdr[0] = info->responder_resources;
1622		ird_ord_hdr[1] = 1;
1623		conn_param.private_data = ird_ord_hdr;
1624		conn_param.private_data_len = sizeof(ird_ord_hdr);
1625	} else {
1626		conn_param.private_data = NULL;
1627		conn_param.private_data_len = 0;
1628	}
1629
1630	conn_param.retry_count = SMBD_CM_RETRY;
1631	conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1632	conn_param.flow_control = 0;
1633
1634	log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1635		&addr_in->sin_addr, port);
1636
1637	init_waitqueue_head(&info->conn_wait);
1638	init_waitqueue_head(&info->disconn_wait);
1639	init_waitqueue_head(&info->wait_reassembly_queue);
1640	rc = rdma_connect(info->id, &conn_param);
1641	if (rc) {
1642		log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1643		goto rdma_connect_failed;
1644	}
1645
1646	wait_event_interruptible(
1647		info->conn_wait, info->transport_status != SMBD_CONNECTING);
1648
1649	if (info->transport_status != SMBD_CONNECTED) {
1650		log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1651		goto rdma_connect_failed;
1652	}
1653
1654	log_rdma_event(INFO, "rdma_connect connected\n");
1655
1656	rc = allocate_caches_and_workqueue(info);
1657	if (rc) {
1658		log_rdma_event(ERR, "cache allocation failed\n");
1659		goto allocate_cache_failed;
1660	}
1661
1662	init_waitqueue_head(&info->wait_send_queue);
1663	INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1664	queue_delayed_work(info->workqueue, &info->idle_timer_work,
1665		info->keep_alive_interval*HZ);
1666
1667	init_waitqueue_head(&info->wait_send_pending);
1668	atomic_set(&info->send_pending, 0);
1669
1670	init_waitqueue_head(&info->wait_post_send);
1671
1672	INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1673	INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1674	info->new_credits_offered = 0;
1675	spin_lock_init(&info->lock_new_credits_offered);
1676
1677	rc = smbd_negotiate(info);
1678	if (rc) {
1679		log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1680		goto negotiation_failed;
1681	}
1682
1683	rc = allocate_mr_list(info);
1684	if (rc) {
1685		log_rdma_mr(ERR, "memory registration allocation failed\n");
1686		goto allocate_mr_failed;
1687	}
1688
1689	return info;
1690
1691allocate_mr_failed:
1692	/* At this point, need to a full transport shutdown */
1693	smbd_destroy(server);
1694	return NULL;
1695
1696negotiation_failed:
1697	cancel_delayed_work_sync(&info->idle_timer_work);
1698	destroy_caches_and_workqueue(info);
1699	info->transport_status = SMBD_NEGOTIATE_FAILED;
1700	init_waitqueue_head(&info->conn_wait);
1701	rdma_disconnect(info->id);
1702	wait_event(info->conn_wait,
1703		info->transport_status == SMBD_DISCONNECTED);
1704
1705allocate_cache_failed:
1706rdma_connect_failed:
1707	rdma_destroy_qp(info->id);
1708
1709create_qp_failed:
1710alloc_cq_failed:
1711	if (info->send_cq)
1712		ib_free_cq(info->send_cq);
1713	if (info->recv_cq)
1714		ib_free_cq(info->recv_cq);
1715
1716config_failed:
1717	ib_dealloc_pd(info->pd);
1718	rdma_destroy_id(info->id);
1719
1720create_id_failed:
1721	kfree(info);
1722	return NULL;
1723}
1724
1725struct smbd_connection *smbd_get_connection(
1726	struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1727{
1728	struct smbd_connection *ret;
1729	int port = SMBD_PORT;
1730
1731try_again:
1732	ret = _smbd_get_connection(server, dstaddr, port);
1733
1734	/* Try SMB_PORT if SMBD_PORT doesn't work */
1735	if (!ret && port == SMBD_PORT) {
1736		port = SMB_PORT;
1737		goto try_again;
1738	}
1739	return ret;
1740}
1741
1742/*
1743 * Receive data from receive reassembly queue
1744 * All the incoming data packets are placed in reassembly queue
1745 * buf: the buffer to read data into
1746 * size: the length of data to read
1747 * return value: actual data read
1748 * Note: this implementation copies the data from reassebmly queue to receive
1749 * buffers used by upper layer. This is not the optimal code path. A better way
1750 * to do it is to not have upper layer allocate its receive buffers but rather
1751 * borrow the buffer from reassembly queue, and return it after data is
1752 * consumed. But this will require more changes to upper layer code, and also
1753 * need to consider packet boundaries while they still being reassembled.
1754 */
1755static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1756		unsigned int size)
1757{
1758	struct smbd_response *response;
1759	struct smbd_data_transfer *data_transfer;
1760	int to_copy, to_read, data_read, offset;
1761	u32 data_length, remaining_data_length, data_offset;
1762	int rc;
1763
1764again:
1765	/*
1766	 * No need to hold the reassembly queue lock all the time as we are
1767	 * the only one reading from the front of the queue. The transport
1768	 * may add more entries to the back of the queue at the same time
1769	 */
1770	log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1771		info->reassembly_data_length);
1772	if (info->reassembly_data_length >= size) {
1773		int queue_length;
1774		int queue_removed = 0;
1775
1776		/*
1777		 * Need to make sure reassembly_data_length is read before
1778		 * reading reassembly_queue_length and calling
1779		 * _get_first_reassembly. This call is lock free
1780		 * as we never read at the end of the queue which are being
1781		 * updated in SOFTIRQ as more data is received
1782		 */
1783		virt_rmb();
1784		queue_length = info->reassembly_queue_length;
1785		data_read = 0;
1786		to_read = size;
1787		offset = info->first_entry_offset;
1788		while (data_read < size) {
1789			response = _get_first_reassembly(info);
1790			data_transfer = smbd_response_payload(response);
1791			data_length = le32_to_cpu(data_transfer->data_length);
1792			remaining_data_length =
1793				le32_to_cpu(
1794					data_transfer->remaining_data_length);
1795			data_offset = le32_to_cpu(data_transfer->data_offset);
1796
1797			/*
1798			 * The upper layer expects RFC1002 length at the
1799			 * beginning of the payload. Return it to indicate
1800			 * the total length of the packet. This minimize the
1801			 * change to upper layer packet processing logic. This
1802			 * will be eventually remove when an intermediate
1803			 * transport layer is added
1804			 */
1805			if (response->first_segment && size == 4) {
1806				unsigned int rfc1002_len =
1807					data_length + remaining_data_length;
1808				*((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1809				data_read = 4;
1810				response->first_segment = false;
1811				log_read(INFO, "returning rfc1002 length %d\n",
1812					rfc1002_len);
1813				goto read_rfc1002_done;
1814			}
1815
1816			to_copy = min_t(int, data_length - offset, to_read);
1817			memcpy(
1818				buf + data_read,
1819				(char *)data_transfer + data_offset + offset,
1820				to_copy);
1821
1822			/* move on to the next buffer? */
1823			if (to_copy == data_length - offset) {
1824				queue_length--;
1825				/*
1826				 * No need to lock if we are not at the
1827				 * end of the queue
1828				 */
1829				if (queue_length)
1830					list_del(&response->list);
1831				else {
1832					spin_lock_irq(
1833						&info->reassembly_queue_lock);
1834					list_del(&response->list);
1835					spin_unlock_irq(
1836						&info->reassembly_queue_lock);
1837				}
1838				queue_removed++;
1839				info->count_reassembly_queue--;
1840				info->count_dequeue_reassembly_queue++;
1841				put_receive_buffer(info, response);
1842				offset = 0;
1843				log_read(INFO, "put_receive_buffer offset=0\n");
1844			} else
1845				offset += to_copy;
1846
1847			to_read -= to_copy;
1848			data_read += to_copy;
1849
1850			log_read(INFO, "_get_first_reassembly memcpy %d bytes data_transfer_length-offset=%d after that to_read=%d data_read=%d offset=%d\n",
1851				 to_copy, data_length - offset,
1852				 to_read, data_read, offset);
1853		}
1854
1855		spin_lock_irq(&info->reassembly_queue_lock);
1856		info->reassembly_data_length -= data_read;
1857		info->reassembly_queue_length -= queue_removed;
1858		spin_unlock_irq(&info->reassembly_queue_lock);
1859
1860		info->first_entry_offset = offset;
1861		log_read(INFO, "returning to thread data_read=%d reassembly_data_length=%d first_entry_offset=%d\n",
1862			 data_read, info->reassembly_data_length,
1863			 info->first_entry_offset);
1864read_rfc1002_done:
1865		return data_read;
1866	}
1867
1868	log_read(INFO, "wait_event on more data\n");
1869	rc = wait_event_interruptible(
1870		info->wait_reassembly_queue,
1871		info->reassembly_data_length >= size ||
1872			info->transport_status != SMBD_CONNECTED);
1873	/* Don't return any data if interrupted */
1874	if (rc)
1875		return rc;
1876
1877	if (info->transport_status != SMBD_CONNECTED) {
1878		log_read(ERR, "disconnected\n");
1879		return -ECONNABORTED;
1880	}
1881
1882	goto again;
1883}
1884
1885/*
1886 * Receive a page from receive reassembly queue
1887 * page: the page to read data into
1888 * to_read: the length of data to read
1889 * return value: actual data read
1890 */
1891static int smbd_recv_page(struct smbd_connection *info,
1892		struct page *page, unsigned int page_offset,
1893		unsigned int to_read)
1894{
1895	int ret;
1896	char *to_address;
1897	void *page_address;
1898
1899	/* make sure we have the page ready for read */
1900	ret = wait_event_interruptible(
1901		info->wait_reassembly_queue,
1902		info->reassembly_data_length >= to_read ||
1903			info->transport_status != SMBD_CONNECTED);
1904	if (ret)
1905		return ret;
1906
1907	/* now we can read from reassembly queue and not sleep */
1908	page_address = kmap_atomic(page);
1909	to_address = (char *) page_address + page_offset;
1910
1911	log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
1912		page, to_address, to_read);
1913
1914	ret = smbd_recv_buf(info, to_address, to_read);
1915	kunmap_atomic(page_address);
1916
1917	return ret;
1918}
1919
1920/*
1921 * Receive data from transport
1922 * msg: a msghdr point to the buffer, can be ITER_KVEC or ITER_BVEC
1923 * return: total bytes read, or 0. SMB Direct will not do partial read.
1924 */
1925int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
1926{
1927	char *buf;
1928	struct page *page;
1929	unsigned int to_read, page_offset;
1930	int rc;
1931
1932	if (iov_iter_rw(&msg->msg_iter) == WRITE) {
1933		/* It's a bug in upper layer to get there */
1934		cifs_dbg(VFS, "Invalid msg iter dir %u\n",
1935			 iov_iter_rw(&msg->msg_iter));
1936		rc = -EINVAL;
1937		goto out;
1938	}
1939
1940	switch (iov_iter_type(&msg->msg_iter)) {
1941	case ITER_KVEC:
1942		buf = msg->msg_iter.kvec->iov_base;
1943		to_read = msg->msg_iter.kvec->iov_len;
1944		rc = smbd_recv_buf(info, buf, to_read);
1945		break;
1946
1947	case ITER_BVEC:
1948		page = msg->msg_iter.bvec->bv_page;
1949		page_offset = msg->msg_iter.bvec->bv_offset;
1950		to_read = msg->msg_iter.bvec->bv_len;
1951		rc = smbd_recv_page(info, page, page_offset, to_read);
1952		break;
1953
1954	default:
1955		/* It's a bug in upper layer to get there */
1956		cifs_dbg(VFS, "Invalid msg type %d\n",
1957			 iov_iter_type(&msg->msg_iter));
1958		rc = -EINVAL;
1959	}
1960
1961out:
1962	/* SMBDirect will read it all or nothing */
1963	if (rc > 0)
1964		msg->msg_iter.count = 0;
1965	return rc;
1966}
1967
1968/*
1969 * Send data to transport
1970 * Each rqst is transported as a SMBDirect payload
1971 * rqst: the data to write
1972 * return value: 0 if successfully write, otherwise error code
1973 */
1974int smbd_send(struct TCP_Server_Info *server,
1975	int num_rqst, struct smb_rqst *rqst_array)
1976{
1977	struct smbd_connection *info = server->smbd_conn;
1978	struct kvec vec;
1979	int nvecs;
1980	int size;
1981	unsigned int buflen, remaining_data_length;
1982	int start, i, j;
1983	int max_iov_size =
1984		info->max_send_size - sizeof(struct smbd_data_transfer);
1985	struct kvec *iov;
1986	int rc;
1987	struct smb_rqst *rqst;
1988	int rqst_idx;
1989
1990	if (info->transport_status != SMBD_CONNECTED) {
1991		rc = -EAGAIN;
1992		goto done;
1993	}
1994
1995	/*
1996	 * Add in the page array if there is one. The caller needs to set
1997	 * rq_tailsz to PAGE_SIZE when the buffer has multiple pages and
1998	 * ends at page boundary
1999	 */
2000	remaining_data_length = 0;
2001	for (i = 0; i < num_rqst; i++)
2002		remaining_data_length += smb_rqst_len(server, &rqst_array[i]);
2003
2004	if (remaining_data_length > info->max_fragmented_send_size) {
2005		log_write(ERR, "payload size %d > max size %d\n",
2006			remaining_data_length, info->max_fragmented_send_size);
2007		rc = -EINVAL;
2008		goto done;
2009	}
2010
2011	log_write(INFO, "num_rqst=%d total length=%u\n",
2012			num_rqst, remaining_data_length);
2013
2014	rqst_idx = 0;
2015next_rqst:
2016	rqst = &rqst_array[rqst_idx];
2017	iov = rqst->rq_iov;
2018
2019	cifs_dbg(FYI, "Sending smb (RDMA): idx=%d smb_len=%lu\n",
2020		rqst_idx, smb_rqst_len(server, rqst));
2021	for (i = 0; i < rqst->rq_nvec; i++)
2022		dump_smb(iov[i].iov_base, iov[i].iov_len);
2023
2024
2025	log_write(INFO, "rqst_idx=%d nvec=%d rqst->rq_npages=%d rq_pagesz=%d rq_tailsz=%d buflen=%lu\n",
2026		  rqst_idx, rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2027		  rqst->rq_tailsz, smb_rqst_len(server, rqst));
2028
2029	start = i = 0;
2030	buflen = 0;
2031	while (true) {
2032		buflen += iov[i].iov_len;
2033		if (buflen > max_iov_size) {
2034			if (i > start) {
2035				remaining_data_length -=
2036					(buflen-iov[i].iov_len);
2037				log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
2038					  start, i, i - start,
2039					  remaining_data_length);
2040				rc = smbd_post_send_data(
2041					info, &iov[start], i-start,
2042					remaining_data_length);
2043				if (rc)
2044					goto done;
2045			} else {
2046				/* iov[start] is too big, break it */
2047				nvecs = (buflen+max_iov_size-1)/max_iov_size;
2048				log_write(INFO, "iov[%d] iov_base=%p buflen=%d break to %d vectors\n",
2049					  start, iov[start].iov_base,
2050					  buflen, nvecs);
2051				for (j = 0; j < nvecs; j++) {
2052					vec.iov_base =
2053						(char *)iov[start].iov_base +
2054						j*max_iov_size;
2055					vec.iov_len = max_iov_size;
2056					if (j == nvecs-1)
2057						vec.iov_len =
2058							buflen -
2059							max_iov_size*(nvecs-1);
2060					remaining_data_length -= vec.iov_len;
2061					log_write(INFO,
2062						"sending vec j=%d iov_base=%p iov_len=%zu remaining_data_length=%d\n",
2063						  j, vec.iov_base, vec.iov_len,
2064						  remaining_data_length);
2065					rc = smbd_post_send_data(
2066						info, &vec, 1,
2067						remaining_data_length);
2068					if (rc)
2069						goto done;
2070				}
2071				i++;
2072				if (i == rqst->rq_nvec)
2073					break;
2074			}
2075			start = i;
2076			buflen = 0;
2077		} else {
2078			i++;
2079			if (i == rqst->rq_nvec) {
2080				/* send out all remaining vecs */
2081				remaining_data_length -= buflen;
2082				log_write(INFO, "sending iov[] from start=%d i=%d nvecs=%d remaining_data_length=%d\n",
2083					  start, i, i - start,
2084					  remaining_data_length);
2085				rc = smbd_post_send_data(info, &iov[start],
2086					i-start, remaining_data_length);
2087				if (rc)
2088					goto done;
2089				break;
2090			}
2091		}
2092		log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2093	}
2094
2095	/* now sending pages if there are any */
2096	for (i = 0; i < rqst->rq_npages; i++) {
2097		unsigned int offset;
2098
2099		rqst_page_get_length(rqst, i, &buflen, &offset);
2100		nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2101		log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2102			buflen, nvecs);
2103		for (j = 0; j < nvecs; j++) {
2104			size = max_iov_size;
2105			if (j == nvecs-1)
2106				size = buflen - j*max_iov_size;
2107			remaining_data_length -= size;
2108			log_write(INFO, "sending pages i=%d offset=%d size=%d remaining_data_length=%d\n",
2109				  i, j * max_iov_size + offset, size,
2110				  remaining_data_length);
2111			rc = smbd_post_send_page(
2112				info, rqst->rq_pages[i],
2113				j*max_iov_size + offset,
2114				size, remaining_data_length);
2115			if (rc)
2116				goto done;
2117		}
2118	}
2119
2120	rqst_idx++;
2121	if (rqst_idx < num_rqst)
2122		goto next_rqst;
2123
2124done:
2125	/*
2126	 * As an optimization, we don't wait for individual I/O to finish
2127	 * before sending the next one.
2128	 * Send them all and wait for pending send count to get to 0
2129	 * that means all the I/Os have been out and we are good to return
2130	 */
2131
2132	wait_event(info->wait_send_pending,
2133		atomic_read(&info->send_pending) == 0);
2134
2135	return rc;
2136}
2137
2138static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2139{
2140	struct smbd_mr *mr;
2141	struct ib_cqe *cqe;
2142
2143	if (wc->status) {
2144		log_rdma_mr(ERR, "status=%d\n", wc->status);
2145		cqe = wc->wr_cqe;
2146		mr = container_of(cqe, struct smbd_mr, cqe);
2147		smbd_disconnect_rdma_connection(mr->conn);
2148	}
2149}
2150
2151/*
2152 * The work queue function that recovers MRs
2153 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
2154 * again. Both calls are slow, so finish them in a workqueue. This will not
2155 * block I/O path.
2156 * There is one workqueue that recovers MRs, there is no need to lock as the
2157 * I/O requests calling smbd_register_mr will never update the links in the
2158 * mr_list.
2159 */
2160static void smbd_mr_recovery_work(struct work_struct *work)
2161{
2162	struct smbd_connection *info =
2163		container_of(work, struct smbd_connection, mr_recovery_work);
2164	struct smbd_mr *smbdirect_mr;
2165	int rc;
2166
2167	list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2168		if (smbdirect_mr->state == MR_ERROR) {
2169
2170			/* recover this MR entry */
2171			rc = ib_dereg_mr(smbdirect_mr->mr);
2172			if (rc) {
2173				log_rdma_mr(ERR,
2174					"ib_dereg_mr failed rc=%x\n",
2175					rc);
2176				smbd_disconnect_rdma_connection(info);
2177				continue;
2178			}
2179
2180			smbdirect_mr->mr = ib_alloc_mr(
2181				info->pd, info->mr_type,
2182				info->max_frmr_depth);
2183			if (IS_ERR(smbdirect_mr->mr)) {
2184				log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
2185					    info->mr_type,
2186					    info->max_frmr_depth);
2187				smbd_disconnect_rdma_connection(info);
2188				continue;
2189			}
2190		} else
2191			/* This MR is being used, don't recover it */
2192			continue;
2193
2194		smbdirect_mr->state = MR_READY;
2195
2196		/* smbdirect_mr->state is updated by this function
2197		 * and is read and updated by I/O issuing CPUs trying
2198		 * to get a MR, the call to atomic_inc_return
2199		 * implicates a memory barrier and guarantees this
2200		 * value is updated before waking up any calls to
2201		 * get_mr() from the I/O issuing CPUs
2202		 */
2203		if (atomic_inc_return(&info->mr_ready_count) == 1)
2204			wake_up_interruptible(&info->wait_mr);
2205	}
2206}
2207
2208static void destroy_mr_list(struct smbd_connection *info)
2209{
2210	struct smbd_mr *mr, *tmp;
2211
2212	cancel_work_sync(&info->mr_recovery_work);
2213	list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2214		if (mr->state == MR_INVALIDATED)
2215			ib_dma_unmap_sg(info->id->device, mr->sgl,
2216				mr->sgl_count, mr->dir);
2217		ib_dereg_mr(mr->mr);
2218		kfree(mr->sgl);
2219		kfree(mr);
2220	}
2221}
2222
2223/*
2224 * Allocate MRs used for RDMA read/write
2225 * The number of MRs will not exceed hardware capability in responder_resources
2226 * All MRs are kept in mr_list. The MR can be recovered after it's used
2227 * Recovery is done in smbd_mr_recovery_work. The content of list entry changes
2228 * as MRs are used and recovered for I/O, but the list links will not change
2229 */
2230static int allocate_mr_list(struct smbd_connection *info)
2231{
2232	int i;
2233	struct smbd_mr *smbdirect_mr, *tmp;
2234
2235	INIT_LIST_HEAD(&info->mr_list);
2236	init_waitqueue_head(&info->wait_mr);
2237	spin_lock_init(&info->mr_list_lock);
2238	atomic_set(&info->mr_ready_count, 0);
2239	atomic_set(&info->mr_used_count, 0);
2240	init_waitqueue_head(&info->wait_for_mr_cleanup);
2241	/* Allocate more MRs (2x) than hardware responder_resources */
2242	for (i = 0; i < info->responder_resources * 2; i++) {
2243		smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2244		if (!smbdirect_mr)
2245			goto out;
2246		smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2247					info->max_frmr_depth);
2248		if (IS_ERR(smbdirect_mr->mr)) {
2249			log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x max_frmr_depth=%x\n",
2250				    info->mr_type, info->max_frmr_depth);
2251			goto out;
2252		}
2253		smbdirect_mr->sgl = kcalloc(
2254					info->max_frmr_depth,
2255					sizeof(struct scatterlist),
2256					GFP_KERNEL);
2257		if (!smbdirect_mr->sgl) {
2258			log_rdma_mr(ERR, "failed to allocate sgl\n");
2259			ib_dereg_mr(smbdirect_mr->mr);
2260			goto out;
2261		}
2262		smbdirect_mr->state = MR_READY;
2263		smbdirect_mr->conn = info;
2264
2265		list_add_tail(&smbdirect_mr->list, &info->mr_list);
2266		atomic_inc(&info->mr_ready_count);
2267	}
2268	INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2269	return 0;
2270
2271out:
2272	kfree(smbdirect_mr);
2273
2274	list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2275		ib_dereg_mr(smbdirect_mr->mr);
2276		kfree(smbdirect_mr->sgl);
2277		kfree(smbdirect_mr);
2278	}
2279	return -ENOMEM;
2280}
2281
2282/*
2283 * Get a MR from mr_list. This function waits until there is at least one
2284 * MR available in the list. It may access the list while the
2285 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2286 * as they never modify the same places. However, there may be several CPUs
2287 * issueing I/O trying to get MR at the same time, mr_list_lock is used to
2288 * protect this situation.
2289 */
2290static struct smbd_mr *get_mr(struct smbd_connection *info)
2291{
2292	struct smbd_mr *ret;
2293	int rc;
2294again:
2295	rc = wait_event_interruptible(info->wait_mr,
2296		atomic_read(&info->mr_ready_count) ||
2297		info->transport_status != SMBD_CONNECTED);
2298	if (rc) {
2299		log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2300		return NULL;
2301	}
2302
2303	if (info->transport_status != SMBD_CONNECTED) {
2304		log_rdma_mr(ERR, "info->transport_status=%x\n",
2305			info->transport_status);
2306		return NULL;
2307	}
2308
2309	spin_lock(&info->mr_list_lock);
2310	list_for_each_entry(ret, &info->mr_list, list) {
2311		if (ret->state == MR_READY) {
2312			ret->state = MR_REGISTERED;
2313			spin_unlock(&info->mr_list_lock);
2314			atomic_dec(&info->mr_ready_count);
2315			atomic_inc(&info->mr_used_count);
2316			return ret;
2317		}
2318	}
2319
2320	spin_unlock(&info->mr_list_lock);
2321	/*
2322	 * It is possible that we could fail to get MR because other processes may
2323	 * try to acquire a MR at the same time. If this is the case, retry it.
2324	 */
2325	goto again;
2326}
2327
2328/*
2329 * Register memory for RDMA read/write
2330 * pages[]: the list of pages to register memory with
2331 * num_pages: the number of pages to register
2332 * tailsz: if non-zero, the bytes to register in the last page
2333 * writing: true if this is a RDMA write (SMB read), false for RDMA read
2334 * need_invalidate: true if this MR needs to be locally invalidated after I/O
2335 * return value: the MR registered, NULL if failed.
2336 */
2337struct smbd_mr *smbd_register_mr(
2338	struct smbd_connection *info, struct page *pages[], int num_pages,
2339	int offset, int tailsz, bool writing, bool need_invalidate)
2340{
2341	struct smbd_mr *smbdirect_mr;
2342	int rc, i;
2343	enum dma_data_direction dir;
2344	struct ib_reg_wr *reg_wr;
2345
2346	if (num_pages > info->max_frmr_depth) {
2347		log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2348			num_pages, info->max_frmr_depth);
2349		return NULL;
2350	}
2351
2352	smbdirect_mr = get_mr(info);
2353	if (!smbdirect_mr) {
2354		log_rdma_mr(ERR, "get_mr returning NULL\n");
2355		return NULL;
2356	}
2357	smbdirect_mr->need_invalidate = need_invalidate;
2358	smbdirect_mr->sgl_count = num_pages;
2359	sg_init_table(smbdirect_mr->sgl, num_pages);
2360
2361	log_rdma_mr(INFO, "num_pages=0x%x offset=0x%x tailsz=0x%x\n",
2362			num_pages, offset, tailsz);
2363
2364	if (num_pages == 1) {
2365		sg_set_page(&smbdirect_mr->sgl[0], pages[0], tailsz, offset);
2366		goto skip_multiple_pages;
2367	}
2368
2369	/* We have at least two pages to register */
2370	sg_set_page(
2371		&smbdirect_mr->sgl[0], pages[0], PAGE_SIZE - offset, offset);
2372	i = 1;
2373	while (i < num_pages - 1) {
2374		sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2375		i++;
2376	}
2377	sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2378		tailsz ? tailsz : PAGE_SIZE, 0);
2379
2380skip_multiple_pages:
2381	dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2382	smbdirect_mr->dir = dir;
2383	rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2384	if (!rc) {
2385		log_rdma_mr(ERR, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2386			num_pages, dir, rc);
2387		goto dma_map_error;
2388	}
2389
2390	rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2391		NULL, PAGE_SIZE);
2392	if (rc != num_pages) {
2393		log_rdma_mr(ERR,
2394			"ib_map_mr_sg failed rc = %d num_pages = %x\n",
2395			rc, num_pages);
2396		goto map_mr_error;
2397	}
2398
2399	ib_update_fast_reg_key(smbdirect_mr->mr,
2400		ib_inc_rkey(smbdirect_mr->mr->rkey));
2401	reg_wr = &smbdirect_mr->wr;
2402	reg_wr->wr.opcode = IB_WR_REG_MR;
2403	smbdirect_mr->cqe.done = register_mr_done;
2404	reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2405	reg_wr->wr.num_sge = 0;
2406	reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2407	reg_wr->mr = smbdirect_mr->mr;
2408	reg_wr->key = smbdirect_mr->mr->rkey;
2409	reg_wr->access = writing ?
2410			IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2411			IB_ACCESS_REMOTE_READ;
2412
2413	/*
2414	 * There is no need for waiting for complemtion on ib_post_send
2415	 * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2416	 * on the next ib_post_send when we actaully send I/O to remote peer
2417	 */
2418	rc = ib_post_send(info->id->qp, &reg_wr->wr, NULL);
2419	if (!rc)
2420		return smbdirect_mr;
2421
2422	log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2423		rc, reg_wr->key);
2424
2425	/* If all failed, attempt to recover this MR by setting it MR_ERROR*/
2426map_mr_error:
2427	ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2428		smbdirect_mr->sgl_count, smbdirect_mr->dir);
2429
2430dma_map_error:
2431	smbdirect_mr->state = MR_ERROR;
2432	if (atomic_dec_and_test(&info->mr_used_count))
2433		wake_up(&info->wait_for_mr_cleanup);
2434
2435	smbd_disconnect_rdma_connection(info);
2436
2437	return NULL;
2438}
2439
2440static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2441{
2442	struct smbd_mr *smbdirect_mr;
2443	struct ib_cqe *cqe;
2444
2445	cqe = wc->wr_cqe;
2446	smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2447	smbdirect_mr->state = MR_INVALIDATED;
2448	if (wc->status != IB_WC_SUCCESS) {
2449		log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2450		smbdirect_mr->state = MR_ERROR;
2451	}
2452	complete(&smbdirect_mr->invalidate_done);
2453}
2454
2455/*
2456 * Deregister a MR after I/O is done
2457 * This function may wait if remote invalidation is not used
2458 * and we have to locally invalidate the buffer to prevent data is being
2459 * modified by remote peer after upper layer consumes it
2460 */
2461int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2462{
2463	struct ib_send_wr *wr;
2464	struct smbd_connection *info = smbdirect_mr->conn;
2465	int rc = 0;
2466
2467	if (smbdirect_mr->need_invalidate) {
2468		/* Need to finish local invalidation before returning */
2469		wr = &smbdirect_mr->inv_wr;
2470		wr->opcode = IB_WR_LOCAL_INV;
2471		smbdirect_mr->cqe.done = local_inv_done;
2472		wr->wr_cqe = &smbdirect_mr->cqe;
2473		wr->num_sge = 0;
2474		wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2475		wr->send_flags = IB_SEND_SIGNALED;
2476
2477		init_completion(&smbdirect_mr->invalidate_done);
2478		rc = ib_post_send(info->id->qp, wr, NULL);
2479		if (rc) {
2480			log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2481			smbd_disconnect_rdma_connection(info);
2482			goto done;
2483		}
2484		wait_for_completion(&smbdirect_mr->invalidate_done);
2485		smbdirect_mr->need_invalidate = false;
2486	} else
2487		/*
2488		 * For remote invalidation, just set it to MR_INVALIDATED
2489		 * and defer to mr_recovery_work to recover the MR for next use
2490		 */
2491		smbdirect_mr->state = MR_INVALIDATED;
2492
2493	if (smbdirect_mr->state == MR_INVALIDATED) {
2494		ib_dma_unmap_sg(
2495			info->id->device, smbdirect_mr->sgl,
2496			smbdirect_mr->sgl_count,
2497			smbdirect_mr->dir);
2498		smbdirect_mr->state = MR_READY;
2499		if (atomic_inc_return(&info->mr_ready_count) == 1)
2500			wake_up_interruptible(&info->wait_mr);
2501	} else
2502		/*
2503		 * Schedule the work to do MR recovery for future I/Os MR
2504		 * recovery is slow and don't want it to block current I/O
2505		 */
2506		queue_work(info->workqueue, &info->mr_recovery_work);
2507
2508done:
2509	if (atomic_dec_and_test(&info->mr_used_count))
2510		wake_up(&info->wait_for_mr_cleanup);
2511
2512	return rc;
2513}