Linux Audio

Check our new training course

Loading...
Note: File does not exist in v3.5.6.
   1/*
   2 *   Copyright (C) 2017, Microsoft Corporation.
   3 *
   4 *   Author(s): Long Li <longli@microsoft.com>
   5 *
   6 *   This program is free software;  you can redistribute it and/or modify
   7 *   it under the terms of the GNU General Public License as published by
   8 *   the Free Software Foundation; either version 2 of the License, or
   9 *   (at your option) any later version.
  10 *
  11 *   This program is distributed in the hope that it will be useful,
  12 *   but WITHOUT ANY WARRANTY;  without even the implied warranty of
  13 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
  14 *   the GNU General Public License for more details.
  15 */
  16#include <linux/module.h>
  17#include <linux/highmem.h>
  18#include "smbdirect.h"
  19#include "cifs_debug.h"
  20
  21static struct smbd_response *get_empty_queue_buffer(
  22		struct smbd_connection *info);
  23static struct smbd_response *get_receive_buffer(
  24		struct smbd_connection *info);
  25static void put_receive_buffer(
  26		struct smbd_connection *info,
  27		struct smbd_response *response);
  28static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
  29static void destroy_receive_buffers(struct smbd_connection *info);
  30
  31static void put_empty_packet(
  32		struct smbd_connection *info, struct smbd_response *response);
  33static void enqueue_reassembly(
  34		struct smbd_connection *info,
  35		struct smbd_response *response, int data_length);
  36static struct smbd_response *_get_first_reassembly(
  37		struct smbd_connection *info);
  38
  39static int smbd_post_recv(
  40		struct smbd_connection *info,
  41		struct smbd_response *response);
  42
  43static int smbd_post_send_empty(struct smbd_connection *info);
  44static int smbd_post_send_data(
  45		struct smbd_connection *info,
  46		struct kvec *iov, int n_vec, int remaining_data_length);
  47static int smbd_post_send_page(struct smbd_connection *info,
  48		struct page *page, unsigned long offset,
  49		size_t size, int remaining_data_length);
  50
  51static void destroy_mr_list(struct smbd_connection *info);
  52static int allocate_mr_list(struct smbd_connection *info);
  53
  54/* SMBD version number */
  55#define SMBD_V1	0x0100
  56
  57/* Port numbers for SMBD transport */
  58#define SMB_PORT	445
  59#define SMBD_PORT	5445
  60
  61/* Address lookup and resolve timeout in ms */
  62#define RDMA_RESOLVE_TIMEOUT	5000
  63
  64/* SMBD negotiation timeout in seconds */
  65#define SMBD_NEGOTIATE_TIMEOUT	120
  66
  67/* SMBD minimum receive size and fragmented sized defined in [MS-SMBD] */
  68#define SMBD_MIN_RECEIVE_SIZE		128
  69#define SMBD_MIN_FRAGMENTED_SIZE	131072
  70
  71/*
  72 * Default maximum number of RDMA read/write outstanding on this connection
  73 * This value is possibly decreased during QP creation on hardware limit
  74 */
  75#define SMBD_CM_RESPONDER_RESOURCES	32
  76
  77/* Maximum number of retries on data transfer operations */
  78#define SMBD_CM_RETRY			6
  79/* No need to retry on Receiver Not Ready since SMBD manages credits */
  80#define SMBD_CM_RNR_RETRY		0
  81
  82/*
  83 * User configurable initial values per SMBD transport connection
  84 * as defined in [MS-SMBD] 3.1.1.1
  85 * Those may change after a SMBD negotiation
  86 */
  87/* The local peer's maximum number of credits to grant to the peer */
  88int smbd_receive_credit_max = 255;
  89
  90/* The remote peer's credit request of local peer */
  91int smbd_send_credit_target = 255;
  92
  93/* The maximum single message size can be sent to remote peer */
  94int smbd_max_send_size = 1364;
  95
  96/*  The maximum fragmented upper-layer payload receive size supported */
  97int smbd_max_fragmented_recv_size = 1024 * 1024;
  98
  99/*  The maximum single-message size which can be received */
 100int smbd_max_receive_size = 8192;
 101
 102/* The timeout to initiate send of a keepalive message on idle */
 103int smbd_keep_alive_interval = 120;
 104
 105/*
 106 * User configurable initial values for RDMA transport
 107 * The actual values used may be lower and are limited to hardware capabilities
 108 */
 109/* Default maximum number of SGEs in a RDMA write/read */
 110int smbd_max_frmr_depth = 2048;
 111
 112/* If payload is less than this byte, use RDMA send/recv not read/write */
 113int rdma_readwrite_threshold = 4096;
 114
 115/* Transport logging functions
 116 * Logging are defined as classes. They can be OR'ed to define the actual
 117 * logging level via module parameter smbd_logging_class
 118 * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
 119 * log_rdma_event()
 120 */
 121#define LOG_OUTGOING			0x1
 122#define LOG_INCOMING			0x2
 123#define LOG_READ			0x4
 124#define LOG_WRITE			0x8
 125#define LOG_RDMA_SEND			0x10
 126#define LOG_RDMA_RECV			0x20
 127#define LOG_KEEP_ALIVE			0x40
 128#define LOG_RDMA_EVENT			0x80
 129#define LOG_RDMA_MR			0x100
 130static unsigned int smbd_logging_class;
 131module_param(smbd_logging_class, uint, 0644);
 132MODULE_PARM_DESC(smbd_logging_class,
 133	"Logging class for SMBD transport 0x0 to 0x100");
 134
 135#define ERR		0x0
 136#define INFO		0x1
 137static unsigned int smbd_logging_level = ERR;
 138module_param(smbd_logging_level, uint, 0644);
 139MODULE_PARM_DESC(smbd_logging_level,
 140	"Logging level for SMBD transport, 0 (default): error, 1: info");
 141
 142#define log_rdma(level, class, fmt, args...)				\
 143do {									\
 144	if (level <= smbd_logging_level || class & smbd_logging_class)	\
 145		cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
 146} while (0)
 147
 148#define log_outgoing(level, fmt, args...) \
 149		log_rdma(level, LOG_OUTGOING, fmt, ##args)
 150#define log_incoming(level, fmt, args...) \
 151		log_rdma(level, LOG_INCOMING, fmt, ##args)
 152#define log_read(level, fmt, args...)	log_rdma(level, LOG_READ, fmt, ##args)
 153#define log_write(level, fmt, args...)	log_rdma(level, LOG_WRITE, fmt, ##args)
 154#define log_rdma_send(level, fmt, args...) \
 155		log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
 156#define log_rdma_recv(level, fmt, args...) \
 157		log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
 158#define log_keep_alive(level, fmt, args...) \
 159		log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
 160#define log_rdma_event(level, fmt, args...) \
 161		log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
 162#define log_rdma_mr(level, fmt, args...) \
 163		log_rdma(level, LOG_RDMA_MR, fmt, ##args)
 164
 165/*
 166 * Destroy the transport and related RDMA and memory resources
 167 * Need to go through all the pending counters and make sure on one is using
 168 * the transport while it is destroyed
 169 */
 170static void smbd_destroy_rdma_work(struct work_struct *work)
 171{
 172	struct smbd_response *response;
 173	struct smbd_connection *info =
 174		container_of(work, struct smbd_connection, destroy_work);
 175	unsigned long flags;
 176
 177	log_rdma_event(INFO, "destroying qp\n");
 178	ib_drain_qp(info->id->qp);
 179	rdma_destroy_qp(info->id);
 180
 181	/* Unblock all I/O waiting on the send queue */
 182	wake_up_interruptible_all(&info->wait_send_queue);
 183
 184	log_rdma_event(INFO, "cancelling idle timer\n");
 185	cancel_delayed_work_sync(&info->idle_timer_work);
 186	log_rdma_event(INFO, "cancelling send immediate work\n");
 187	cancel_delayed_work_sync(&info->send_immediate_work);
 188
 189	log_rdma_event(INFO, "wait for all send to finish\n");
 190	wait_event(info->wait_smbd_send_pending,
 191		info->smbd_send_pending == 0);
 192
 193	log_rdma_event(INFO, "wait for all recv to finish\n");
 194	wake_up_interruptible(&info->wait_reassembly_queue);
 195	wait_event(info->wait_smbd_recv_pending,
 196		info->smbd_recv_pending == 0);
 197
 198	log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
 199	wait_event(info->wait_send_pending,
 200		atomic_read(&info->send_pending) == 0);
 201	wait_event(info->wait_send_payload_pending,
 202		atomic_read(&info->send_payload_pending) == 0);
 203
 204	log_rdma_event(INFO, "freeing mr list\n");
 205	wake_up_interruptible_all(&info->wait_mr);
 206	wait_event(info->wait_for_mr_cleanup,
 207		atomic_read(&info->mr_used_count) == 0);
 208	destroy_mr_list(info);
 209
 210	/* It's not posssible for upper layer to get to reassembly */
 211	log_rdma_event(INFO, "drain the reassembly queue\n");
 212	do {
 213		spin_lock_irqsave(&info->reassembly_queue_lock, flags);
 214		response = _get_first_reassembly(info);
 215		if (response) {
 216			list_del(&response->list);
 217			spin_unlock_irqrestore(
 218				&info->reassembly_queue_lock, flags);
 219			put_receive_buffer(info, response);
 220		} else
 221			spin_unlock_irqrestore(&info->reassembly_queue_lock, flags);
 222	} while (response);
 223
 224	info->reassembly_data_length = 0;
 225
 226	log_rdma_event(INFO, "free receive buffers\n");
 227	wait_event(info->wait_receive_queues,
 228		info->count_receive_queue + info->count_empty_packet_queue
 229			== info->receive_credit_max);
 230	destroy_receive_buffers(info);
 231
 232	ib_free_cq(info->send_cq);
 233	ib_free_cq(info->recv_cq);
 234	ib_dealloc_pd(info->pd);
 235	rdma_destroy_id(info->id);
 236
 237	/* free mempools */
 238	mempool_destroy(info->request_mempool);
 239	kmem_cache_destroy(info->request_cache);
 240
 241	mempool_destroy(info->response_mempool);
 242	kmem_cache_destroy(info->response_cache);
 243
 244	info->transport_status = SMBD_DESTROYED;
 245	wake_up_all(&info->wait_destroy);
 246}
 247
 248static int smbd_process_disconnected(struct smbd_connection *info)
 249{
 250	schedule_work(&info->destroy_work);
 251	return 0;
 252}
 253
 254static void smbd_disconnect_rdma_work(struct work_struct *work)
 255{
 256	struct smbd_connection *info =
 257		container_of(work, struct smbd_connection, disconnect_work);
 258
 259	if (info->transport_status == SMBD_CONNECTED) {
 260		info->transport_status = SMBD_DISCONNECTING;
 261		rdma_disconnect(info->id);
 262	}
 263}
 264
 265static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
 266{
 267	queue_work(info->workqueue, &info->disconnect_work);
 268}
 269
 270/* Upcall from RDMA CM */
 271static int smbd_conn_upcall(
 272		struct rdma_cm_id *id, struct rdma_cm_event *event)
 273{
 274	struct smbd_connection *info = id->context;
 275
 276	log_rdma_event(INFO, "event=%d status=%d\n",
 277		event->event, event->status);
 278
 279	switch (event->event) {
 280	case RDMA_CM_EVENT_ADDR_RESOLVED:
 281	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 282		info->ri_rc = 0;
 283		complete(&info->ri_done);
 284		break;
 285
 286	case RDMA_CM_EVENT_ADDR_ERROR:
 287		info->ri_rc = -EHOSTUNREACH;
 288		complete(&info->ri_done);
 289		break;
 290
 291	case RDMA_CM_EVENT_ROUTE_ERROR:
 292		info->ri_rc = -ENETUNREACH;
 293		complete(&info->ri_done);
 294		break;
 295
 296	case RDMA_CM_EVENT_ESTABLISHED:
 297		log_rdma_event(INFO, "connected event=%d\n", event->event);
 298		info->transport_status = SMBD_CONNECTED;
 299		wake_up_interruptible(&info->conn_wait);
 300		break;
 301
 302	case RDMA_CM_EVENT_CONNECT_ERROR:
 303	case RDMA_CM_EVENT_UNREACHABLE:
 304	case RDMA_CM_EVENT_REJECTED:
 305		log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
 306		info->transport_status = SMBD_DISCONNECTED;
 307		wake_up_interruptible(&info->conn_wait);
 308		break;
 309
 310	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 311	case RDMA_CM_EVENT_DISCONNECTED:
 312		/* This happenes when we fail the negotiation */
 313		if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
 314			info->transport_status = SMBD_DISCONNECTED;
 315			wake_up(&info->conn_wait);
 316			break;
 317		}
 318
 319		info->transport_status = SMBD_DISCONNECTED;
 320		smbd_process_disconnected(info);
 321		break;
 322
 323	default:
 324		break;
 325	}
 326
 327	return 0;
 328}
 329
 330/* Upcall from RDMA QP */
 331static void
 332smbd_qp_async_error_upcall(struct ib_event *event, void *context)
 333{
 334	struct smbd_connection *info = context;
 335
 336	log_rdma_event(ERR, "%s on device %s info %p\n",
 337		ib_event_msg(event->event), event->device->name, info);
 338
 339	switch (event->event) {
 340	case IB_EVENT_CQ_ERR:
 341	case IB_EVENT_QP_FATAL:
 342		smbd_disconnect_rdma_connection(info);
 343
 344	default:
 345		break;
 346	}
 347}
 348
 349static inline void *smbd_request_payload(struct smbd_request *request)
 350{
 351	return (void *)request->packet;
 352}
 353
 354static inline void *smbd_response_payload(struct smbd_response *response)
 355{
 356	return (void *)response->packet;
 357}
 358
 359/* Called when a RDMA send is done */
 360static void send_done(struct ib_cq *cq, struct ib_wc *wc)
 361{
 362	int i;
 363	struct smbd_request *request =
 364		container_of(wc->wr_cqe, struct smbd_request, cqe);
 365
 366	log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
 367		request, wc->status);
 368
 369	if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
 370		log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
 371			wc->status, wc->opcode);
 372		smbd_disconnect_rdma_connection(request->info);
 373	}
 374
 375	for (i = 0; i < request->num_sge; i++)
 376		ib_dma_unmap_single(request->info->id->device,
 377			request->sge[i].addr,
 378			request->sge[i].length,
 379			DMA_TO_DEVICE);
 380
 381	if (request->has_payload) {
 382		if (atomic_dec_and_test(&request->info->send_payload_pending))
 383			wake_up(&request->info->wait_send_payload_pending);
 384	} else {
 385		if (atomic_dec_and_test(&request->info->send_pending))
 386			wake_up(&request->info->wait_send_pending);
 387	}
 388
 389	mempool_free(request, request->info->request_mempool);
 390}
 391
 392static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
 393{
 394	log_rdma_event(INFO, "resp message min_version %u max_version %u "
 395		"negotiated_version %u credits_requested %u "
 396		"credits_granted %u status %u max_readwrite_size %u "
 397		"preferred_send_size %u max_receive_size %u "
 398		"max_fragmented_size %u\n",
 399		resp->min_version, resp->max_version, resp->negotiated_version,
 400		resp->credits_requested, resp->credits_granted, resp->status,
 401		resp->max_readwrite_size, resp->preferred_send_size,
 402		resp->max_receive_size, resp->max_fragmented_size);
 403}
 404
 405/*
 406 * Process a negotiation response message, according to [MS-SMBD]3.1.5.7
 407 * response, packet_length: the negotiation response message
 408 * return value: true if negotiation is a success, false if failed
 409 */
 410static bool process_negotiation_response(
 411		struct smbd_response *response, int packet_length)
 412{
 413	struct smbd_connection *info = response->info;
 414	struct smbd_negotiate_resp *packet = smbd_response_payload(response);
 415
 416	if (packet_length < sizeof(struct smbd_negotiate_resp)) {
 417		log_rdma_event(ERR,
 418			"error: packet_length=%d\n", packet_length);
 419		return false;
 420	}
 421
 422	if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
 423		log_rdma_event(ERR, "error: negotiated_version=%x\n",
 424			le16_to_cpu(packet->negotiated_version));
 425		return false;
 426	}
 427	info->protocol = le16_to_cpu(packet->negotiated_version);
 428
 429	if (packet->credits_requested == 0) {
 430		log_rdma_event(ERR, "error: credits_requested==0\n");
 431		return false;
 432	}
 433	info->receive_credit_target = le16_to_cpu(packet->credits_requested);
 434
 435	if (packet->credits_granted == 0) {
 436		log_rdma_event(ERR, "error: credits_granted==0\n");
 437		return false;
 438	}
 439	atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
 440
 441	atomic_set(&info->receive_credits, 0);
 442
 443	if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
 444		log_rdma_event(ERR, "error: preferred_send_size=%d\n",
 445			le32_to_cpu(packet->preferred_send_size));
 446		return false;
 447	}
 448	info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
 449
 450	if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
 451		log_rdma_event(ERR, "error: max_receive_size=%d\n",
 452			le32_to_cpu(packet->max_receive_size));
 453		return false;
 454	}
 455	info->max_send_size = min_t(int, info->max_send_size,
 456					le32_to_cpu(packet->max_receive_size));
 457
 458	if (le32_to_cpu(packet->max_fragmented_size) <
 459			SMBD_MIN_FRAGMENTED_SIZE) {
 460		log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
 461			le32_to_cpu(packet->max_fragmented_size));
 462		return false;
 463	}
 464	info->max_fragmented_send_size =
 465		le32_to_cpu(packet->max_fragmented_size);
 466	info->rdma_readwrite_threshold =
 467		rdma_readwrite_threshold > info->max_fragmented_send_size ?
 468		info->max_fragmented_send_size :
 469		rdma_readwrite_threshold;
 470
 471
 472	info->max_readwrite_size = min_t(u32,
 473			le32_to_cpu(packet->max_readwrite_size),
 474			info->max_frmr_depth * PAGE_SIZE);
 475	info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 476
 477	return true;
 478}
 479
 480/*
 481 * Check and schedule to send an immediate packet
 482 * This is used to extend credtis to remote peer to keep the transport busy
 483 */
 484static void check_and_send_immediate(struct smbd_connection *info)
 485{
 486	if (info->transport_status != SMBD_CONNECTED)
 487		return;
 488
 489	info->send_immediate = true;
 490
 491	/*
 492	 * Promptly send a packet if our peer is running low on receive
 493	 * credits
 494	 */
 495	if (atomic_read(&info->receive_credits) <
 496		info->receive_credit_target - 1)
 497		queue_delayed_work(
 498			info->workqueue, &info->send_immediate_work, 0);
 499}
 500
 501static void smbd_post_send_credits(struct work_struct *work)
 502{
 503	int ret = 0;
 504	int use_receive_queue = 1;
 505	int rc;
 506	struct smbd_response *response;
 507	struct smbd_connection *info =
 508		container_of(work, struct smbd_connection,
 509			post_send_credits_work);
 510
 511	if (info->transport_status != SMBD_CONNECTED) {
 512		wake_up(&info->wait_receive_queues);
 513		return;
 514	}
 515
 516	if (info->receive_credit_target >
 517		atomic_read(&info->receive_credits)) {
 518		while (true) {
 519			if (use_receive_queue)
 520				response = get_receive_buffer(info);
 521			else
 522				response = get_empty_queue_buffer(info);
 523			if (!response) {
 524				/* now switch to emtpy packet queue */
 525				if (use_receive_queue) {
 526					use_receive_queue = 0;
 527					continue;
 528				} else
 529					break;
 530			}
 531
 532			response->type = SMBD_TRANSFER_DATA;
 533			response->first_segment = false;
 534			rc = smbd_post_recv(info, response);
 535			if (rc) {
 536				log_rdma_recv(ERR,
 537					"post_recv failed rc=%d\n", rc);
 538				put_receive_buffer(info, response);
 539				break;
 540			}
 541
 542			ret++;
 543		}
 544	}
 545
 546	spin_lock(&info->lock_new_credits_offered);
 547	info->new_credits_offered += ret;
 548	spin_unlock(&info->lock_new_credits_offered);
 549
 550	atomic_add(ret, &info->receive_credits);
 551
 552	/* Check if we can post new receive and grant credits to peer */
 553	check_and_send_immediate(info);
 554}
 555
 556static void smbd_recv_done_work(struct work_struct *work)
 557{
 558	struct smbd_connection *info =
 559		container_of(work, struct smbd_connection, recv_done_work);
 560
 561	/*
 562	 * We may have new send credits granted from remote peer
 563	 * If any sender is blcoked on lack of credets, unblock it
 564	 */
 565	if (atomic_read(&info->send_credits))
 566		wake_up_interruptible(&info->wait_send_queue);
 567
 568	/*
 569	 * Check if we need to send something to remote peer to
 570	 * grant more credits or respond to KEEP_ALIVE packet
 571	 */
 572	check_and_send_immediate(info);
 573}
 574
 575/* Called from softirq, when recv is done */
 576static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
 577{
 578	struct smbd_data_transfer *data_transfer;
 579	struct smbd_response *response =
 580		container_of(wc->wr_cqe, struct smbd_response, cqe);
 581	struct smbd_connection *info = response->info;
 582	int data_length = 0;
 583
 584	log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d "
 585		      "byte_len=%d pkey_index=%x\n",
 586		response, response->type, wc->status, wc->opcode,
 587		wc->byte_len, wc->pkey_index);
 588
 589	if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
 590		log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
 591			wc->status, wc->opcode);
 592		smbd_disconnect_rdma_connection(info);
 593		goto error;
 594	}
 595
 596	ib_dma_sync_single_for_cpu(
 597		wc->qp->device,
 598		response->sge.addr,
 599		response->sge.length,
 600		DMA_FROM_DEVICE);
 601
 602	switch (response->type) {
 603	/* SMBD negotiation response */
 604	case SMBD_NEGOTIATE_RESP:
 605		dump_smbd_negotiate_resp(smbd_response_payload(response));
 606		info->full_packet_received = true;
 607		info->negotiate_done =
 608			process_negotiation_response(response, wc->byte_len);
 609		complete(&info->negotiate_completion);
 610		break;
 611
 612	/* SMBD data transfer packet */
 613	case SMBD_TRANSFER_DATA:
 614		data_transfer = smbd_response_payload(response);
 615		data_length = le32_to_cpu(data_transfer->data_length);
 616
 617		/*
 618		 * If this is a packet with data playload place the data in
 619		 * reassembly queue and wake up the reading thread
 620		 */
 621		if (data_length) {
 622			if (info->full_packet_received)
 623				response->first_segment = true;
 624
 625			if (le32_to_cpu(data_transfer->remaining_data_length))
 626				info->full_packet_received = false;
 627			else
 628				info->full_packet_received = true;
 629
 630			enqueue_reassembly(
 631				info,
 632				response,
 633				data_length);
 634		} else
 635			put_empty_packet(info, response);
 636
 637		if (data_length)
 638			wake_up_interruptible(&info->wait_reassembly_queue);
 639
 640		atomic_dec(&info->receive_credits);
 641		info->receive_credit_target =
 642			le16_to_cpu(data_transfer->credits_requested);
 643		atomic_add(le16_to_cpu(data_transfer->credits_granted),
 644			&info->send_credits);
 645
 646		log_incoming(INFO, "data flags %d data_offset %d "
 647			"data_length %d remaining_data_length %d\n",
 648			le16_to_cpu(data_transfer->flags),
 649			le32_to_cpu(data_transfer->data_offset),
 650			le32_to_cpu(data_transfer->data_length),
 651			le32_to_cpu(data_transfer->remaining_data_length));
 652
 653		/* Send a KEEP_ALIVE response right away if requested */
 654		info->keep_alive_requested = KEEP_ALIVE_NONE;
 655		if (le16_to_cpu(data_transfer->flags) &
 656				SMB_DIRECT_RESPONSE_REQUESTED) {
 657			info->keep_alive_requested = KEEP_ALIVE_PENDING;
 658		}
 659
 660		queue_work(info->workqueue, &info->recv_done_work);
 661		return;
 662
 663	default:
 664		log_rdma_recv(ERR,
 665			"unexpected response type=%d\n", response->type);
 666	}
 667
 668error:
 669	put_receive_buffer(info, response);
 670}
 671
 672static struct rdma_cm_id *smbd_create_id(
 673		struct smbd_connection *info,
 674		struct sockaddr *dstaddr, int port)
 675{
 676	struct rdma_cm_id *id;
 677	int rc;
 678	__be16 *sport;
 679
 680	id = rdma_create_id(&init_net, smbd_conn_upcall, info,
 681		RDMA_PS_TCP, IB_QPT_RC);
 682	if (IS_ERR(id)) {
 683		rc = PTR_ERR(id);
 684		log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
 685		return id;
 686	}
 687
 688	if (dstaddr->sa_family == AF_INET6)
 689		sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
 690	else
 691		sport = &((struct sockaddr_in *)dstaddr)->sin_port;
 692
 693	*sport = htons(port);
 694
 695	init_completion(&info->ri_done);
 696	info->ri_rc = -ETIMEDOUT;
 697
 698	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
 699		RDMA_RESOLVE_TIMEOUT);
 700	if (rc) {
 701		log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
 702		goto out;
 703	}
 704	wait_for_completion_interruptible_timeout(
 705		&info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 706	rc = info->ri_rc;
 707	if (rc) {
 708		log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
 709		goto out;
 710	}
 711
 712	info->ri_rc = -ETIMEDOUT;
 713	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 714	if (rc) {
 715		log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
 716		goto out;
 717	}
 718	wait_for_completion_interruptible_timeout(
 719		&info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
 720	rc = info->ri_rc;
 721	if (rc) {
 722		log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
 723		goto out;
 724	}
 725
 726	return id;
 727
 728out:
 729	rdma_destroy_id(id);
 730	return ERR_PTR(rc);
 731}
 732
 733/*
 734 * Test if FRWR (Fast Registration Work Requests) is supported on the device
 735 * This implementation requries FRWR on RDMA read/write
 736 * return value: true if it is supported
 737 */
 738static bool frwr_is_supported(struct ib_device_attr *attrs)
 739{
 740	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
 741		return false;
 742	if (attrs->max_fast_reg_page_list_len == 0)
 743		return false;
 744	return true;
 745}
 746
 747static int smbd_ia_open(
 748		struct smbd_connection *info,
 749		struct sockaddr *dstaddr, int port)
 750{
 751	int rc;
 752
 753	info->id = smbd_create_id(info, dstaddr, port);
 754	if (IS_ERR(info->id)) {
 755		rc = PTR_ERR(info->id);
 756		goto out1;
 757	}
 758
 759	if (!frwr_is_supported(&info->id->device->attrs)) {
 760		log_rdma_event(ERR,
 761			"Fast Registration Work Requests "
 762			"(FRWR) is not supported\n");
 763		log_rdma_event(ERR,
 764			"Device capability flags = %llx "
 765			"max_fast_reg_page_list_len = %u\n",
 766			info->id->device->attrs.device_cap_flags,
 767			info->id->device->attrs.max_fast_reg_page_list_len);
 768		rc = -EPROTONOSUPPORT;
 769		goto out2;
 770	}
 771	info->max_frmr_depth = min_t(int,
 772		smbd_max_frmr_depth,
 773		info->id->device->attrs.max_fast_reg_page_list_len);
 774	info->mr_type = IB_MR_TYPE_MEM_REG;
 775	if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
 776		info->mr_type = IB_MR_TYPE_SG_GAPS;
 777
 778	info->pd = ib_alloc_pd(info->id->device, 0);
 779	if (IS_ERR(info->pd)) {
 780		rc = PTR_ERR(info->pd);
 781		log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
 782		goto out2;
 783	}
 784
 785	return 0;
 786
 787out2:
 788	rdma_destroy_id(info->id);
 789	info->id = NULL;
 790
 791out1:
 792	return rc;
 793}
 794
 795/*
 796 * Send a negotiation request message to the peer
 797 * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
 798 * After negotiation, the transport is connected and ready for
 799 * carrying upper layer SMB payload
 800 */
 801static int smbd_post_send_negotiate_req(struct smbd_connection *info)
 802{
 803	struct ib_send_wr send_wr, *send_wr_fail;
 804	int rc = -ENOMEM;
 805	struct smbd_request *request;
 806	struct smbd_negotiate_req *packet;
 807
 808	request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 809	if (!request)
 810		return rc;
 811
 812	request->info = info;
 813
 814	packet = smbd_request_payload(request);
 815	packet->min_version = cpu_to_le16(SMBD_V1);
 816	packet->max_version = cpu_to_le16(SMBD_V1);
 817	packet->reserved = 0;
 818	packet->credits_requested = cpu_to_le16(info->send_credit_target);
 819	packet->preferred_send_size = cpu_to_le32(info->max_send_size);
 820	packet->max_receive_size = cpu_to_le32(info->max_receive_size);
 821	packet->max_fragmented_size =
 822		cpu_to_le32(info->max_fragmented_recv_size);
 823
 824	request->num_sge = 1;
 825	request->sge[0].addr = ib_dma_map_single(
 826				info->id->device, (void *)packet,
 827				sizeof(*packet), DMA_TO_DEVICE);
 828	if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 829		rc = -EIO;
 830		goto dma_mapping_failed;
 831	}
 832
 833	request->sge[0].length = sizeof(*packet);
 834	request->sge[0].lkey = info->pd->local_dma_lkey;
 835
 836	ib_dma_sync_single_for_device(
 837		info->id->device, request->sge[0].addr,
 838		request->sge[0].length, DMA_TO_DEVICE);
 839
 840	request->cqe.done = send_done;
 841
 842	send_wr.next = NULL;
 843	send_wr.wr_cqe = &request->cqe;
 844	send_wr.sg_list = request->sge;
 845	send_wr.num_sge = request->num_sge;
 846	send_wr.opcode = IB_WR_SEND;
 847	send_wr.send_flags = IB_SEND_SIGNALED;
 848
 849	log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
 850		request->sge[0].addr,
 851		request->sge[0].length, request->sge[0].lkey);
 852
 853	request->has_payload = false;
 854	atomic_inc(&info->send_pending);
 855	rc = ib_post_send(info->id->qp, &send_wr, &send_wr_fail);
 856	if (!rc)
 857		return 0;
 858
 859	/* if we reach here, post send failed */
 860	log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
 861	atomic_dec(&info->send_pending);
 862	ib_dma_unmap_single(info->id->device, request->sge[0].addr,
 863		request->sge[0].length, DMA_TO_DEVICE);
 864
 865	smbd_disconnect_rdma_connection(info);
 866
 867dma_mapping_failed:
 868	mempool_free(request, info->request_mempool);
 869	return rc;
 870}
 871
 872/*
 873 * Extend the credits to remote peer
 874 * This implements [MS-SMBD] 3.1.5.9
 875 * The idea is that we should extend credits to remote peer as quickly as
 876 * it's allowed, to maintain data flow. We allocate as much receive
 877 * buffer as possible, and extend the receive credits to remote peer
 878 * return value: the new credtis being granted.
 879 */
 880static int manage_credits_prior_sending(struct smbd_connection *info)
 881{
 882	int new_credits;
 883
 884	spin_lock(&info->lock_new_credits_offered);
 885	new_credits = info->new_credits_offered;
 886	info->new_credits_offered = 0;
 887	spin_unlock(&info->lock_new_credits_offered);
 888
 889	return new_credits;
 890}
 891
 892/*
 893 * Check if we need to send a KEEP_ALIVE message
 894 * The idle connection timer triggers a KEEP_ALIVE message when expires
 895 * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flag to have peer send
 896 * back a response.
 897 * return value:
 898 * 1 if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
 899 * 0: otherwise
 900 */
 901static int manage_keep_alive_before_sending(struct smbd_connection *info)
 902{
 903	if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
 904		info->keep_alive_requested = KEEP_ALIVE_SENT;
 905		return 1;
 906	}
 907	return 0;
 908}
 909
 910/*
 911 * Build and prepare the SMBD packet header
 912 * This function waits for avaialbe send credits and build a SMBD packet
 913 * header. The caller then optional append payload to the packet after
 914 * the header
 915 * intput values
 916 * size: the size of the payload
 917 * remaining_data_length: remaining data to send if this is part of a
 918 * fragmented packet
 919 * output values
 920 * request_out: the request allocated from this function
 921 * return values: 0 on success, otherwise actual error code returned
 922 */
 923static int smbd_create_header(struct smbd_connection *info,
 924		int size, int remaining_data_length,
 925		struct smbd_request **request_out)
 926{
 927	struct smbd_request *request;
 928	struct smbd_data_transfer *packet;
 929	int header_length;
 930	int rc;
 931
 932	/* Wait for send credits. A SMBD packet needs one credit */
 933	rc = wait_event_interruptible(info->wait_send_queue,
 934		atomic_read(&info->send_credits) > 0 ||
 935		info->transport_status != SMBD_CONNECTED);
 936	if (rc)
 937		return rc;
 938
 939	if (info->transport_status != SMBD_CONNECTED) {
 940		log_outgoing(ERR, "disconnected not sending\n");
 941		return -ENOENT;
 942	}
 943	atomic_dec(&info->send_credits);
 944
 945	request = mempool_alloc(info->request_mempool, GFP_KERNEL);
 946	if (!request) {
 947		rc = -ENOMEM;
 948		goto err;
 949	}
 950
 951	request->info = info;
 952
 953	/* Fill in the packet header */
 954	packet = smbd_request_payload(request);
 955	packet->credits_requested = cpu_to_le16(info->send_credit_target);
 956	packet->credits_granted =
 957		cpu_to_le16(manage_credits_prior_sending(info));
 958	info->send_immediate = false;
 959
 960	packet->flags = 0;
 961	if (manage_keep_alive_before_sending(info))
 962		packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
 963
 964	packet->reserved = 0;
 965	if (!size)
 966		packet->data_offset = 0;
 967	else
 968		packet->data_offset = cpu_to_le32(24);
 969	packet->data_length = cpu_to_le32(size);
 970	packet->remaining_data_length = cpu_to_le32(remaining_data_length);
 971	packet->padding = 0;
 972
 973	log_outgoing(INFO, "credits_requested=%d credits_granted=%d "
 974		"data_offset=%d data_length=%d remaining_data_length=%d\n",
 975		le16_to_cpu(packet->credits_requested),
 976		le16_to_cpu(packet->credits_granted),
 977		le32_to_cpu(packet->data_offset),
 978		le32_to_cpu(packet->data_length),
 979		le32_to_cpu(packet->remaining_data_length));
 980
 981	/* Map the packet to DMA */
 982	header_length = sizeof(struct smbd_data_transfer);
 983	/* If this is a packet without payload, don't send padding */
 984	if (!size)
 985		header_length = offsetof(struct smbd_data_transfer, padding);
 986
 987	request->num_sge = 1;
 988	request->sge[0].addr = ib_dma_map_single(info->id->device,
 989						 (void *)packet,
 990						 header_length,
 991						 DMA_BIDIRECTIONAL);
 992	if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
 993		mempool_free(request, info->request_mempool);
 994		rc = -EIO;
 995		goto err;
 996	}
 997
 998	request->sge[0].length = header_length;
 999	request->sge[0].lkey = info->pd->local_dma_lkey;
1000
1001	*request_out = request;
1002	return 0;
1003
1004err:
1005	atomic_inc(&info->send_credits);
1006	return rc;
1007}
1008
1009static void smbd_destroy_header(struct smbd_connection *info,
1010		struct smbd_request *request)
1011{
1012
1013	ib_dma_unmap_single(info->id->device,
1014			    request->sge[0].addr,
1015			    request->sge[0].length,
1016			    DMA_TO_DEVICE);
1017	mempool_free(request, info->request_mempool);
1018	atomic_inc(&info->send_credits);
1019}
1020
1021/* Post the send request */
1022static int smbd_post_send(struct smbd_connection *info,
1023		struct smbd_request *request, bool has_payload)
1024{
1025	struct ib_send_wr send_wr, *send_wr_fail;
1026	int rc, i;
1027
1028	for (i = 0; i < request->num_sge; i++) {
1029		log_rdma_send(INFO,
1030			"rdma_request sge[%d] addr=%llu length=%u\n",
1031			i, request->sge[i].addr, request->sge[i].length);
1032		ib_dma_sync_single_for_device(
1033			info->id->device,
1034			request->sge[i].addr,
1035			request->sge[i].length,
1036			DMA_TO_DEVICE);
1037	}
1038
1039	request->cqe.done = send_done;
1040
1041	send_wr.next = NULL;
1042	send_wr.wr_cqe = &request->cqe;
1043	send_wr.sg_list = request->sge;
1044	send_wr.num_sge = request->num_sge;
1045	send_wr.opcode = IB_WR_SEND;
1046	send_wr.send_flags = IB_SEND_SIGNALED;
1047
1048	if (has_payload) {
1049		request->has_payload = true;
1050		atomic_inc(&info->send_payload_pending);
1051	} else {
1052		request->has_payload = false;
1053		atomic_inc(&info->send_pending);
1054	}
1055
1056	rc = ib_post_send(info->id->qp, &send_wr, &send_wr_fail);
1057	if (rc) {
1058		log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
1059		if (has_payload) {
1060			if (atomic_dec_and_test(&info->send_payload_pending))
1061				wake_up(&info->wait_send_payload_pending);
1062		} else {
1063			if (atomic_dec_and_test(&info->send_pending))
1064				wake_up(&info->wait_send_pending);
1065		}
1066		smbd_disconnect_rdma_connection(info);
1067	} else
1068		/* Reset timer for idle connection after packet is sent */
1069		mod_delayed_work(info->workqueue, &info->idle_timer_work,
1070			info->keep_alive_interval*HZ);
1071
1072	return rc;
1073}
1074
1075static int smbd_post_send_sgl(struct smbd_connection *info,
1076	struct scatterlist *sgl, int data_length, int remaining_data_length)
1077{
1078	int num_sgs;
1079	int i, rc;
1080	struct smbd_request *request;
1081	struct scatterlist *sg;
1082
1083	rc = smbd_create_header(
1084		info, data_length, remaining_data_length, &request);
1085	if (rc)
1086		return rc;
1087
1088	num_sgs = sgl ? sg_nents(sgl) : 0;
1089	for_each_sg(sgl, sg, num_sgs, i) {
1090		request->sge[i+1].addr =
1091			ib_dma_map_page(info->id->device, sg_page(sg),
1092			       sg->offset, sg->length, DMA_BIDIRECTIONAL);
1093		if (ib_dma_mapping_error(
1094				info->id->device, request->sge[i+1].addr)) {
1095			rc = -EIO;
1096			request->sge[i+1].addr = 0;
1097			goto dma_mapping_failure;
1098		}
1099		request->sge[i+1].length = sg->length;
1100		request->sge[i+1].lkey = info->pd->local_dma_lkey;
1101		request->num_sge++;
1102	}
1103
1104	rc = smbd_post_send(info, request, data_length);
1105	if (!rc)
1106		return 0;
1107
1108dma_mapping_failure:
1109	for (i = 1; i < request->num_sge; i++)
1110		if (request->sge[i].addr)
1111			ib_dma_unmap_single(info->id->device,
1112					    request->sge[i].addr,
1113					    request->sge[i].length,
1114					    DMA_TO_DEVICE);
1115	smbd_destroy_header(info, request);
1116	return rc;
1117}
1118
1119/*
1120 * Send a page
1121 * page: the page to send
1122 * offset: offset in the page to send
1123 * size: length in the page to send
1124 * remaining_data_length: remaining data to send in this payload
1125 */
1126static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
1127		unsigned long offset, size_t size, int remaining_data_length)
1128{
1129	struct scatterlist sgl;
1130
1131	sg_init_table(&sgl, 1);
1132	sg_set_page(&sgl, page, size, offset);
1133
1134	return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
1135}
1136
1137/*
1138 * Send an empty message
1139 * Empty message is used to extend credits to peer to for keep live
1140 * while there is no upper layer payload to send at the time
1141 */
1142static int smbd_post_send_empty(struct smbd_connection *info)
1143{
1144	info->count_send_empty++;
1145	return smbd_post_send_sgl(info, NULL, 0, 0);
1146}
1147
1148/*
1149 * Send a data buffer
1150 * iov: the iov array describing the data buffers
1151 * n_vec: number of iov array
1152 * remaining_data_length: remaining data to send following this packet
1153 * in segmented SMBD packet
1154 */
1155static int smbd_post_send_data(
1156	struct smbd_connection *info, struct kvec *iov, int n_vec,
1157	int remaining_data_length)
1158{
1159	int i;
1160	u32 data_length = 0;
1161	struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1162
1163	if (n_vec > SMBDIRECT_MAX_SGE) {
1164		cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1165		return -ENOMEM;
1166	}
1167
1168	sg_init_table(sgl, n_vec);
1169	for (i = 0; i < n_vec; i++) {
1170		data_length += iov[i].iov_len;
1171		sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1172	}
1173
1174	return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1175}
1176
1177/*
1178 * Post a receive request to the transport
1179 * The remote peer can only send data when a receive request is posted
1180 * The interaction is controlled by send/receive credit system
1181 */
1182static int smbd_post_recv(
1183		struct smbd_connection *info, struct smbd_response *response)
1184{
1185	struct ib_recv_wr recv_wr, *recv_wr_fail = NULL;
1186	int rc = -EIO;
1187
1188	response->sge.addr = ib_dma_map_single(
1189				info->id->device, response->packet,
1190				info->max_receive_size, DMA_FROM_DEVICE);
1191	if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1192		return rc;
1193
1194	response->sge.length = info->max_receive_size;
1195	response->sge.lkey = info->pd->local_dma_lkey;
1196
1197	response->cqe.done = recv_done;
1198
1199	recv_wr.wr_cqe = &response->cqe;
1200	recv_wr.next = NULL;
1201	recv_wr.sg_list = &response->sge;
1202	recv_wr.num_sge = 1;
1203
1204	rc = ib_post_recv(info->id->qp, &recv_wr, &recv_wr_fail);
1205	if (rc) {
1206		ib_dma_unmap_single(info->id->device, response->sge.addr,
1207				    response->sge.length, DMA_FROM_DEVICE);
1208		smbd_disconnect_rdma_connection(info);
1209		log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1210	}
1211
1212	return rc;
1213}
1214
1215/* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1216static int smbd_negotiate(struct smbd_connection *info)
1217{
1218	int rc;
1219	struct smbd_response *response = get_receive_buffer(info);
1220
1221	response->type = SMBD_NEGOTIATE_RESP;
1222	rc = smbd_post_recv(info, response);
1223	log_rdma_event(INFO,
1224		"smbd_post_recv rc=%d iov.addr=%llx iov.length=%x "
1225		"iov.lkey=%x\n",
1226		rc, response->sge.addr,
1227		response->sge.length, response->sge.lkey);
1228	if (rc)
1229		return rc;
1230
1231	init_completion(&info->negotiate_completion);
1232	info->negotiate_done = false;
1233	rc = smbd_post_send_negotiate_req(info);
1234	if (rc)
1235		return rc;
1236
1237	rc = wait_for_completion_interruptible_timeout(
1238		&info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1239	log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1240
1241	if (info->negotiate_done)
1242		return 0;
1243
1244	if (rc == 0)
1245		rc = -ETIMEDOUT;
1246	else if (rc == -ERESTARTSYS)
1247		rc = -EINTR;
1248	else
1249		rc = -ENOTCONN;
1250
1251	return rc;
1252}
1253
1254static void put_empty_packet(
1255		struct smbd_connection *info, struct smbd_response *response)
1256{
1257	spin_lock(&info->empty_packet_queue_lock);
1258	list_add_tail(&response->list, &info->empty_packet_queue);
1259	info->count_empty_packet_queue++;
1260	spin_unlock(&info->empty_packet_queue_lock);
1261
1262	queue_work(info->workqueue, &info->post_send_credits_work);
1263}
1264
1265/*
1266 * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
1267 * This is a queue for reassembling upper layer payload and present to upper
1268 * layer. All the inncoming payload go to the reassembly queue, regardless of
1269 * if reassembly is required. The uuper layer code reads from the queue for all
1270 * incoming payloads.
1271 * Put a received packet to the reassembly queue
1272 * response: the packet received
1273 * data_length: the size of payload in this packet
1274 */
1275static void enqueue_reassembly(
1276	struct smbd_connection *info,
1277	struct smbd_response *response,
1278	int data_length)
1279{
1280	spin_lock(&info->reassembly_queue_lock);
1281	list_add_tail(&response->list, &info->reassembly_queue);
1282	info->reassembly_queue_length++;
1283	/*
1284	 * Make sure reassembly_data_length is updated after list and
1285	 * reassembly_queue_length are updated. On the dequeue side
1286	 * reassembly_data_length is checked without a lock to determine
1287	 * if reassembly_queue_length and list is up to date
1288	 */
1289	virt_wmb();
1290	info->reassembly_data_length += data_length;
1291	spin_unlock(&info->reassembly_queue_lock);
1292	info->count_reassembly_queue++;
1293	info->count_enqueue_reassembly_queue++;
1294}
1295
1296/*
1297 * Get the first entry at the front of reassembly queue
1298 * Caller is responsible for locking
1299 * return value: the first entry if any, NULL if queue is empty
1300 */
1301static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1302{
1303	struct smbd_response *ret = NULL;
1304
1305	if (!list_empty(&info->reassembly_queue)) {
1306		ret = list_first_entry(
1307			&info->reassembly_queue,
1308			struct smbd_response, list);
1309	}
1310	return ret;
1311}
1312
1313static struct smbd_response *get_empty_queue_buffer(
1314		struct smbd_connection *info)
1315{
1316	struct smbd_response *ret = NULL;
1317	unsigned long flags;
1318
1319	spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1320	if (!list_empty(&info->empty_packet_queue)) {
1321		ret = list_first_entry(
1322			&info->empty_packet_queue,
1323			struct smbd_response, list);
1324		list_del(&ret->list);
1325		info->count_empty_packet_queue--;
1326	}
1327	spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1328
1329	return ret;
1330}
1331
1332/*
1333 * Get a receive buffer
1334 * For each remote send, we need to post a receive. The receive buffers are
1335 * pre-allocated in advance.
1336 * return value: the receive buffer, NULL if none is available
1337 */
1338static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1339{
1340	struct smbd_response *ret = NULL;
1341	unsigned long flags;
1342
1343	spin_lock_irqsave(&info->receive_queue_lock, flags);
1344	if (!list_empty(&info->receive_queue)) {
1345		ret = list_first_entry(
1346			&info->receive_queue,
1347			struct smbd_response, list);
1348		list_del(&ret->list);
1349		info->count_receive_queue--;
1350		info->count_get_receive_buffer++;
1351	}
1352	spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1353
1354	return ret;
1355}
1356
1357/*
1358 * Return a receive buffer
1359 * Upon returning of a receive buffer, we can post new receive and extend
1360 * more receive credits to remote peer. This is done immediately after a
1361 * receive buffer is returned.
1362 */
1363static void put_receive_buffer(
1364	struct smbd_connection *info, struct smbd_response *response)
1365{
1366	unsigned long flags;
1367
1368	ib_dma_unmap_single(info->id->device, response->sge.addr,
1369		response->sge.length, DMA_FROM_DEVICE);
1370
1371	spin_lock_irqsave(&info->receive_queue_lock, flags);
1372	list_add_tail(&response->list, &info->receive_queue);
1373	info->count_receive_queue++;
1374	info->count_put_receive_buffer++;
1375	spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1376
1377	queue_work(info->workqueue, &info->post_send_credits_work);
1378}
1379
1380/* Preallocate all receive buffer on transport establishment */
1381static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1382{
1383	int i;
1384	struct smbd_response *response;
1385
1386	INIT_LIST_HEAD(&info->reassembly_queue);
1387	spin_lock_init(&info->reassembly_queue_lock);
1388	info->reassembly_data_length = 0;
1389	info->reassembly_queue_length = 0;
1390
1391	INIT_LIST_HEAD(&info->receive_queue);
1392	spin_lock_init(&info->receive_queue_lock);
1393	info->count_receive_queue = 0;
1394
1395	INIT_LIST_HEAD(&info->empty_packet_queue);
1396	spin_lock_init(&info->empty_packet_queue_lock);
1397	info->count_empty_packet_queue = 0;
1398
1399	init_waitqueue_head(&info->wait_receive_queues);
1400
1401	for (i = 0; i < num_buf; i++) {
1402		response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1403		if (!response)
1404			goto allocate_failed;
1405
1406		response->info = info;
1407		list_add_tail(&response->list, &info->receive_queue);
1408		info->count_receive_queue++;
1409	}
1410
1411	return 0;
1412
1413allocate_failed:
1414	while (!list_empty(&info->receive_queue)) {
1415		response = list_first_entry(
1416				&info->receive_queue,
1417				struct smbd_response, list);
1418		list_del(&response->list);
1419		info->count_receive_queue--;
1420
1421		mempool_free(response, info->response_mempool);
1422	}
1423	return -ENOMEM;
1424}
1425
1426static void destroy_receive_buffers(struct smbd_connection *info)
1427{
1428	struct smbd_response *response;
1429
1430	while ((response = get_receive_buffer(info)))
1431		mempool_free(response, info->response_mempool);
1432
1433	while ((response = get_empty_queue_buffer(info)))
1434		mempool_free(response, info->response_mempool);
1435}
1436
1437/*
1438 * Check and send an immediate or keep alive packet
1439 * The condition to send those packets are defined in [MS-SMBD] 3.1.1.1
1440 * Connection.KeepaliveRequested and Connection.SendImmediate
1441 * The idea is to extend credits to server as soon as it becomes available
1442 */
1443static void send_immediate_work(struct work_struct *work)
1444{
1445	struct smbd_connection *info = container_of(
1446					work, struct smbd_connection,
1447					send_immediate_work.work);
1448
1449	if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
1450	    info->send_immediate) {
1451		log_keep_alive(INFO, "send an empty message\n");
1452		smbd_post_send_empty(info);
1453	}
1454}
1455
1456/* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1457static void idle_connection_timer(struct work_struct *work)
1458{
1459	struct smbd_connection *info = container_of(
1460					work, struct smbd_connection,
1461					idle_timer_work.work);
1462
1463	if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1464		log_keep_alive(ERR,
1465			"error status info->keep_alive_requested=%d\n",
1466			info->keep_alive_requested);
1467		smbd_disconnect_rdma_connection(info);
1468		return;
1469	}
1470
1471	log_keep_alive(INFO, "about to send an empty idle message\n");
1472	smbd_post_send_empty(info);
1473
1474	/* Setup the next idle timeout work */
1475	queue_delayed_work(info->workqueue, &info->idle_timer_work,
1476			info->keep_alive_interval*HZ);
1477}
1478
1479/* Destroy this SMBD connection, called from upper layer */
1480void smbd_destroy(struct smbd_connection *info)
1481{
1482	log_rdma_event(INFO, "destroying rdma session\n");
1483
1484	/* Kick off the disconnection process */
1485	smbd_disconnect_rdma_connection(info);
1486
1487	log_rdma_event(INFO, "wait for transport being destroyed\n");
1488	wait_event(info->wait_destroy,
1489		info->transport_status == SMBD_DESTROYED);
1490
1491	destroy_workqueue(info->workqueue);
1492	kfree(info);
1493}
1494
1495/*
1496 * Reconnect this SMBD connection, called from upper layer
1497 * return value: 0 on success, or actual error code
1498 */
1499int smbd_reconnect(struct TCP_Server_Info *server)
1500{
1501	log_rdma_event(INFO, "reconnecting rdma session\n");
1502
1503	if (!server->smbd_conn) {
1504		log_rdma_event(INFO, "rdma session already destroyed\n");
1505		goto create_conn;
1506	}
1507
1508	/*
1509	 * This is possible if transport is disconnected and we haven't received
1510	 * notification from RDMA, but upper layer has detected timeout
1511	 */
1512	if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1513		log_rdma_event(INFO, "disconnecting transport\n");
1514		smbd_disconnect_rdma_connection(server->smbd_conn);
1515	}
1516
1517	/* wait until the transport is destroyed */
1518	if (!wait_event_timeout(server->smbd_conn->wait_destroy,
1519		server->smbd_conn->transport_status == SMBD_DESTROYED, 5*HZ))
1520		return -EAGAIN;
1521
1522	destroy_workqueue(server->smbd_conn->workqueue);
1523	kfree(server->smbd_conn);
1524
1525create_conn:
1526	log_rdma_event(INFO, "creating rdma session\n");
1527	server->smbd_conn = smbd_get_connection(
1528		server, (struct sockaddr *) &server->dstaddr);
1529	log_rdma_event(INFO, "created rdma session info=%p\n",
1530		server->smbd_conn);
1531
1532	return server->smbd_conn ? 0 : -ENOENT;
1533}
1534
1535static void destroy_caches_and_workqueue(struct smbd_connection *info)
1536{
1537	destroy_receive_buffers(info);
1538	destroy_workqueue(info->workqueue);
1539	mempool_destroy(info->response_mempool);
1540	kmem_cache_destroy(info->response_cache);
1541	mempool_destroy(info->request_mempool);
1542	kmem_cache_destroy(info->request_cache);
1543}
1544
1545#define MAX_NAME_LEN	80
1546static int allocate_caches_and_workqueue(struct smbd_connection *info)
1547{
1548	char name[MAX_NAME_LEN];
1549	int rc;
1550
1551	snprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1552	info->request_cache =
1553		kmem_cache_create(
1554			name,
1555			sizeof(struct smbd_request) +
1556				sizeof(struct smbd_data_transfer),
1557			0, SLAB_HWCACHE_ALIGN, NULL);
1558	if (!info->request_cache)
1559		return -ENOMEM;
1560
1561	info->request_mempool =
1562		mempool_create(info->send_credit_target, mempool_alloc_slab,
1563			mempool_free_slab, info->request_cache);
1564	if (!info->request_mempool)
1565		goto out1;
1566
1567	snprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1568	info->response_cache =
1569		kmem_cache_create(
1570			name,
1571			sizeof(struct smbd_response) +
1572				info->max_receive_size,
1573			0, SLAB_HWCACHE_ALIGN, NULL);
1574	if (!info->response_cache)
1575		goto out2;
1576
1577	info->response_mempool =
1578		mempool_create(info->receive_credit_max, mempool_alloc_slab,
1579		       mempool_free_slab, info->response_cache);
1580	if (!info->response_mempool)
1581		goto out3;
1582
1583	snprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1584	info->workqueue = create_workqueue(name);
1585	if (!info->workqueue)
1586		goto out4;
1587
1588	rc = allocate_receive_buffers(info, info->receive_credit_max);
1589	if (rc) {
1590		log_rdma_event(ERR, "failed to allocate receive buffers\n");
1591		goto out5;
1592	}
1593
1594	return 0;
1595
1596out5:
1597	destroy_workqueue(info->workqueue);
1598out4:
1599	mempool_destroy(info->response_mempool);
1600out3:
1601	kmem_cache_destroy(info->response_cache);
1602out2:
1603	mempool_destroy(info->request_mempool);
1604out1:
1605	kmem_cache_destroy(info->request_cache);
1606	return -ENOMEM;
1607}
1608
1609/* Create a SMBD connection, called by upper layer */
1610static struct smbd_connection *_smbd_get_connection(
1611	struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1612{
1613	int rc;
1614	struct smbd_connection *info;
1615	struct rdma_conn_param conn_param;
1616	struct ib_qp_init_attr qp_attr;
1617	struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1618	struct ib_port_immutable port_immutable;
1619	u32 ird_ord_hdr[2];
1620
1621	info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1622	if (!info)
1623		return NULL;
1624
1625	info->transport_status = SMBD_CONNECTING;
1626	rc = smbd_ia_open(info, dstaddr, port);
1627	if (rc) {
1628		log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1629		goto create_id_failed;
1630	}
1631
1632	if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1633	    smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1634		log_rdma_event(ERR,
1635			"consider lowering send_credit_target = %d. "
1636			"Possible CQE overrun, device "
1637			"reporting max_cpe %d max_qp_wr %d\n",
1638			smbd_send_credit_target,
1639			info->id->device->attrs.max_cqe,
1640			info->id->device->attrs.max_qp_wr);
1641		goto config_failed;
1642	}
1643
1644	if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1645	    smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1646		log_rdma_event(ERR,
1647			"consider lowering receive_credit_max = %d. "
1648			"Possible CQE overrun, device "
1649			"reporting max_cpe %d max_qp_wr %d\n",
1650			smbd_receive_credit_max,
1651			info->id->device->attrs.max_cqe,
1652			info->id->device->attrs.max_qp_wr);
1653		goto config_failed;
1654	}
1655
1656	info->receive_credit_max = smbd_receive_credit_max;
1657	info->send_credit_target = smbd_send_credit_target;
1658	info->max_send_size = smbd_max_send_size;
1659	info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1660	info->max_receive_size = smbd_max_receive_size;
1661	info->keep_alive_interval = smbd_keep_alive_interval;
1662
1663	if (info->id->device->attrs.max_sge < SMBDIRECT_MAX_SGE) {
1664		log_rdma_event(ERR, "warning: device max_sge = %d too small\n",
1665			info->id->device->attrs.max_sge);
1666		log_rdma_event(ERR, "Queue Pair creation may fail\n");
1667	}
1668
1669	info->send_cq = NULL;
1670	info->recv_cq = NULL;
1671	info->send_cq = ib_alloc_cq(info->id->device, info,
1672			info->send_credit_target, 0, IB_POLL_SOFTIRQ);
1673	if (IS_ERR(info->send_cq)) {
1674		info->send_cq = NULL;
1675		goto alloc_cq_failed;
1676	}
1677
1678	info->recv_cq = ib_alloc_cq(info->id->device, info,
1679			info->receive_credit_max, 0, IB_POLL_SOFTIRQ);
1680	if (IS_ERR(info->recv_cq)) {
1681		info->recv_cq = NULL;
1682		goto alloc_cq_failed;
1683	}
1684
1685	memset(&qp_attr, 0, sizeof(qp_attr));
1686	qp_attr.event_handler = smbd_qp_async_error_upcall;
1687	qp_attr.qp_context = info;
1688	qp_attr.cap.max_send_wr = info->send_credit_target;
1689	qp_attr.cap.max_recv_wr = info->receive_credit_max;
1690	qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1691	qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1692	qp_attr.cap.max_inline_data = 0;
1693	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1694	qp_attr.qp_type = IB_QPT_RC;
1695	qp_attr.send_cq = info->send_cq;
1696	qp_attr.recv_cq = info->recv_cq;
1697	qp_attr.port_num = ~0;
1698
1699	rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1700	if (rc) {
1701		log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1702		goto create_qp_failed;
1703	}
1704
1705	memset(&conn_param, 0, sizeof(conn_param));
1706	conn_param.initiator_depth = 0;
1707
1708	conn_param.responder_resources =
1709		info->id->device->attrs.max_qp_rd_atom
1710			< SMBD_CM_RESPONDER_RESOURCES ?
1711		info->id->device->attrs.max_qp_rd_atom :
1712		SMBD_CM_RESPONDER_RESOURCES;
1713	info->responder_resources = conn_param.responder_resources;
1714	log_rdma_mr(INFO, "responder_resources=%d\n",
1715		info->responder_resources);
1716
1717	/* Need to send IRD/ORD in private data for iWARP */
1718	info->id->device->get_port_immutable(
1719		info->id->device, info->id->port_num, &port_immutable);
1720	if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1721		ird_ord_hdr[0] = info->responder_resources;
1722		ird_ord_hdr[1] = 1;
1723		conn_param.private_data = ird_ord_hdr;
1724		conn_param.private_data_len = sizeof(ird_ord_hdr);
1725	} else {
1726		conn_param.private_data = NULL;
1727		conn_param.private_data_len = 0;
1728	}
1729
1730	conn_param.retry_count = SMBD_CM_RETRY;
1731	conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1732	conn_param.flow_control = 0;
1733	init_waitqueue_head(&info->wait_destroy);
1734
1735	log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1736		&addr_in->sin_addr, port);
1737
1738	init_waitqueue_head(&info->conn_wait);
1739	rc = rdma_connect(info->id, &conn_param);
1740	if (rc) {
1741		log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1742		goto rdma_connect_failed;
1743	}
1744
1745	wait_event_interruptible(
1746		info->conn_wait, info->transport_status != SMBD_CONNECTING);
1747
1748	if (info->transport_status != SMBD_CONNECTED) {
1749		log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1750		goto rdma_connect_failed;
1751	}
1752
1753	log_rdma_event(INFO, "rdma_connect connected\n");
1754
1755	rc = allocate_caches_and_workqueue(info);
1756	if (rc) {
1757		log_rdma_event(ERR, "cache allocation failed\n");
1758		goto allocate_cache_failed;
1759	}
1760
1761	init_waitqueue_head(&info->wait_send_queue);
1762	init_waitqueue_head(&info->wait_reassembly_queue);
1763
1764	INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1765	INIT_DELAYED_WORK(&info->send_immediate_work, send_immediate_work);
1766	queue_delayed_work(info->workqueue, &info->idle_timer_work,
1767		info->keep_alive_interval*HZ);
1768
1769	init_waitqueue_head(&info->wait_smbd_send_pending);
1770	info->smbd_send_pending = 0;
1771
1772	init_waitqueue_head(&info->wait_smbd_recv_pending);
1773	info->smbd_recv_pending = 0;
1774
1775	init_waitqueue_head(&info->wait_send_pending);
1776	atomic_set(&info->send_pending, 0);
1777
1778	init_waitqueue_head(&info->wait_send_payload_pending);
1779	atomic_set(&info->send_payload_pending, 0);
1780
1781	INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1782	INIT_WORK(&info->destroy_work, smbd_destroy_rdma_work);
1783	INIT_WORK(&info->recv_done_work, smbd_recv_done_work);
1784	INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1785	info->new_credits_offered = 0;
1786	spin_lock_init(&info->lock_new_credits_offered);
1787
1788	rc = smbd_negotiate(info);
1789	if (rc) {
1790		log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1791		goto negotiation_failed;
1792	}
1793
1794	rc = allocate_mr_list(info);
1795	if (rc) {
1796		log_rdma_mr(ERR, "memory registration allocation failed\n");
1797		goto allocate_mr_failed;
1798	}
1799
1800	return info;
1801
1802allocate_mr_failed:
1803	/* At this point, need to a full transport shutdown */
1804	smbd_destroy(info);
1805	return NULL;
1806
1807negotiation_failed:
1808	cancel_delayed_work_sync(&info->idle_timer_work);
1809	destroy_caches_and_workqueue(info);
1810	info->transport_status = SMBD_NEGOTIATE_FAILED;
1811	init_waitqueue_head(&info->conn_wait);
1812	rdma_disconnect(info->id);
1813	wait_event(info->conn_wait,
1814		info->transport_status == SMBD_DISCONNECTED);
1815
1816allocate_cache_failed:
1817rdma_connect_failed:
1818	rdma_destroy_qp(info->id);
1819
1820create_qp_failed:
1821alloc_cq_failed:
1822	if (info->send_cq)
1823		ib_free_cq(info->send_cq);
1824	if (info->recv_cq)
1825		ib_free_cq(info->recv_cq);
1826
1827config_failed:
1828	ib_dealloc_pd(info->pd);
1829	rdma_destroy_id(info->id);
1830
1831create_id_failed:
1832	kfree(info);
1833	return NULL;
1834}
1835
1836struct smbd_connection *smbd_get_connection(
1837	struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1838{
1839	struct smbd_connection *ret;
1840	int port = SMBD_PORT;
1841
1842try_again:
1843	ret = _smbd_get_connection(server, dstaddr, port);
1844
1845	/* Try SMB_PORT if SMBD_PORT doesn't work */
1846	if (!ret && port == SMBD_PORT) {
1847		port = SMB_PORT;
1848		goto try_again;
1849	}
1850	return ret;
1851}
1852
1853/*
1854 * Receive data from receive reassembly queue
1855 * All the incoming data packets are placed in reassembly queue
1856 * buf: the buffer to read data into
1857 * size: the length of data to read
1858 * return value: actual data read
1859 * Note: this implementation copies the data from reassebmly queue to receive
1860 * buffers used by upper layer. This is not the optimal code path. A better way
1861 * to do it is to not have upper layer allocate its receive buffers but rather
1862 * borrow the buffer from reassembly queue, and return it after data is
1863 * consumed. But this will require more changes to upper layer code, and also
1864 * need to consider packet boundaries while they still being reassembled.
1865 */
1866static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1867		unsigned int size)
1868{
1869	struct smbd_response *response;
1870	struct smbd_data_transfer *data_transfer;
1871	int to_copy, to_read, data_read, offset;
1872	u32 data_length, remaining_data_length, data_offset;
1873	int rc;
1874
1875again:
1876	if (info->transport_status != SMBD_CONNECTED) {
1877		log_read(ERR, "disconnected\n");
1878		return -ENODEV;
1879	}
1880
1881	/*
1882	 * No need to hold the reassembly queue lock all the time as we are
1883	 * the only one reading from the front of the queue. The transport
1884	 * may add more entries to the back of the queue at the same time
1885	 */
1886	log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1887		info->reassembly_data_length);
1888	if (info->reassembly_data_length >= size) {
1889		int queue_length;
1890		int queue_removed = 0;
1891
1892		/*
1893		 * Need to make sure reassembly_data_length is read before
1894		 * reading reassembly_queue_length and calling
1895		 * _get_first_reassembly. This call is lock free
1896		 * as we never read at the end of the queue which are being
1897		 * updated in SOFTIRQ as more data is received
1898		 */
1899		virt_rmb();
1900		queue_length = info->reassembly_queue_length;
1901		data_read = 0;
1902		to_read = size;
1903		offset = info->first_entry_offset;
1904		while (data_read < size) {
1905			response = _get_first_reassembly(info);
1906			data_transfer = smbd_response_payload(response);
1907			data_length = le32_to_cpu(data_transfer->data_length);
1908			remaining_data_length =
1909				le32_to_cpu(
1910					data_transfer->remaining_data_length);
1911			data_offset = le32_to_cpu(data_transfer->data_offset);
1912
1913			/*
1914			 * The upper layer expects RFC1002 length at the
1915			 * beginning of the payload. Return it to indicate
1916			 * the total length of the packet. This minimize the
1917			 * change to upper layer packet processing logic. This
1918			 * will be eventually remove when an intermediate
1919			 * transport layer is added
1920			 */
1921			if (response->first_segment && size == 4) {
1922				unsigned int rfc1002_len =
1923					data_length + remaining_data_length;
1924				*((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1925				data_read = 4;
1926				response->first_segment = false;
1927				log_read(INFO, "returning rfc1002 length %d\n",
1928					rfc1002_len);
1929				goto read_rfc1002_done;
1930			}
1931
1932			to_copy = min_t(int, data_length - offset, to_read);
1933			memcpy(
1934				buf + data_read,
1935				(char *)data_transfer + data_offset + offset,
1936				to_copy);
1937
1938			/* move on to the next buffer? */
1939			if (to_copy == data_length - offset) {
1940				queue_length--;
1941				/*
1942				 * No need to lock if we are not at the
1943				 * end of the queue
1944				 */
1945				if (queue_length)
1946					list_del(&response->list);
1947				else {
1948					spin_lock_irq(
1949						&info->reassembly_queue_lock);
1950					list_del(&response->list);
1951					spin_unlock_irq(
1952						&info->reassembly_queue_lock);
1953				}
1954				queue_removed++;
1955				info->count_reassembly_queue--;
1956				info->count_dequeue_reassembly_queue++;
1957				put_receive_buffer(info, response);
1958				offset = 0;
1959				log_read(INFO, "put_receive_buffer offset=0\n");
1960			} else
1961				offset += to_copy;
1962
1963			to_read -= to_copy;
1964			data_read += to_copy;
1965
1966			log_read(INFO, "_get_first_reassembly memcpy %d bytes "
1967				"data_transfer_length-offset=%d after that "
1968				"to_read=%d data_read=%d offset=%d\n",
1969				to_copy, data_length - offset,
1970				to_read, data_read, offset);
1971		}
1972
1973		spin_lock_irq(&info->reassembly_queue_lock);
1974		info->reassembly_data_length -= data_read;
1975		info->reassembly_queue_length -= queue_removed;
1976		spin_unlock_irq(&info->reassembly_queue_lock);
1977
1978		info->first_entry_offset = offset;
1979		log_read(INFO, "returning to thread data_read=%d "
1980			"reassembly_data_length=%d first_entry_offset=%d\n",
1981			data_read, info->reassembly_data_length,
1982			info->first_entry_offset);
1983read_rfc1002_done:
1984		return data_read;
1985	}
1986
1987	log_read(INFO, "wait_event on more data\n");
1988	rc = wait_event_interruptible(
1989		info->wait_reassembly_queue,
1990		info->reassembly_data_length >= size ||
1991			info->transport_status != SMBD_CONNECTED);
1992	/* Don't return any data if interrupted */
1993	if (rc)
1994		return -ENODEV;
1995
1996	goto again;
1997}
1998
1999/*
2000 * Receive a page from receive reassembly queue
2001 * page: the page to read data into
2002 * to_read: the length of data to read
2003 * return value: actual data read
2004 */
2005static int smbd_recv_page(struct smbd_connection *info,
2006		struct page *page, unsigned int to_read)
2007{
2008	int ret;
2009	char *to_address;
2010
2011	/* make sure we have the page ready for read */
2012	ret = wait_event_interruptible(
2013		info->wait_reassembly_queue,
2014		info->reassembly_data_length >= to_read ||
2015			info->transport_status != SMBD_CONNECTED);
2016	if (ret)
2017		return 0;
2018
2019	/* now we can read from reassembly queue and not sleep */
2020	to_address = kmap_atomic(page);
2021
2022	log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
2023		page, to_address, to_read);
2024
2025	ret = smbd_recv_buf(info, to_address, to_read);
2026	kunmap_atomic(to_address);
2027
2028	return ret;
2029}
2030
2031/*
2032 * Receive data from transport
2033 * msg: a msghdr point to the buffer, can be ITER_KVEC or ITER_BVEC
2034 * return: total bytes read, or 0. SMB Direct will not do partial read.
2035 */
2036int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
2037{
2038	char *buf;
2039	struct page *page;
2040	unsigned int to_read;
2041	int rc;
2042
2043	info->smbd_recv_pending++;
2044
2045	switch (msg->msg_iter.type) {
2046	case READ | ITER_KVEC:
2047		buf = msg->msg_iter.kvec->iov_base;
2048		to_read = msg->msg_iter.kvec->iov_len;
2049		rc = smbd_recv_buf(info, buf, to_read);
2050		break;
2051
2052	case READ | ITER_BVEC:
2053		page = msg->msg_iter.bvec->bv_page;
2054		to_read = msg->msg_iter.bvec->bv_len;
2055		rc = smbd_recv_page(info, page, to_read);
2056		break;
2057
2058	default:
2059		/* It's a bug in upper layer to get there */
2060		cifs_dbg(VFS, "CIFS: invalid msg type %d\n",
2061			msg->msg_iter.type);
2062		rc = -EIO;
2063	}
2064
2065	info->smbd_recv_pending--;
2066	wake_up(&info->wait_smbd_recv_pending);
2067
2068	/* SMBDirect will read it all or nothing */
2069	if (rc > 0)
2070		msg->msg_iter.count = 0;
2071	return rc;
2072}
2073
2074/*
2075 * Send data to transport
2076 * Each rqst is transported as a SMBDirect payload
2077 * rqst: the data to write
2078 * return value: 0 if successfully write, otherwise error code
2079 */
2080int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst)
2081{
2082	struct kvec vec;
2083	int nvecs;
2084	int size;
2085	int buflen = 0, remaining_data_length;
2086	int start, i, j;
2087	int max_iov_size =
2088		info->max_send_size - sizeof(struct smbd_data_transfer);
2089	struct kvec *iov;
2090	int rc;
2091
2092	info->smbd_send_pending++;
2093	if (info->transport_status != SMBD_CONNECTED) {
2094		rc = -ENODEV;
2095		goto done;
2096	}
2097
2098	/*
2099	 * Skip the RFC1002 length defined in MS-SMB2 section 2.1
2100	 * It is used only for TCP transport in the iov[0]
2101	 * In future we may want to add a transport layer under protocol
2102	 * layer so this will only be issued to TCP transport
2103	 */
2104
2105	if (rqst->rq_iov[0].iov_len != 4) {
2106		log_write(ERR, "expected the pdu length in 1st iov, but got %zu\n", rqst->rq_iov[0].iov_len);
2107		return -EINVAL;
2108	}
2109	iov = &rqst->rq_iov[1];
2110
2111	/* total up iov array first */
2112	for (i = 0; i < rqst->rq_nvec-1; i++) {
2113		buflen += iov[i].iov_len;
2114	}
2115
2116	/* add in the page array if there is one */
2117	if (rqst->rq_npages) {
2118		buflen += rqst->rq_pagesz * (rqst->rq_npages - 1);
2119		buflen += rqst->rq_tailsz;
2120	}
2121
2122	if (buflen + sizeof(struct smbd_data_transfer) >
2123		info->max_fragmented_send_size) {
2124		log_write(ERR, "payload size %d > max size %d\n",
2125			buflen, info->max_fragmented_send_size);
2126		rc = -EINVAL;
2127		goto done;
2128	}
2129
2130	cifs_dbg(FYI, "Sending smb (RDMA): smb_len=%u\n", buflen);
2131	for (i = 0; i < rqst->rq_nvec-1; i++)
2132		dump_smb(iov[i].iov_base, iov[i].iov_len);
2133
2134	remaining_data_length = buflen;
2135
2136	log_write(INFO, "rqst->rq_nvec=%d rqst->rq_npages=%d rq_pagesz=%d "
2137		"rq_tailsz=%d buflen=%d\n",
2138		rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2139		rqst->rq_tailsz, buflen);
2140
2141	start = i = iov[0].iov_len ? 0 : 1;
2142	buflen = 0;
2143	while (true) {
2144		buflen += iov[i].iov_len;
2145		if (buflen > max_iov_size) {
2146			if (i > start) {
2147				remaining_data_length -=
2148					(buflen-iov[i].iov_len);
2149				log_write(INFO, "sending iov[] from start=%d "
2150					"i=%d nvecs=%d "
2151					"remaining_data_length=%d\n",
2152					start, i, i-start,
2153					remaining_data_length);
2154				rc = smbd_post_send_data(
2155					info, &iov[start], i-start,
2156					remaining_data_length);
2157				if (rc)
2158					goto done;
2159			} else {
2160				/* iov[start] is too big, break it */
2161				nvecs = (buflen+max_iov_size-1)/max_iov_size;
2162				log_write(INFO, "iov[%d] iov_base=%p buflen=%d"
2163					" break to %d vectors\n",
2164					start, iov[start].iov_base,
2165					buflen, nvecs);
2166				for (j = 0; j < nvecs; j++) {
2167					vec.iov_base =
2168						(char *)iov[start].iov_base +
2169						j*max_iov_size;
2170					vec.iov_len = max_iov_size;
2171					if (j == nvecs-1)
2172						vec.iov_len =
2173							buflen -
2174							max_iov_size*(nvecs-1);
2175					remaining_data_length -= vec.iov_len;
2176					log_write(INFO,
2177						"sending vec j=%d iov_base=%p"
2178						" iov_len=%zu "
2179						"remaining_data_length=%d\n",
2180						j, vec.iov_base, vec.iov_len,
2181						remaining_data_length);
2182					rc = smbd_post_send_data(
2183						info, &vec, 1,
2184						remaining_data_length);
2185					if (rc)
2186						goto done;
2187				}
2188				i++;
2189				if (i == rqst->rq_nvec-1)
2190					break;
2191			}
2192			start = i;
2193			buflen = 0;
2194		} else {
2195			i++;
2196			if (i == rqst->rq_nvec-1) {
2197				/* send out all remaining vecs */
2198				remaining_data_length -= buflen;
2199				log_write(INFO,
2200					"sending iov[] from start=%d i=%d "
2201					"nvecs=%d remaining_data_length=%d\n",
2202					start, i, i-start,
2203					remaining_data_length);
2204				rc = smbd_post_send_data(info, &iov[start],
2205					i-start, remaining_data_length);
2206				if (rc)
2207					goto done;
2208				break;
2209			}
2210		}
2211		log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2212	}
2213
2214	/* now sending pages if there are any */
2215	for (i = 0; i < rqst->rq_npages; i++) {
2216		buflen = (i == rqst->rq_npages-1) ?
2217			rqst->rq_tailsz : rqst->rq_pagesz;
2218		nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2219		log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2220			buflen, nvecs);
2221		for (j = 0; j < nvecs; j++) {
2222			size = max_iov_size;
2223			if (j == nvecs-1)
2224				size = buflen - j*max_iov_size;
2225			remaining_data_length -= size;
2226			log_write(INFO, "sending pages i=%d offset=%d size=%d"
2227				" remaining_data_length=%d\n",
2228				i, j*max_iov_size, size, remaining_data_length);
2229			rc = smbd_post_send_page(
2230				info, rqst->rq_pages[i], j*max_iov_size,
2231				size, remaining_data_length);
2232			if (rc)
2233				goto done;
2234		}
2235	}
2236
2237done:
2238	/*
2239	 * As an optimization, we don't wait for individual I/O to finish
2240	 * before sending the next one.
2241	 * Send them all and wait for pending send count to get to 0
2242	 * that means all the I/Os have been out and we are good to return
2243	 */
2244
2245	wait_event(info->wait_send_payload_pending,
2246		atomic_read(&info->send_payload_pending) == 0);
2247
2248	info->smbd_send_pending--;
2249	wake_up(&info->wait_smbd_send_pending);
2250
2251	return rc;
2252}
2253
2254static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2255{
2256	struct smbd_mr *mr;
2257	struct ib_cqe *cqe;
2258
2259	if (wc->status) {
2260		log_rdma_mr(ERR, "status=%d\n", wc->status);
2261		cqe = wc->wr_cqe;
2262		mr = container_of(cqe, struct smbd_mr, cqe);
2263		smbd_disconnect_rdma_connection(mr->conn);
2264	}
2265}
2266
2267/*
2268 * The work queue function that recovers MRs
2269 * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
2270 * again. Both calls are slow, so finish them in a workqueue. This will not
2271 * block I/O path.
2272 * There is one workqueue that recovers MRs, there is no need to lock as the
2273 * I/O requests calling smbd_register_mr will never update the links in the
2274 * mr_list.
2275 */
2276static void smbd_mr_recovery_work(struct work_struct *work)
2277{
2278	struct smbd_connection *info =
2279		container_of(work, struct smbd_connection, mr_recovery_work);
2280	struct smbd_mr *smbdirect_mr;
2281	int rc;
2282
2283	list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2284		if (smbdirect_mr->state == MR_INVALIDATED ||
2285			smbdirect_mr->state == MR_ERROR) {
2286
2287			if (smbdirect_mr->state == MR_INVALIDATED) {
2288				ib_dma_unmap_sg(
2289					info->id->device, smbdirect_mr->sgl,
2290					smbdirect_mr->sgl_count,
2291					smbdirect_mr->dir);
2292				smbdirect_mr->state = MR_READY;
2293			} else if (smbdirect_mr->state == MR_ERROR) {
2294
2295				/* recover this MR entry */
2296				rc = ib_dereg_mr(smbdirect_mr->mr);
2297				if (rc) {
2298					log_rdma_mr(ERR,
2299						"ib_dereg_mr failed rc=%x\n",
2300						rc);
2301					smbd_disconnect_rdma_connection(info);
2302				}
2303
2304				smbdirect_mr->mr = ib_alloc_mr(
2305					info->pd, info->mr_type,
2306					info->max_frmr_depth);
2307				if (IS_ERR(smbdirect_mr->mr)) {
2308					log_rdma_mr(ERR,
2309						"ib_alloc_mr failed mr_type=%x "
2310						"max_frmr_depth=%x\n",
2311						info->mr_type,
2312						info->max_frmr_depth);
2313					smbd_disconnect_rdma_connection(info);
2314				}
2315
2316				smbdirect_mr->state = MR_READY;
2317			}
2318			/* smbdirect_mr->state is updated by this function
2319			 * and is read and updated by I/O issuing CPUs trying
2320			 * to get a MR, the call to atomic_inc_return
2321			 * implicates a memory barrier and guarantees this
2322			 * value is updated before waking up any calls to
2323			 * get_mr() from the I/O issuing CPUs
2324			 */
2325			if (atomic_inc_return(&info->mr_ready_count) == 1)
2326				wake_up_interruptible(&info->wait_mr);
2327		}
2328	}
2329}
2330
2331static void destroy_mr_list(struct smbd_connection *info)
2332{
2333	struct smbd_mr *mr, *tmp;
2334
2335	cancel_work_sync(&info->mr_recovery_work);
2336	list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2337		if (mr->state == MR_INVALIDATED)
2338			ib_dma_unmap_sg(info->id->device, mr->sgl,
2339				mr->sgl_count, mr->dir);
2340		ib_dereg_mr(mr->mr);
2341		kfree(mr->sgl);
2342		kfree(mr);
2343	}
2344}
2345
2346/*
2347 * Allocate MRs used for RDMA read/write
2348 * The number of MRs will not exceed hardware capability in responder_resources
2349 * All MRs are kept in mr_list. The MR can be recovered after it's used
2350 * Recovery is done in smbd_mr_recovery_work. The content of list entry changes
2351 * as MRs are used and recovered for I/O, but the list links will not change
2352 */
2353static int allocate_mr_list(struct smbd_connection *info)
2354{
2355	int i;
2356	struct smbd_mr *smbdirect_mr, *tmp;
2357
2358	INIT_LIST_HEAD(&info->mr_list);
2359	init_waitqueue_head(&info->wait_mr);
2360	spin_lock_init(&info->mr_list_lock);
2361	atomic_set(&info->mr_ready_count, 0);
2362	atomic_set(&info->mr_used_count, 0);
2363	init_waitqueue_head(&info->wait_for_mr_cleanup);
2364	/* Allocate more MRs (2x) than hardware responder_resources */
2365	for (i = 0; i < info->responder_resources * 2; i++) {
2366		smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2367		if (!smbdirect_mr)
2368			goto out;
2369		smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2370					info->max_frmr_depth);
2371		if (IS_ERR(smbdirect_mr->mr)) {
2372			log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
2373				"max_frmr_depth=%x\n",
2374				info->mr_type, info->max_frmr_depth);
2375			goto out;
2376		}
2377		smbdirect_mr->sgl = kcalloc(
2378					info->max_frmr_depth,
2379					sizeof(struct scatterlist),
2380					GFP_KERNEL);
2381		if (!smbdirect_mr->sgl) {
2382			log_rdma_mr(ERR, "failed to allocate sgl\n");
2383			ib_dereg_mr(smbdirect_mr->mr);
2384			goto out;
2385		}
2386		smbdirect_mr->state = MR_READY;
2387		smbdirect_mr->conn = info;
2388
2389		list_add_tail(&smbdirect_mr->list, &info->mr_list);
2390		atomic_inc(&info->mr_ready_count);
2391	}
2392	INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2393	return 0;
2394
2395out:
2396	kfree(smbdirect_mr);
2397
2398	list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2399		ib_dereg_mr(smbdirect_mr->mr);
2400		kfree(smbdirect_mr->sgl);
2401		kfree(smbdirect_mr);
2402	}
2403	return -ENOMEM;
2404}
2405
2406/*
2407 * Get a MR from mr_list. This function waits until there is at least one
2408 * MR available in the list. It may access the list while the
2409 * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2410 * as they never modify the same places. However, there may be several CPUs
2411 * issueing I/O trying to get MR at the same time, mr_list_lock is used to
2412 * protect this situation.
2413 */
2414static struct smbd_mr *get_mr(struct smbd_connection *info)
2415{
2416	struct smbd_mr *ret;
2417	int rc;
2418again:
2419	rc = wait_event_interruptible(info->wait_mr,
2420		atomic_read(&info->mr_ready_count) ||
2421		info->transport_status != SMBD_CONNECTED);
2422	if (rc) {
2423		log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2424		return NULL;
2425	}
2426
2427	if (info->transport_status != SMBD_CONNECTED) {
2428		log_rdma_mr(ERR, "info->transport_status=%x\n",
2429			info->transport_status);
2430		return NULL;
2431	}
2432
2433	spin_lock(&info->mr_list_lock);
2434	list_for_each_entry(ret, &info->mr_list, list) {
2435		if (ret->state == MR_READY) {
2436			ret->state = MR_REGISTERED;
2437			spin_unlock(&info->mr_list_lock);
2438			atomic_dec(&info->mr_ready_count);
2439			atomic_inc(&info->mr_used_count);
2440			return ret;
2441		}
2442	}
2443
2444	spin_unlock(&info->mr_list_lock);
2445	/*
2446	 * It is possible that we could fail to get MR because other processes may
2447	 * try to acquire a MR at the same time. If this is the case, retry it.
2448	 */
2449	goto again;
2450}
2451
2452/*
2453 * Register memory for RDMA read/write
2454 * pages[]: the list of pages to register memory with
2455 * num_pages: the number of pages to register
2456 * tailsz: if non-zero, the bytes to register in the last page
2457 * writing: true if this is a RDMA write (SMB read), false for RDMA read
2458 * need_invalidate: true if this MR needs to be locally invalidated after I/O
2459 * return value: the MR registered, NULL if failed.
2460 */
2461struct smbd_mr *smbd_register_mr(
2462	struct smbd_connection *info, struct page *pages[], int num_pages,
2463	int tailsz, bool writing, bool need_invalidate)
2464{
2465	struct smbd_mr *smbdirect_mr;
2466	int rc, i;
2467	enum dma_data_direction dir;
2468	struct ib_reg_wr *reg_wr;
2469	struct ib_send_wr *bad_wr;
2470
2471	if (num_pages > info->max_frmr_depth) {
2472		log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2473			num_pages, info->max_frmr_depth);
2474		return NULL;
2475	}
2476
2477	smbdirect_mr = get_mr(info);
2478	if (!smbdirect_mr) {
2479		log_rdma_mr(ERR, "get_mr returning NULL\n");
2480		return NULL;
2481	}
2482	smbdirect_mr->need_invalidate = need_invalidate;
2483	smbdirect_mr->sgl_count = num_pages;
2484	sg_init_table(smbdirect_mr->sgl, num_pages);
2485
2486	for (i = 0; i < num_pages - 1; i++)
2487		sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2488
2489	sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2490		tailsz ? tailsz : PAGE_SIZE, 0);
2491
2492	dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2493	smbdirect_mr->dir = dir;
2494	rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2495	if (!rc) {
2496		log_rdma_mr(INFO, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2497			num_pages, dir, rc);
2498		goto dma_map_error;
2499	}
2500
2501	rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2502		NULL, PAGE_SIZE);
2503	if (rc != num_pages) {
2504		log_rdma_mr(INFO,
2505			"ib_map_mr_sg failed rc = %x num_pages = %x\n",
2506			rc, num_pages);
2507		goto map_mr_error;
2508	}
2509
2510	ib_update_fast_reg_key(smbdirect_mr->mr,
2511		ib_inc_rkey(smbdirect_mr->mr->rkey));
2512	reg_wr = &smbdirect_mr->wr;
2513	reg_wr->wr.opcode = IB_WR_REG_MR;
2514	smbdirect_mr->cqe.done = register_mr_done;
2515	reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2516	reg_wr->wr.num_sge = 0;
2517	reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2518	reg_wr->mr = smbdirect_mr->mr;
2519	reg_wr->key = smbdirect_mr->mr->rkey;
2520	reg_wr->access = writing ?
2521			IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2522			IB_ACCESS_REMOTE_READ;
2523
2524	/*
2525	 * There is no need for waiting for complemtion on ib_post_send
2526	 * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2527	 * on the next ib_post_send when we actaully send I/O to remote peer
2528	 */
2529	rc = ib_post_send(info->id->qp, &reg_wr->wr, &bad_wr);
2530	if (!rc)
2531		return smbdirect_mr;
2532
2533	log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2534		rc, reg_wr->key);
2535
2536	/* If all failed, attempt to recover this MR by setting it MR_ERROR*/
2537map_mr_error:
2538	ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2539		smbdirect_mr->sgl_count, smbdirect_mr->dir);
2540
2541dma_map_error:
2542	smbdirect_mr->state = MR_ERROR;
2543	if (atomic_dec_and_test(&info->mr_used_count))
2544		wake_up(&info->wait_for_mr_cleanup);
2545
2546	smbd_disconnect_rdma_connection(info);
2547
2548	return NULL;
2549}
2550
2551static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2552{
2553	struct smbd_mr *smbdirect_mr;
2554	struct ib_cqe *cqe;
2555
2556	cqe = wc->wr_cqe;
2557	smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2558	smbdirect_mr->state = MR_INVALIDATED;
2559	if (wc->status != IB_WC_SUCCESS) {
2560		log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2561		smbdirect_mr->state = MR_ERROR;
2562	}
2563	complete(&smbdirect_mr->invalidate_done);
2564}
2565
2566/*
2567 * Deregister a MR after I/O is done
2568 * This function may wait if remote invalidation is not used
2569 * and we have to locally invalidate the buffer to prevent data is being
2570 * modified by remote peer after upper layer consumes it
2571 */
2572int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2573{
2574	struct ib_send_wr *wr, *bad_wr;
2575	struct smbd_connection *info = smbdirect_mr->conn;
2576	int rc = 0;
2577
2578	if (smbdirect_mr->need_invalidate) {
2579		/* Need to finish local invalidation before returning */
2580		wr = &smbdirect_mr->inv_wr;
2581		wr->opcode = IB_WR_LOCAL_INV;
2582		smbdirect_mr->cqe.done = local_inv_done;
2583		wr->wr_cqe = &smbdirect_mr->cqe;
2584		wr->num_sge = 0;
2585		wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2586		wr->send_flags = IB_SEND_SIGNALED;
2587
2588		init_completion(&smbdirect_mr->invalidate_done);
2589		rc = ib_post_send(info->id->qp, wr, &bad_wr);
2590		if (rc) {
2591			log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2592			smbd_disconnect_rdma_connection(info);
2593			goto done;
2594		}
2595		wait_for_completion(&smbdirect_mr->invalidate_done);
2596		smbdirect_mr->need_invalidate = false;
2597	} else
2598		/*
2599		 * For remote invalidation, just set it to MR_INVALIDATED
2600		 * and defer to mr_recovery_work to recover the MR for next use
2601		 */
2602		smbdirect_mr->state = MR_INVALIDATED;
2603
2604	/*
2605	 * Schedule the work to do MR recovery for future I/Os
2606	 * MR recovery is slow and we don't want it to block the current I/O
2607	 */
2608	queue_work(info->workqueue, &info->mr_recovery_work);
2609
2610done:
2611	if (atomic_dec_and_test(&info->mr_used_count))
2612		wake_up(&info->wait_for_mr_cleanup);
2613
2614	return rc;
2615}