Linux Audio

Check our new training course

Loading...
v5.14.15
   1// SPDX-License-Identifier: GPL-2.0
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/bvec.h>
   5#include <linux/crc32c.h>
   6#include <linux/net.h>
   7#include <linux/socket.h>
   8#include <net/sock.h>
   9
  10#include <linux/ceph/ceph_features.h>
  11#include <linux/ceph/decode.h>
  12#include <linux/ceph/libceph.h>
  13#include <linux/ceph/messenger.h>
  14
  15/* static tag bytes (protocol control messages) */
  16static char tag_msg = CEPH_MSGR_TAG_MSG;
  17static char tag_ack = CEPH_MSGR_TAG_ACK;
  18static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
  19static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
  20
  21/*
  22 * If @buf is NULL, discard up to @len bytes.
  23 */
  24static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
  25{
  26	struct kvec iov = {buf, len};
  27	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  28	int r;
  29
  30	if (!buf)
  31		msg.msg_flags |= MSG_TRUNC;
  32
  33	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
  34	r = sock_recvmsg(sock, &msg, msg.msg_flags);
  35	if (r == -EAGAIN)
  36		r = 0;
  37	return r;
  38}
  39
  40static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
  41		     int page_offset, size_t length)
  42{
  43	struct bio_vec bvec = {
  44		.bv_page = page,
  45		.bv_offset = page_offset,
  46		.bv_len = length
  47	};
  48	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  49	int r;
  50
  51	BUG_ON(page_offset + length > PAGE_SIZE);
  52	iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
 
  53	r = sock_recvmsg(sock, &msg, msg.msg_flags);
  54	if (r == -EAGAIN)
  55		r = 0;
  56	return r;
  57}
  58
  59/*
  60 * write something.  @more is true if caller will be sending more data
  61 * shortly.
  62 */
  63static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
  64			    size_t kvlen, size_t len, bool more)
  65{
  66	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  67	int r;
  68
  69	if (more)
  70		msg.msg_flags |= MSG_MORE;
  71	else
  72		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
  73
  74	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
  75	if (r == -EAGAIN)
  76		r = 0;
  77	return r;
  78}
  79
  80/*
  81 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
  82 */
  83static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
  84			     int offset, size_t size, int more)
  85{
  86	ssize_t (*sendpage)(struct socket *sock, struct page *page,
  87			    int offset, size_t size, int flags);
  88	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
 
  89	int ret;
  90
  91	/*
  92	 * sendpage cannot properly handle pages with page_count == 0,
  93	 * we need to fall back to sendmsg if that's the case.
  94	 *
  95	 * Same goes for slab pages: skb_can_coalesce() allows
  96	 * coalescing neighboring slab objects into a single frag which
  97	 * triggers one of hardened usercopy checks.
  98	 */
  99	if (sendpage_ok(page))
 100		sendpage = sock->ops->sendpage;
 101	else
 102		sendpage = sock_no_sendpage;
 103
 104	ret = sendpage(sock, page, offset, size, flags);
 
 
 
 105	if (ret == -EAGAIN)
 106		ret = 0;
 107
 108	return ret;
 109}
 110
 111static void con_out_kvec_reset(struct ceph_connection *con)
 112{
 113	BUG_ON(con->v1.out_skip);
 114
 115	con->v1.out_kvec_left = 0;
 116	con->v1.out_kvec_bytes = 0;
 117	con->v1.out_kvec_cur = &con->v1.out_kvec[0];
 118}
 119
 120static void con_out_kvec_add(struct ceph_connection *con,
 121				size_t size, void *data)
 122{
 123	int index = con->v1.out_kvec_left;
 124
 125	BUG_ON(con->v1.out_skip);
 126	BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
 127
 128	con->v1.out_kvec[index].iov_len = size;
 129	con->v1.out_kvec[index].iov_base = data;
 130	con->v1.out_kvec_left++;
 131	con->v1.out_kvec_bytes += size;
 132}
 133
 134/*
 135 * Chop off a kvec from the end.  Return residual number of bytes for
 136 * that kvec, i.e. how many bytes would have been written if the kvec
 137 * hadn't been nuked.
 138 */
 139static int con_out_kvec_skip(struct ceph_connection *con)
 140{
 141	int skip = 0;
 142
 143	if (con->v1.out_kvec_bytes > 0) {
 144		skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
 145		BUG_ON(con->v1.out_kvec_bytes < skip);
 146		BUG_ON(!con->v1.out_kvec_left);
 147		con->v1.out_kvec_bytes -= skip;
 148		con->v1.out_kvec_left--;
 149	}
 150
 151	return skip;
 152}
 153
 154static size_t sizeof_footer(struct ceph_connection *con)
 155{
 156	return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
 157	    sizeof(struct ceph_msg_footer) :
 158	    sizeof(struct ceph_msg_footer_old);
 159}
 160
 161static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 162{
 163	/* Initialize data cursor */
 
 164
 165	ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
 166}
 167
 168/*
 169 * Prepare footer for currently outgoing message, and finish things
 170 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 171 */
 172static void prepare_write_message_footer(struct ceph_connection *con)
 173{
 174	struct ceph_msg *m = con->out_msg;
 175
 176	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 177
 178	dout("prepare_write_message_footer %p\n", con);
 179	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
 180	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
 181		if (con->ops->sign_message)
 182			con->ops->sign_message(m);
 183		else
 184			m->footer.sig = 0;
 185	} else {
 186		m->old_footer.flags = m->footer.flags;
 187	}
 188	con->v1.out_more = m->more_to_follow;
 189	con->v1.out_msg_done = true;
 190}
 191
 192/*
 193 * Prepare headers for the next outgoing message.
 194 */
 195static void prepare_write_message(struct ceph_connection *con)
 196{
 197	struct ceph_msg *m;
 198	u32 crc;
 199
 200	con_out_kvec_reset(con);
 201	con->v1.out_msg_done = false;
 202
 203	/* Sneak an ack in there first?  If we can get it into the same
 204	 * TCP packet that's a good thing. */
 205	if (con->in_seq > con->in_seq_acked) {
 206		con->in_seq_acked = con->in_seq;
 207		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 208		con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 209		con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 210			&con->v1.out_temp_ack);
 211	}
 212
 213	ceph_con_get_out_msg(con);
 214	m = con->out_msg;
 215
 216	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
 217	     m, con->out_seq, le16_to_cpu(m->hdr.type),
 218	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
 219	     m->data_length);
 220	WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
 221	WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
 222
 223	/* tag + hdr + front + middle */
 224	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
 225	con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
 226	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 227
 228	if (m->middle)
 229		con_out_kvec_add(con, m->middle->vec.iov_len,
 230			m->middle->vec.iov_base);
 231
 232	/* fill in hdr crc and finalize hdr */
 233	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
 234	con->out_msg->hdr.crc = cpu_to_le32(crc);
 235	memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
 236
 237	/* fill in front and middle crc, footer */
 238	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
 239	con->out_msg->footer.front_crc = cpu_to_le32(crc);
 240	if (m->middle) {
 241		crc = crc32c(0, m->middle->vec.iov_base,
 242				m->middle->vec.iov_len);
 243		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
 244	} else
 245		con->out_msg->footer.middle_crc = 0;
 246	dout("%s front_crc %u middle_crc %u\n", __func__,
 247	     le32_to_cpu(con->out_msg->footer.front_crc),
 248	     le32_to_cpu(con->out_msg->footer.middle_crc));
 249	con->out_msg->footer.flags = 0;
 250
 251	/* is there a data payload? */
 252	con->out_msg->footer.data_crc = 0;
 253	if (m->data_length) {
 254		prepare_message_data(con->out_msg, m->data_length);
 255		con->v1.out_more = 1;  /* data + footer will follow */
 256	} else {
 257		/* no, queue up footer too and be done */
 258		prepare_write_message_footer(con);
 259	}
 260
 261	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 262}
 263
 264/*
 265 * Prepare an ack.
 266 */
 267static void prepare_write_ack(struct ceph_connection *con)
 268{
 269	dout("prepare_write_ack %p %llu -> %llu\n", con,
 270	     con->in_seq_acked, con->in_seq);
 271	con->in_seq_acked = con->in_seq;
 272
 273	con_out_kvec_reset(con);
 274
 275	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 276
 277	con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 278	con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 279			 &con->v1.out_temp_ack);
 280
 281	con->v1.out_more = 1;  /* more will follow.. eventually.. */
 282	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 283}
 284
 285/*
 286 * Prepare to share the seq during handshake
 287 */
 288static void prepare_write_seq(struct ceph_connection *con)
 289{
 290	dout("prepare_write_seq %p %llu -> %llu\n", con,
 291	     con->in_seq_acked, con->in_seq);
 292	con->in_seq_acked = con->in_seq;
 293
 294	con_out_kvec_reset(con);
 295
 296	con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 297	con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 298			 &con->v1.out_temp_ack);
 299
 300	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 301}
 302
 303/*
 304 * Prepare to write keepalive byte.
 305 */
 306static void prepare_write_keepalive(struct ceph_connection *con)
 307{
 308	dout("prepare_write_keepalive %p\n", con);
 309	con_out_kvec_reset(con);
 310	if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
 311		struct timespec64 now;
 312
 313		ktime_get_real_ts64(&now);
 314		con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
 315		ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
 316		con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
 317				 &con->v1.out_temp_keepalive2);
 318	} else {
 319		con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
 320	}
 321	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 322}
 323
 324/*
 325 * Connection negotiation.
 326 */
 327
 328static int get_connect_authorizer(struct ceph_connection *con)
 329{
 330	struct ceph_auth_handshake *auth;
 331	int auth_proto;
 332
 333	if (!con->ops->get_authorizer) {
 334		con->v1.auth = NULL;
 335		con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
 336		con->v1.out_connect.authorizer_len = 0;
 337		return 0;
 338	}
 339
 340	auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
 341	if (IS_ERR(auth))
 342		return PTR_ERR(auth);
 343
 344	con->v1.auth = auth;
 345	con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
 346	con->v1.out_connect.authorizer_len =
 347		cpu_to_le32(auth->authorizer_buf_len);
 348	return 0;
 349}
 350
 351/*
 352 * We connected to a peer and are saying hello.
 353 */
 354static void prepare_write_banner(struct ceph_connection *con)
 355{
 356	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
 357	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
 358					&con->msgr->my_enc_addr);
 359
 360	con->v1.out_more = 0;
 361	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 362}
 363
 364static void __prepare_write_connect(struct ceph_connection *con)
 365{
 366	con_out_kvec_add(con, sizeof(con->v1.out_connect),
 367			 &con->v1.out_connect);
 368	if (con->v1.auth)
 369		con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
 370				 con->v1.auth->authorizer_buf);
 371
 372	con->v1.out_more = 0;
 373	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 374}
 375
 376static int prepare_write_connect(struct ceph_connection *con)
 377{
 378	unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
 379	int proto;
 380	int ret;
 381
 382	switch (con->peer_name.type) {
 383	case CEPH_ENTITY_TYPE_MON:
 384		proto = CEPH_MONC_PROTOCOL;
 385		break;
 386	case CEPH_ENTITY_TYPE_OSD:
 387		proto = CEPH_OSDC_PROTOCOL;
 388		break;
 389	case CEPH_ENTITY_TYPE_MDS:
 390		proto = CEPH_MDSC_PROTOCOL;
 391		break;
 392	default:
 393		BUG();
 394	}
 395
 396	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
 397	     con->v1.connect_seq, global_seq, proto);
 398
 399	con->v1.out_connect.features =
 400		cpu_to_le64(from_msgr(con->msgr)->supported_features);
 401	con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
 402	con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
 403	con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
 404	con->v1.out_connect.protocol_version = cpu_to_le32(proto);
 405	con->v1.out_connect.flags = 0;
 406
 407	ret = get_connect_authorizer(con);
 408	if (ret)
 409		return ret;
 410
 411	__prepare_write_connect(con);
 412	return 0;
 413}
 414
 415/*
 416 * write as much of pending kvecs to the socket as we can.
 417 *  1 -> done
 418 *  0 -> socket full, but more to do
 419 * <0 -> error
 420 */
 421static int write_partial_kvec(struct ceph_connection *con)
 422{
 423	int ret;
 424
 425	dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
 426	while (con->v1.out_kvec_bytes > 0) {
 427		ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
 428				       con->v1.out_kvec_left,
 429				       con->v1.out_kvec_bytes,
 430				       con->v1.out_more);
 431		if (ret <= 0)
 432			goto out;
 433		con->v1.out_kvec_bytes -= ret;
 434		if (!con->v1.out_kvec_bytes)
 435			break;            /* done */
 436
 437		/* account for full iov entries consumed */
 438		while (ret >= con->v1.out_kvec_cur->iov_len) {
 439			BUG_ON(!con->v1.out_kvec_left);
 440			ret -= con->v1.out_kvec_cur->iov_len;
 441			con->v1.out_kvec_cur++;
 442			con->v1.out_kvec_left--;
 443		}
 444		/* and for a partially-consumed entry */
 445		if (ret) {
 446			con->v1.out_kvec_cur->iov_len -= ret;
 447			con->v1.out_kvec_cur->iov_base += ret;
 448		}
 449	}
 450	con->v1.out_kvec_left = 0;
 451	ret = 1;
 452out:
 453	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
 454	     con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
 455	return ret;  /* done! */
 456}
 457
 458/*
 459 * Write as much message data payload as we can.  If we finish, queue
 460 * up the footer.
 461 *  1 -> done, footer is now queued in out_kvec[].
 462 *  0 -> socket full, but more to do
 463 * <0 -> error
 464 */
 465static int write_partial_message_data(struct ceph_connection *con)
 466{
 467	struct ceph_msg *msg = con->out_msg;
 468	struct ceph_msg_data_cursor *cursor = &msg->cursor;
 469	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
 470	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 471	u32 crc;
 472
 473	dout("%s %p msg %p\n", __func__, con, msg);
 474
 475	if (!msg->num_data_items)
 476		return -EINVAL;
 477
 478	/*
 479	 * Iterate through each page that contains data to be
 480	 * written, and send as much as possible for each.
 481	 *
 482	 * If we are calculating the data crc (the default), we will
 483	 * need to map the page.  If we have no pages, they have
 484	 * been revoked, so use the zero page.
 485	 */
 486	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
 487	while (cursor->total_resid) {
 488		struct page *page;
 489		size_t page_offset;
 490		size_t length;
 491		int ret;
 492
 493		if (!cursor->resid) {
 494			ceph_msg_data_advance(cursor, 0);
 495			continue;
 496		}
 497
 498		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
 499		if (length == cursor->total_resid)
 500			more = MSG_MORE;
 501		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
 502					more);
 503		if (ret <= 0) {
 504			if (do_datacrc)
 505				msg->footer.data_crc = cpu_to_le32(crc);
 506
 507			return ret;
 508		}
 509		if (do_datacrc && cursor->need_crc)
 510			crc = ceph_crc32c_page(crc, page, page_offset, length);
 511		ceph_msg_data_advance(cursor, (size_t)ret);
 512	}
 513
 514	dout("%s %p msg %p done\n", __func__, con, msg);
 515
 516	/* prepare and queue up footer, too */
 517	if (do_datacrc)
 518		msg->footer.data_crc = cpu_to_le32(crc);
 519	else
 520		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
 521	con_out_kvec_reset(con);
 522	prepare_write_message_footer(con);
 523
 524	return 1;	/* must return > 0 to indicate success */
 525}
 526
 527/*
 528 * write some zeros
 529 */
 530static int write_partial_skip(struct ceph_connection *con)
 531{
 532	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 533	int ret;
 534
 535	dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
 536	while (con->v1.out_skip > 0) {
 537		size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
 538
 539		if (size == con->v1.out_skip)
 540			more = MSG_MORE;
 541		ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
 542					more);
 543		if (ret <= 0)
 544			goto out;
 545		con->v1.out_skip -= ret;
 546	}
 547	ret = 1;
 548out:
 549	return ret;
 550}
 551
 552/*
 553 * Prepare to read connection handshake, or an ack.
 554 */
 555static void prepare_read_banner(struct ceph_connection *con)
 556{
 557	dout("prepare_read_banner %p\n", con);
 558	con->v1.in_base_pos = 0;
 559}
 560
 561static void prepare_read_connect(struct ceph_connection *con)
 562{
 563	dout("prepare_read_connect %p\n", con);
 564	con->v1.in_base_pos = 0;
 565}
 566
 567static void prepare_read_ack(struct ceph_connection *con)
 568{
 569	dout("prepare_read_ack %p\n", con);
 570	con->v1.in_base_pos = 0;
 571}
 572
 573static void prepare_read_seq(struct ceph_connection *con)
 574{
 575	dout("prepare_read_seq %p\n", con);
 576	con->v1.in_base_pos = 0;
 577	con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
 578}
 579
 580static void prepare_read_tag(struct ceph_connection *con)
 581{
 582	dout("prepare_read_tag %p\n", con);
 583	con->v1.in_base_pos = 0;
 584	con->v1.in_tag = CEPH_MSGR_TAG_READY;
 585}
 586
 587static void prepare_read_keepalive_ack(struct ceph_connection *con)
 588{
 589	dout("prepare_read_keepalive_ack %p\n", con);
 590	con->v1.in_base_pos = 0;
 591}
 592
 593/*
 594 * Prepare to read a message.
 595 */
 596static int prepare_read_message(struct ceph_connection *con)
 597{
 598	dout("prepare_read_message %p\n", con);
 599	BUG_ON(con->in_msg != NULL);
 600	con->v1.in_base_pos = 0;
 601	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
 602	return 0;
 603}
 604
 605static int read_partial(struct ceph_connection *con,
 606			int end, int size, void *object)
 607{
 608	while (con->v1.in_base_pos < end) {
 609		int left = end - con->v1.in_base_pos;
 610		int have = size - left;
 611		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
 612		if (ret <= 0)
 613			return ret;
 614		con->v1.in_base_pos += ret;
 615	}
 616	return 1;
 617}
 618
 619/*
 620 * Read all or part of the connect-side handshake on a new connection
 621 */
 622static int read_partial_banner(struct ceph_connection *con)
 623{
 624	int size;
 625	int end;
 626	int ret;
 627
 628	dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
 629
 630	/* peer's banner */
 631	size = strlen(CEPH_BANNER);
 632	end = size;
 633	ret = read_partial(con, end, size, con->v1.in_banner);
 634	if (ret <= 0)
 635		goto out;
 636
 637	size = sizeof(con->v1.actual_peer_addr);
 638	end += size;
 639	ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
 640	if (ret <= 0)
 641		goto out;
 642	ceph_decode_banner_addr(&con->v1.actual_peer_addr);
 643
 644	size = sizeof(con->v1.peer_addr_for_me);
 645	end += size;
 646	ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
 647	if (ret <= 0)
 648		goto out;
 649	ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
 650
 651out:
 652	return ret;
 653}
 654
 655static int read_partial_connect(struct ceph_connection *con)
 656{
 657	int size;
 658	int end;
 659	int ret;
 660
 661	dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
 662
 663	size = sizeof(con->v1.in_reply);
 664	end = size;
 665	ret = read_partial(con, end, size, &con->v1.in_reply);
 666	if (ret <= 0)
 667		goto out;
 668
 669	if (con->v1.auth) {
 670		size = le32_to_cpu(con->v1.in_reply.authorizer_len);
 671		if (size > con->v1.auth->authorizer_reply_buf_len) {
 672			pr_err("authorizer reply too big: %d > %zu\n", size,
 673			       con->v1.auth->authorizer_reply_buf_len);
 674			ret = -EINVAL;
 675			goto out;
 676		}
 677
 678		end += size;
 679		ret = read_partial(con, end, size,
 680				   con->v1.auth->authorizer_reply_buf);
 681		if (ret <= 0)
 682			goto out;
 683	}
 684
 685	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
 686	     con, con->v1.in_reply.tag,
 687	     le32_to_cpu(con->v1.in_reply.connect_seq),
 688	     le32_to_cpu(con->v1.in_reply.global_seq));
 689out:
 690	return ret;
 691}
 692
 693/*
 694 * Verify the hello banner looks okay.
 695 */
 696static int verify_hello(struct ceph_connection *con)
 697{
 698	if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
 699		pr_err("connect to %s got bad banner\n",
 700		       ceph_pr_addr(&con->peer_addr));
 701		con->error_msg = "protocol error, bad banner";
 702		return -1;
 703	}
 704	return 0;
 705}
 706
 707static int process_banner(struct ceph_connection *con)
 708{
 709	struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
 710
 711	dout("process_banner on %p\n", con);
 712
 713	if (verify_hello(con) < 0)
 714		return -1;
 715
 716	/*
 717	 * Make sure the other end is who we wanted.  note that the other
 718	 * end may not yet know their ip address, so if it's 0.0.0.0, give
 719	 * them the benefit of the doubt.
 720	 */
 721	if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
 722		   sizeof(con->peer_addr)) != 0 &&
 723	    !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
 724	      con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
 725		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
 726			ceph_pr_addr(&con->peer_addr),
 727			le32_to_cpu(con->peer_addr.nonce),
 728			ceph_pr_addr(&con->v1.actual_peer_addr),
 729			le32_to_cpu(con->v1.actual_peer_addr.nonce));
 730		con->error_msg = "wrong peer at address";
 731		return -1;
 732	}
 733
 734	/*
 735	 * did we learn our address?
 736	 */
 737	if (ceph_addr_is_blank(my_addr)) {
 738		memcpy(&my_addr->in_addr,
 739		       &con->v1.peer_addr_for_me.in_addr,
 740		       sizeof(con->v1.peer_addr_for_me.in_addr));
 741		ceph_addr_set_port(my_addr, 0);
 742		ceph_encode_my_addr(con->msgr);
 743		dout("process_banner learned my addr is %s\n",
 744		     ceph_pr_addr(my_addr));
 745	}
 746
 747	return 0;
 748}
 749
 750static int process_connect(struct ceph_connection *con)
 751{
 752	u64 sup_feat = from_msgr(con->msgr)->supported_features;
 753	u64 req_feat = from_msgr(con->msgr)->required_features;
 754	u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
 755	int ret;
 756
 757	dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
 758
 759	if (con->v1.auth) {
 760		int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
 761
 762		/*
 763		 * Any connection that defines ->get_authorizer()
 764		 * should also define ->add_authorizer_challenge() and
 765		 * ->verify_authorizer_reply().
 766		 *
 767		 * See get_connect_authorizer().
 768		 */
 769		if (con->v1.in_reply.tag ==
 770				CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
 771			ret = con->ops->add_authorizer_challenge(
 772				con, con->v1.auth->authorizer_reply_buf, len);
 773			if (ret < 0)
 774				return ret;
 775
 776			con_out_kvec_reset(con);
 777			__prepare_write_connect(con);
 778			prepare_read_connect(con);
 779			return 0;
 780		}
 781
 782		if (len) {
 783			ret = con->ops->verify_authorizer_reply(con);
 784			if (ret < 0) {
 785				con->error_msg = "bad authorize reply";
 786				return ret;
 787			}
 788		}
 789	}
 790
 791	switch (con->v1.in_reply.tag) {
 792	case CEPH_MSGR_TAG_FEATURES:
 793		pr_err("%s%lld %s feature set mismatch,"
 794		       " my %llx < server's %llx, missing %llx\n",
 795		       ENTITY_NAME(con->peer_name),
 796		       ceph_pr_addr(&con->peer_addr),
 797		       sup_feat, server_feat, server_feat & ~sup_feat);
 798		con->error_msg = "missing required protocol features";
 799		return -1;
 800
 801	case CEPH_MSGR_TAG_BADPROTOVER:
 802		pr_err("%s%lld %s protocol version mismatch,"
 803		       " my %d != server's %d\n",
 804		       ENTITY_NAME(con->peer_name),
 805		       ceph_pr_addr(&con->peer_addr),
 806		       le32_to_cpu(con->v1.out_connect.protocol_version),
 807		       le32_to_cpu(con->v1.in_reply.protocol_version));
 808		con->error_msg = "protocol version mismatch";
 809		return -1;
 810
 811	case CEPH_MSGR_TAG_BADAUTHORIZER:
 812		con->v1.auth_retry++;
 813		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
 814		     con->v1.auth_retry);
 815		if (con->v1.auth_retry == 2) {
 816			con->error_msg = "connect authorization failure";
 817			return -1;
 818		}
 819		con_out_kvec_reset(con);
 820		ret = prepare_write_connect(con);
 821		if (ret < 0)
 822			return ret;
 823		prepare_read_connect(con);
 824		break;
 825
 826	case CEPH_MSGR_TAG_RESETSESSION:
 827		/*
 828		 * If we connected with a large connect_seq but the peer
 829		 * has no record of a session with us (no connection, or
 830		 * connect_seq == 0), they will send RESETSESION to indicate
 831		 * that they must have reset their session, and may have
 832		 * dropped messages.
 833		 */
 834		dout("process_connect got RESET peer seq %u\n",
 835		     le32_to_cpu(con->v1.in_reply.connect_seq));
 836		pr_info("%s%lld %s session reset\n",
 837			ENTITY_NAME(con->peer_name),
 838			ceph_pr_addr(&con->peer_addr));
 839		ceph_con_reset_session(con);
 840		con_out_kvec_reset(con);
 841		ret = prepare_write_connect(con);
 842		if (ret < 0)
 843			return ret;
 844		prepare_read_connect(con);
 845
 846		/* Tell ceph about it. */
 847		mutex_unlock(&con->mutex);
 848		if (con->ops->peer_reset)
 849			con->ops->peer_reset(con);
 850		mutex_lock(&con->mutex);
 851		if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
 852			return -EAGAIN;
 853		break;
 854
 855	case CEPH_MSGR_TAG_RETRY_SESSION:
 856		/*
 857		 * If we sent a smaller connect_seq than the peer has, try
 858		 * again with a larger value.
 859		 */
 860		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
 861		     le32_to_cpu(con->v1.out_connect.connect_seq),
 862		     le32_to_cpu(con->v1.in_reply.connect_seq));
 863		con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
 864		con_out_kvec_reset(con);
 865		ret = prepare_write_connect(con);
 866		if (ret < 0)
 867			return ret;
 868		prepare_read_connect(con);
 869		break;
 870
 871	case CEPH_MSGR_TAG_RETRY_GLOBAL:
 872		/*
 873		 * If we sent a smaller global_seq than the peer has, try
 874		 * again with a larger value.
 875		 */
 876		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
 877		     con->v1.peer_global_seq,
 878		     le32_to_cpu(con->v1.in_reply.global_seq));
 879		ceph_get_global_seq(con->msgr,
 880				    le32_to_cpu(con->v1.in_reply.global_seq));
 881		con_out_kvec_reset(con);
 882		ret = prepare_write_connect(con);
 883		if (ret < 0)
 884			return ret;
 885		prepare_read_connect(con);
 886		break;
 887
 888	case CEPH_MSGR_TAG_SEQ:
 889	case CEPH_MSGR_TAG_READY:
 890		if (req_feat & ~server_feat) {
 891			pr_err("%s%lld %s protocol feature mismatch,"
 892			       " my required %llx > server's %llx, need %llx\n",
 893			       ENTITY_NAME(con->peer_name),
 894			       ceph_pr_addr(&con->peer_addr),
 895			       req_feat, server_feat, req_feat & ~server_feat);
 896			con->error_msg = "missing required protocol features";
 897			return -1;
 898		}
 899
 900		WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
 901		con->state = CEPH_CON_S_OPEN;
 902		con->v1.auth_retry = 0;    /* we authenticated; clear flag */
 903		con->v1.peer_global_seq =
 904			le32_to_cpu(con->v1.in_reply.global_seq);
 905		con->v1.connect_seq++;
 906		con->peer_features = server_feat;
 907		dout("process_connect got READY gseq %d cseq %d (%d)\n",
 908		     con->v1.peer_global_seq,
 909		     le32_to_cpu(con->v1.in_reply.connect_seq),
 910		     con->v1.connect_seq);
 911		WARN_ON(con->v1.connect_seq !=
 912			le32_to_cpu(con->v1.in_reply.connect_seq));
 913
 914		if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
 915			ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
 916
 917		con->delay = 0;      /* reset backoff memory */
 918
 919		if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
 920			prepare_write_seq(con);
 921			prepare_read_seq(con);
 922		} else {
 923			prepare_read_tag(con);
 924		}
 925		break;
 926
 927	case CEPH_MSGR_TAG_WAIT:
 928		/*
 929		 * If there is a connection race (we are opening
 930		 * connections to each other), one of us may just have
 931		 * to WAIT.  This shouldn't happen if we are the
 932		 * client.
 933		 */
 934		con->error_msg = "protocol error, got WAIT as client";
 935		return -1;
 936
 937	default:
 938		con->error_msg = "protocol error, garbage tag during connect";
 939		return -1;
 940	}
 941	return 0;
 942}
 943
 944/*
 945 * read (part of) an ack
 946 */
 947static int read_partial_ack(struct ceph_connection *con)
 948{
 949	int size = sizeof(con->v1.in_temp_ack);
 950	int end = size;
 951
 952	return read_partial(con, end, size, &con->v1.in_temp_ack);
 953}
 954
 955/*
 956 * We can finally discard anything that's been acked.
 957 */
 958static void process_ack(struct ceph_connection *con)
 959{
 960	u64 ack = le64_to_cpu(con->v1.in_temp_ack);
 961
 962	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
 963		ceph_con_discard_sent(con, ack);
 964	else
 965		ceph_con_discard_requeued(con, ack);
 966
 967	prepare_read_tag(con);
 968}
 969
 970static int read_partial_message_section(struct ceph_connection *con,
 971					struct kvec *section,
 972					unsigned int sec_len, u32 *crc)
 973{
 974	int ret, left;
 975
 976	BUG_ON(!section);
 977
 978	while (section->iov_len < sec_len) {
 979		BUG_ON(section->iov_base == NULL);
 980		left = sec_len - section->iov_len;
 981		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
 982				       section->iov_len, left);
 983		if (ret <= 0)
 984			return ret;
 985		section->iov_len += ret;
 986	}
 987	if (section->iov_len == sec_len)
 988		*crc = crc32c(0, section->iov_base, section->iov_len);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 989
 
 
 
 
 
 
 
 
 
 
 
 990	return 1;
 991}
 992
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 993static int read_partial_msg_data(struct ceph_connection *con)
 994{
 995	struct ceph_msg *msg = con->in_msg;
 996	struct ceph_msg_data_cursor *cursor = &msg->cursor;
 997	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
 998	struct page *page;
 999	size_t page_offset;
1000	size_t length;
1001	u32 crc = 0;
1002	int ret;
1003
1004	if (!msg->num_data_items)
1005		return -EIO;
1006
1007	if (do_datacrc)
1008		crc = con->in_data_crc;
1009	while (cursor->total_resid) {
1010		if (!cursor->resid) {
1011			ceph_msg_data_advance(cursor, 0);
1012			continue;
1013		}
1014
1015		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
1016		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
1017		if (ret <= 0) {
1018			if (do_datacrc)
1019				con->in_data_crc = crc;
1020
1021			return ret;
1022		}
1023
1024		if (do_datacrc)
1025			crc = ceph_crc32c_page(crc, page, page_offset, ret);
1026		ceph_msg_data_advance(cursor, (size_t)ret);
1027	}
1028	if (do_datacrc)
1029		con->in_data_crc = crc;
1030
1031	return 1;	/* must return > 0 to indicate success */
1032}
1033
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1034/*
1035 * read (part of) a message.
1036 */
1037static int read_partial_message(struct ceph_connection *con)
1038{
1039	struct ceph_msg *m = con->in_msg;
1040	int size;
1041	int end;
1042	int ret;
1043	unsigned int front_len, middle_len, data_len;
1044	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1045	bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
1046	u64 seq;
1047	u32 crc;
1048
1049	dout("read_partial_message con %p msg %p\n", con, m);
1050
1051	/* header */
1052	size = sizeof(con->v1.in_hdr);
1053	end = size;
1054	ret = read_partial(con, end, size, &con->v1.in_hdr);
1055	if (ret <= 0)
1056		return ret;
1057
1058	crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
1059	if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
1060		pr_err("read_partial_message bad hdr crc %u != expected %u\n",
1061		       crc, con->v1.in_hdr.crc);
1062		return -EBADMSG;
1063	}
1064
1065	front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1066	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1067		return -EIO;
1068	middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1069	if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1070		return -EIO;
1071	data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1072	if (data_len > CEPH_MSG_MAX_DATA_LEN)
1073		return -EIO;
1074
1075	/* verify seq# */
1076	seq = le64_to_cpu(con->v1.in_hdr.seq);
1077	if ((s64)seq - (s64)con->in_seq < 1) {
1078		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1079			ENTITY_NAME(con->peer_name),
1080			ceph_pr_addr(&con->peer_addr),
1081			seq, con->in_seq + 1);
1082		con->v1.in_base_pos = -front_len - middle_len - data_len -
1083				      sizeof_footer(con);
1084		con->v1.in_tag = CEPH_MSGR_TAG_READY;
1085		return 1;
1086	} else if ((s64)seq - (s64)con->in_seq > 1) {
1087		pr_err("read_partial_message bad seq %lld expected %lld\n",
1088		       seq, con->in_seq + 1);
1089		con->error_msg = "bad message sequence # for incoming message";
1090		return -EBADE;
1091	}
1092
1093	/* allocate message? */
1094	if (!con->in_msg) {
1095		int skip = 0;
1096
1097		dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
1098		     front_len, data_len);
1099		ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
1100		if (ret < 0)
1101			return ret;
1102
1103		BUG_ON((!con->in_msg) ^ skip);
1104		if (skip) {
1105			/* skip this message */
1106			dout("alloc_msg said skip message\n");
1107			con->v1.in_base_pos = -front_len - middle_len -
1108					      data_len - sizeof_footer(con);
1109			con->v1.in_tag = CEPH_MSGR_TAG_READY;
1110			con->in_seq++;
1111			return 1;
1112		}
1113
1114		BUG_ON(!con->in_msg);
1115		BUG_ON(con->in_msg->con != con);
1116		m = con->in_msg;
1117		m->front.iov_len = 0;    /* haven't read it yet */
1118		if (m->middle)
1119			m->middle->vec.iov_len = 0;
1120
1121		/* prepare for data payload, if any */
1122
1123		if (data_len)
1124			prepare_message_data(con->in_msg, data_len);
1125	}
1126
1127	/* front */
1128	ret = read_partial_message_section(con, &m->front, front_len,
1129					   &con->in_front_crc);
1130	if (ret <= 0)
1131		return ret;
1132
1133	/* middle */
1134	if (m->middle) {
1135		ret = read_partial_message_section(con, &m->middle->vec,
1136						   middle_len,
1137						   &con->in_middle_crc);
1138		if (ret <= 0)
1139			return ret;
1140	}
1141
1142	/* (page) data */
1143	if (data_len) {
1144		ret = read_partial_msg_data(con);
 
 
 
 
 
 
 
 
1145		if (ret <= 0)
1146			return ret;
1147	}
1148
1149	/* footer */
1150	size = sizeof_footer(con);
1151	end += size;
1152	ret = read_partial(con, end, size, &m->footer);
1153	if (ret <= 0)
1154		return ret;
1155
1156	if (!need_sign) {
1157		m->footer.flags = m->old_footer.flags;
1158		m->footer.sig = 0;
1159	}
1160
1161	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1162	     m, front_len, m->footer.front_crc, middle_len,
1163	     m->footer.middle_crc, data_len, m->footer.data_crc);
1164
1165	/* crc ok? */
1166	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1167		pr_err("read_partial_message %p front crc %u != exp. %u\n",
1168		       m, con->in_front_crc, m->footer.front_crc);
1169		return -EBADMSG;
1170	}
1171	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1172		pr_err("read_partial_message %p middle crc %u != exp %u\n",
1173		       m, con->in_middle_crc, m->footer.middle_crc);
1174		return -EBADMSG;
1175	}
1176	if (do_datacrc &&
1177	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1178	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1179		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1180		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1181		return -EBADMSG;
1182	}
1183
1184	if (need_sign && con->ops->check_message_signature &&
1185	    con->ops->check_message_signature(m)) {
1186		pr_err("read_partial_message %p signature check failed\n", m);
1187		return -EBADMSG;
1188	}
1189
1190	return 1; /* done! */
1191}
1192
1193static int read_keepalive_ack(struct ceph_connection *con)
1194{
1195	struct ceph_timespec ceph_ts;
1196	size_t size = sizeof(ceph_ts);
1197	int ret = read_partial(con, size, size, &ceph_ts);
1198	if (ret <= 0)
1199		return ret;
1200	ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
1201	prepare_read_tag(con);
1202	return 1;
1203}
1204
1205/*
1206 * Read what we can from the socket.
1207 */
1208int ceph_con_v1_try_read(struct ceph_connection *con)
1209{
1210	int ret = -1;
1211
1212more:
1213	dout("try_read start %p state %d\n", con, con->state);
1214	if (con->state != CEPH_CON_S_V1_BANNER &&
1215	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1216	    con->state != CEPH_CON_S_OPEN)
1217		return 0;
1218
1219	BUG_ON(!con->sock);
1220
1221	dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
1222	     con->v1.in_base_pos);
1223
1224	if (con->state == CEPH_CON_S_V1_BANNER) {
1225		ret = read_partial_banner(con);
1226		if (ret <= 0)
1227			goto out;
1228		ret = process_banner(con);
1229		if (ret < 0)
1230			goto out;
1231
1232		con->state = CEPH_CON_S_V1_CONNECT_MSG;
1233
1234		/*
1235		 * Received banner is good, exchange connection info.
1236		 * Do not reset out_kvec, as sending our banner raced
1237		 * with receiving peer banner after connect completed.
1238		 */
1239		ret = prepare_write_connect(con);
1240		if (ret < 0)
1241			goto out;
1242		prepare_read_connect(con);
1243
1244		/* Send connection info before awaiting response */
1245		goto out;
1246	}
1247
1248	if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
1249		ret = read_partial_connect(con);
1250		if (ret <= 0)
1251			goto out;
1252		ret = process_connect(con);
1253		if (ret < 0)
1254			goto out;
1255		goto more;
1256	}
1257
1258	WARN_ON(con->state != CEPH_CON_S_OPEN);
1259
1260	if (con->v1.in_base_pos < 0) {
1261		/*
1262		 * skipping + discarding content.
1263		 */
1264		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
1265		if (ret <= 0)
1266			goto out;
1267		dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
1268		con->v1.in_base_pos += ret;
1269		if (con->v1.in_base_pos)
1270			goto more;
1271	}
1272	if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
1273		/*
1274		 * what's next?
1275		 */
1276		ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
1277		if (ret <= 0)
1278			goto out;
1279		dout("try_read got tag %d\n", con->v1.in_tag);
1280		switch (con->v1.in_tag) {
1281		case CEPH_MSGR_TAG_MSG:
1282			prepare_read_message(con);
1283			break;
1284		case CEPH_MSGR_TAG_ACK:
1285			prepare_read_ack(con);
1286			break;
1287		case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
1288			prepare_read_keepalive_ack(con);
1289			break;
1290		case CEPH_MSGR_TAG_CLOSE:
1291			ceph_con_close_socket(con);
1292			con->state = CEPH_CON_S_CLOSED;
1293			goto out;
1294		default:
1295			goto bad_tag;
1296		}
1297	}
1298	if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
1299		ret = read_partial_message(con);
1300		if (ret <= 0) {
1301			switch (ret) {
1302			case -EBADMSG:
1303				con->error_msg = "bad crc/signature";
1304				fallthrough;
1305			case -EBADE:
1306				ret = -EIO;
1307				break;
1308			case -EIO:
1309				con->error_msg = "io error";
1310				break;
1311			}
1312			goto out;
1313		}
1314		if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
1315			goto more;
1316		ceph_con_process_message(con);
1317		if (con->state == CEPH_CON_S_OPEN)
1318			prepare_read_tag(con);
1319		goto more;
1320	}
1321	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
1322	    con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
1323		/*
1324		 * the final handshake seq exchange is semantically
1325		 * equivalent to an ACK
1326		 */
1327		ret = read_partial_ack(con);
1328		if (ret <= 0)
1329			goto out;
1330		process_ack(con);
1331		goto more;
1332	}
1333	if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1334		ret = read_keepalive_ack(con);
1335		if (ret <= 0)
1336			goto out;
1337		goto more;
1338	}
1339
1340out:
1341	dout("try_read done on %p ret %d\n", con, ret);
1342	return ret;
1343
1344bad_tag:
1345	pr_err("try_read bad tag %d\n", con->v1.in_tag);
1346	con->error_msg = "protocol error, garbage tag";
1347	ret = -1;
1348	goto out;
1349}
1350
1351/*
1352 * Write something to the socket.  Called in a worker thread when the
1353 * socket appears to be writeable and we have something ready to send.
1354 */
1355int ceph_con_v1_try_write(struct ceph_connection *con)
1356{
1357	int ret = 1;
1358
1359	dout("try_write start %p state %d\n", con, con->state);
1360	if (con->state != CEPH_CON_S_PREOPEN &&
1361	    con->state != CEPH_CON_S_V1_BANNER &&
1362	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1363	    con->state != CEPH_CON_S_OPEN)
1364		return 0;
1365
1366	/* open the socket first? */
1367	if (con->state == CEPH_CON_S_PREOPEN) {
1368		BUG_ON(con->sock);
1369		con->state = CEPH_CON_S_V1_BANNER;
1370
1371		con_out_kvec_reset(con);
1372		prepare_write_banner(con);
1373		prepare_read_banner(con);
1374
1375		BUG_ON(con->in_msg);
1376		con->v1.in_tag = CEPH_MSGR_TAG_READY;
1377		dout("try_write initiating connect on %p new state %d\n",
1378		     con, con->state);
1379		ret = ceph_tcp_connect(con);
1380		if (ret < 0) {
1381			con->error_msg = "connect error";
1382			goto out;
1383		}
1384	}
1385
1386more:
1387	dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
1388	BUG_ON(!con->sock);
1389
1390	/* kvec data queued? */
1391	if (con->v1.out_kvec_left) {
1392		ret = write_partial_kvec(con);
1393		if (ret <= 0)
1394			goto out;
1395	}
1396	if (con->v1.out_skip) {
1397		ret = write_partial_skip(con);
1398		if (ret <= 0)
1399			goto out;
1400	}
1401
1402	/* msg pages? */
1403	if (con->out_msg) {
1404		if (con->v1.out_msg_done) {
1405			ceph_msg_put(con->out_msg);
1406			con->out_msg = NULL;   /* we're done with this one */
1407			goto do_next;
1408		}
1409
1410		ret = write_partial_message_data(con);
1411		if (ret == 1)
1412			goto more;  /* we need to send the footer, too! */
1413		if (ret == 0)
1414			goto out;
1415		if (ret < 0) {
1416			dout("try_write write_partial_message_data err %d\n",
1417			     ret);
1418			goto out;
1419		}
1420	}
1421
1422do_next:
1423	if (con->state == CEPH_CON_S_OPEN) {
1424		if (ceph_con_flag_test_and_clear(con,
1425				CEPH_CON_F_KEEPALIVE_PENDING)) {
1426			prepare_write_keepalive(con);
1427			goto more;
1428		}
1429		/* is anything else pending? */
1430		if (!list_empty(&con->out_queue)) {
1431			prepare_write_message(con);
1432			goto more;
1433		}
1434		if (con->in_seq > con->in_seq_acked) {
1435			prepare_write_ack(con);
1436			goto more;
1437		}
1438	}
1439
1440	/* Nothing to do! */
1441	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1442	dout("try_write nothing else to write.\n");
1443	ret = 0;
1444out:
1445	dout("try_write done on %p ret %d\n", con, ret);
1446	return ret;
1447}
1448
1449void ceph_con_v1_revoke(struct ceph_connection *con)
1450{
1451	struct ceph_msg *msg = con->out_msg;
1452
1453	WARN_ON(con->v1.out_skip);
1454	/* footer */
1455	if (con->v1.out_msg_done) {
1456		con->v1.out_skip += con_out_kvec_skip(con);
1457	} else {
1458		WARN_ON(!msg->data_length);
1459		con->v1.out_skip += sizeof_footer(con);
1460	}
1461	/* data, middle, front */
1462	if (msg->data_length)
1463		con->v1.out_skip += msg->cursor.total_resid;
1464	if (msg->middle)
1465		con->v1.out_skip += con_out_kvec_skip(con);
1466	con->v1.out_skip += con_out_kvec_skip(con);
1467
1468	dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
1469	     con->v1.out_kvec_bytes, con->v1.out_skip);
1470}
1471
1472void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
1473{
1474	unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1475	unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1476	unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1477
1478	/* skip rest of message */
1479	con->v1.in_base_pos = con->v1.in_base_pos -
1480			sizeof(struct ceph_msg_header) -
1481			front_len -
1482			middle_len -
1483			data_len -
1484			sizeof(struct ceph_msg_footer);
1485
1486	con->v1.in_tag = CEPH_MSGR_TAG_READY;
1487	con->in_seq++;
1488
1489	dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
1490}
1491
1492bool ceph_con_v1_opened(struct ceph_connection *con)
1493{
1494	return con->v1.connect_seq;
1495}
1496
1497void ceph_con_v1_reset_session(struct ceph_connection *con)
1498{
1499	con->v1.connect_seq = 0;
1500	con->v1.peer_global_seq = 0;
1501}
1502
1503void ceph_con_v1_reset_protocol(struct ceph_connection *con)
1504{
1505	con->v1.out_skip = 0;
1506}
v6.8
   1// SPDX-License-Identifier: GPL-2.0
   2#include <linux/ceph/ceph_debug.h>
   3
   4#include <linux/bvec.h>
   5#include <linux/crc32c.h>
   6#include <linux/net.h>
   7#include <linux/socket.h>
   8#include <net/sock.h>
   9
  10#include <linux/ceph/ceph_features.h>
  11#include <linux/ceph/decode.h>
  12#include <linux/ceph/libceph.h>
  13#include <linux/ceph/messenger.h>
  14
  15/* static tag bytes (protocol control messages) */
  16static char tag_msg = CEPH_MSGR_TAG_MSG;
  17static char tag_ack = CEPH_MSGR_TAG_ACK;
  18static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
  19static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
  20
  21/*
  22 * If @buf is NULL, discard up to @len bytes.
  23 */
  24static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
  25{
  26	struct kvec iov = {buf, len};
  27	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  28	int r;
  29
  30	if (!buf)
  31		msg.msg_flags |= MSG_TRUNC;
  32
  33	iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, len);
  34	r = sock_recvmsg(sock, &msg, msg.msg_flags);
  35	if (r == -EAGAIN)
  36		r = 0;
  37	return r;
  38}
  39
  40static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
  41		     int page_offset, size_t length)
  42{
  43	struct bio_vec bvec;
 
 
 
 
  44	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  45	int r;
  46
  47	BUG_ON(page_offset + length > PAGE_SIZE);
  48	bvec_set_page(&bvec, page, length, page_offset);
  49	iov_iter_bvec(&msg.msg_iter, ITER_DEST, &bvec, 1, length);
  50	r = sock_recvmsg(sock, &msg, msg.msg_flags);
  51	if (r == -EAGAIN)
  52		r = 0;
  53	return r;
  54}
  55
  56/*
  57 * write something.  @more is true if caller will be sending more data
  58 * shortly.
  59 */
  60static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
  61			    size_t kvlen, size_t len, bool more)
  62{
  63	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
  64	int r;
  65
  66	if (more)
  67		msg.msg_flags |= MSG_MORE;
  68	else
  69		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
  70
  71	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
  72	if (r == -EAGAIN)
  73		r = 0;
  74	return r;
  75}
  76
  77/*
  78 * @more: MSG_MORE or 0.
  79 */
  80static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
  81			     int offset, size_t size, int more)
  82{
  83	struct msghdr msg = {
  84		.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | more,
  85	};
  86	struct bio_vec bvec;
  87	int ret;
  88
  89	/*
  90	 * MSG_SPLICE_PAGES cannot properly handle pages with page_count == 0,
  91	 * we need to fall back to sendmsg if that's the case.
  92	 *
  93	 * Same goes for slab pages: skb_can_coalesce() allows
  94	 * coalescing neighboring slab objects into a single frag which
  95	 * triggers one of hardened usercopy checks.
  96	 */
  97	if (sendpage_ok(page))
  98		msg.msg_flags |= MSG_SPLICE_PAGES;
 
 
  99
 100	bvec_set_page(&bvec, page, size, offset);
 101	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
 102
 103	ret = sock_sendmsg(sock, &msg);
 104	if (ret == -EAGAIN)
 105		ret = 0;
 106
 107	return ret;
 108}
 109
 110static void con_out_kvec_reset(struct ceph_connection *con)
 111{
 112	BUG_ON(con->v1.out_skip);
 113
 114	con->v1.out_kvec_left = 0;
 115	con->v1.out_kvec_bytes = 0;
 116	con->v1.out_kvec_cur = &con->v1.out_kvec[0];
 117}
 118
 119static void con_out_kvec_add(struct ceph_connection *con,
 120				size_t size, void *data)
 121{
 122	int index = con->v1.out_kvec_left;
 123
 124	BUG_ON(con->v1.out_skip);
 125	BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
 126
 127	con->v1.out_kvec[index].iov_len = size;
 128	con->v1.out_kvec[index].iov_base = data;
 129	con->v1.out_kvec_left++;
 130	con->v1.out_kvec_bytes += size;
 131}
 132
 133/*
 134 * Chop off a kvec from the end.  Return residual number of bytes for
 135 * that kvec, i.e. how many bytes would have been written if the kvec
 136 * hadn't been nuked.
 137 */
 138static int con_out_kvec_skip(struct ceph_connection *con)
 139{
 140	int skip = 0;
 141
 142	if (con->v1.out_kvec_bytes > 0) {
 143		skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
 144		BUG_ON(con->v1.out_kvec_bytes < skip);
 145		BUG_ON(!con->v1.out_kvec_left);
 146		con->v1.out_kvec_bytes -= skip;
 147		con->v1.out_kvec_left--;
 148	}
 149
 150	return skip;
 151}
 152
 153static size_t sizeof_footer(struct ceph_connection *con)
 154{
 155	return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
 156	    sizeof(struct ceph_msg_footer) :
 157	    sizeof(struct ceph_msg_footer_old);
 158}
 159
 160static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 161{
 162	/* Initialize data cursor if it's not a sparse read */
 163	u64 len = msg->sparse_read_total ? : data_len;
 164
 165	ceph_msg_data_cursor_init(&msg->cursor, msg, len);
 166}
 167
 168/*
 169 * Prepare footer for currently outgoing message, and finish things
 170 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 171 */
 172static void prepare_write_message_footer(struct ceph_connection *con)
 173{
 174	struct ceph_msg *m = con->out_msg;
 175
 176	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 177
 178	dout("prepare_write_message_footer %p\n", con);
 179	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
 180	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
 181		if (con->ops->sign_message)
 182			con->ops->sign_message(m);
 183		else
 184			m->footer.sig = 0;
 185	} else {
 186		m->old_footer.flags = m->footer.flags;
 187	}
 188	con->v1.out_more = m->more_to_follow;
 189	con->v1.out_msg_done = true;
 190}
 191
 192/*
 193 * Prepare headers for the next outgoing message.
 194 */
 195static void prepare_write_message(struct ceph_connection *con)
 196{
 197	struct ceph_msg *m;
 198	u32 crc;
 199
 200	con_out_kvec_reset(con);
 201	con->v1.out_msg_done = false;
 202
 203	/* Sneak an ack in there first?  If we can get it into the same
 204	 * TCP packet that's a good thing. */
 205	if (con->in_seq > con->in_seq_acked) {
 206		con->in_seq_acked = con->in_seq;
 207		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 208		con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 209		con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 210			&con->v1.out_temp_ack);
 211	}
 212
 213	ceph_con_get_out_msg(con);
 214	m = con->out_msg;
 215
 216	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
 217	     m, con->out_seq, le16_to_cpu(m->hdr.type),
 218	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
 219	     m->data_length);
 220	WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
 221	WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
 222
 223	/* tag + hdr + front + middle */
 224	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
 225	con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
 226	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 227
 228	if (m->middle)
 229		con_out_kvec_add(con, m->middle->vec.iov_len,
 230			m->middle->vec.iov_base);
 231
 232	/* fill in hdr crc and finalize hdr */
 233	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
 234	con->out_msg->hdr.crc = cpu_to_le32(crc);
 235	memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
 236
 237	/* fill in front and middle crc, footer */
 238	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
 239	con->out_msg->footer.front_crc = cpu_to_le32(crc);
 240	if (m->middle) {
 241		crc = crc32c(0, m->middle->vec.iov_base,
 242				m->middle->vec.iov_len);
 243		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
 244	} else
 245		con->out_msg->footer.middle_crc = 0;
 246	dout("%s front_crc %u middle_crc %u\n", __func__,
 247	     le32_to_cpu(con->out_msg->footer.front_crc),
 248	     le32_to_cpu(con->out_msg->footer.middle_crc));
 249	con->out_msg->footer.flags = 0;
 250
 251	/* is there a data payload? */
 252	con->out_msg->footer.data_crc = 0;
 253	if (m->data_length) {
 254		prepare_message_data(con->out_msg, m->data_length);
 255		con->v1.out_more = 1;  /* data + footer will follow */
 256	} else {
 257		/* no, queue up footer too and be done */
 258		prepare_write_message_footer(con);
 259	}
 260
 261	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 262}
 263
 264/*
 265 * Prepare an ack.
 266 */
 267static void prepare_write_ack(struct ceph_connection *con)
 268{
 269	dout("prepare_write_ack %p %llu -> %llu\n", con,
 270	     con->in_seq_acked, con->in_seq);
 271	con->in_seq_acked = con->in_seq;
 272
 273	con_out_kvec_reset(con);
 274
 275	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 276
 277	con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 278	con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 279			 &con->v1.out_temp_ack);
 280
 281	con->v1.out_more = 1;  /* more will follow.. eventually.. */
 282	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 283}
 284
 285/*
 286 * Prepare to share the seq during handshake
 287 */
 288static void prepare_write_seq(struct ceph_connection *con)
 289{
 290	dout("prepare_write_seq %p %llu -> %llu\n", con,
 291	     con->in_seq_acked, con->in_seq);
 292	con->in_seq_acked = con->in_seq;
 293
 294	con_out_kvec_reset(con);
 295
 296	con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
 297	con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
 298			 &con->v1.out_temp_ack);
 299
 300	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 301}
 302
 303/*
 304 * Prepare to write keepalive byte.
 305 */
 306static void prepare_write_keepalive(struct ceph_connection *con)
 307{
 308	dout("prepare_write_keepalive %p\n", con);
 309	con_out_kvec_reset(con);
 310	if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
 311		struct timespec64 now;
 312
 313		ktime_get_real_ts64(&now);
 314		con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
 315		ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
 316		con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
 317				 &con->v1.out_temp_keepalive2);
 318	} else {
 319		con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
 320	}
 321	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 322}
 323
 324/*
 325 * Connection negotiation.
 326 */
 327
 328static int get_connect_authorizer(struct ceph_connection *con)
 329{
 330	struct ceph_auth_handshake *auth;
 331	int auth_proto;
 332
 333	if (!con->ops->get_authorizer) {
 334		con->v1.auth = NULL;
 335		con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
 336		con->v1.out_connect.authorizer_len = 0;
 337		return 0;
 338	}
 339
 340	auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
 341	if (IS_ERR(auth))
 342		return PTR_ERR(auth);
 343
 344	con->v1.auth = auth;
 345	con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
 346	con->v1.out_connect.authorizer_len =
 347		cpu_to_le32(auth->authorizer_buf_len);
 348	return 0;
 349}
 350
 351/*
 352 * We connected to a peer and are saying hello.
 353 */
 354static void prepare_write_banner(struct ceph_connection *con)
 355{
 356	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
 357	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
 358					&con->msgr->my_enc_addr);
 359
 360	con->v1.out_more = 0;
 361	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 362}
 363
 364static void __prepare_write_connect(struct ceph_connection *con)
 365{
 366	con_out_kvec_add(con, sizeof(con->v1.out_connect),
 367			 &con->v1.out_connect);
 368	if (con->v1.auth)
 369		con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
 370				 con->v1.auth->authorizer_buf);
 371
 372	con->v1.out_more = 0;
 373	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
 374}
 375
 376static int prepare_write_connect(struct ceph_connection *con)
 377{
 378	unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
 379	int proto;
 380	int ret;
 381
 382	switch (con->peer_name.type) {
 383	case CEPH_ENTITY_TYPE_MON:
 384		proto = CEPH_MONC_PROTOCOL;
 385		break;
 386	case CEPH_ENTITY_TYPE_OSD:
 387		proto = CEPH_OSDC_PROTOCOL;
 388		break;
 389	case CEPH_ENTITY_TYPE_MDS:
 390		proto = CEPH_MDSC_PROTOCOL;
 391		break;
 392	default:
 393		BUG();
 394	}
 395
 396	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
 397	     con->v1.connect_seq, global_seq, proto);
 398
 399	con->v1.out_connect.features =
 400		cpu_to_le64(from_msgr(con->msgr)->supported_features);
 401	con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
 402	con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
 403	con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
 404	con->v1.out_connect.protocol_version = cpu_to_le32(proto);
 405	con->v1.out_connect.flags = 0;
 406
 407	ret = get_connect_authorizer(con);
 408	if (ret)
 409		return ret;
 410
 411	__prepare_write_connect(con);
 412	return 0;
 413}
 414
 415/*
 416 * write as much of pending kvecs to the socket as we can.
 417 *  1 -> done
 418 *  0 -> socket full, but more to do
 419 * <0 -> error
 420 */
 421static int write_partial_kvec(struct ceph_connection *con)
 422{
 423	int ret;
 424
 425	dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
 426	while (con->v1.out_kvec_bytes > 0) {
 427		ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
 428				       con->v1.out_kvec_left,
 429				       con->v1.out_kvec_bytes,
 430				       con->v1.out_more);
 431		if (ret <= 0)
 432			goto out;
 433		con->v1.out_kvec_bytes -= ret;
 434		if (!con->v1.out_kvec_bytes)
 435			break;            /* done */
 436
 437		/* account for full iov entries consumed */
 438		while (ret >= con->v1.out_kvec_cur->iov_len) {
 439			BUG_ON(!con->v1.out_kvec_left);
 440			ret -= con->v1.out_kvec_cur->iov_len;
 441			con->v1.out_kvec_cur++;
 442			con->v1.out_kvec_left--;
 443		}
 444		/* and for a partially-consumed entry */
 445		if (ret) {
 446			con->v1.out_kvec_cur->iov_len -= ret;
 447			con->v1.out_kvec_cur->iov_base += ret;
 448		}
 449	}
 450	con->v1.out_kvec_left = 0;
 451	ret = 1;
 452out:
 453	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
 454	     con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
 455	return ret;  /* done! */
 456}
 457
 458/*
 459 * Write as much message data payload as we can.  If we finish, queue
 460 * up the footer.
 461 *  1 -> done, footer is now queued in out_kvec[].
 462 *  0 -> socket full, but more to do
 463 * <0 -> error
 464 */
 465static int write_partial_message_data(struct ceph_connection *con)
 466{
 467	struct ceph_msg *msg = con->out_msg;
 468	struct ceph_msg_data_cursor *cursor = &msg->cursor;
 469	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
 
 470	u32 crc;
 471
 472	dout("%s %p msg %p\n", __func__, con, msg);
 473
 474	if (!msg->num_data_items)
 475		return -EINVAL;
 476
 477	/*
 478	 * Iterate through each page that contains data to be
 479	 * written, and send as much as possible for each.
 480	 *
 481	 * If we are calculating the data crc (the default), we will
 482	 * need to map the page.  If we have no pages, they have
 483	 * been revoked, so use the zero page.
 484	 */
 485	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
 486	while (cursor->total_resid) {
 487		struct page *page;
 488		size_t page_offset;
 489		size_t length;
 490		int ret;
 491
 492		if (!cursor->resid) {
 493			ceph_msg_data_advance(cursor, 0);
 494			continue;
 495		}
 496
 497		page = ceph_msg_data_next(cursor, &page_offset, &length);
 
 
 498		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
 499					MSG_MORE);
 500		if (ret <= 0) {
 501			if (do_datacrc)
 502				msg->footer.data_crc = cpu_to_le32(crc);
 503
 504			return ret;
 505		}
 506		if (do_datacrc && cursor->need_crc)
 507			crc = ceph_crc32c_page(crc, page, page_offset, length);
 508		ceph_msg_data_advance(cursor, (size_t)ret);
 509	}
 510
 511	dout("%s %p msg %p done\n", __func__, con, msg);
 512
 513	/* prepare and queue up footer, too */
 514	if (do_datacrc)
 515		msg->footer.data_crc = cpu_to_le32(crc);
 516	else
 517		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
 518	con_out_kvec_reset(con);
 519	prepare_write_message_footer(con);
 520
 521	return 1;	/* must return > 0 to indicate success */
 522}
 523
 524/*
 525 * write some zeros
 526 */
 527static int write_partial_skip(struct ceph_connection *con)
 528{
 
 529	int ret;
 530
 531	dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
 532	while (con->v1.out_skip > 0) {
 533		size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
 534
 
 
 535		ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
 536					MSG_MORE);
 537		if (ret <= 0)
 538			goto out;
 539		con->v1.out_skip -= ret;
 540	}
 541	ret = 1;
 542out:
 543	return ret;
 544}
 545
 546/*
 547 * Prepare to read connection handshake, or an ack.
 548 */
 549static void prepare_read_banner(struct ceph_connection *con)
 550{
 551	dout("prepare_read_banner %p\n", con);
 552	con->v1.in_base_pos = 0;
 553}
 554
 555static void prepare_read_connect(struct ceph_connection *con)
 556{
 557	dout("prepare_read_connect %p\n", con);
 558	con->v1.in_base_pos = 0;
 559}
 560
 561static void prepare_read_ack(struct ceph_connection *con)
 562{
 563	dout("prepare_read_ack %p\n", con);
 564	con->v1.in_base_pos = 0;
 565}
 566
 567static void prepare_read_seq(struct ceph_connection *con)
 568{
 569	dout("prepare_read_seq %p\n", con);
 570	con->v1.in_base_pos = 0;
 571	con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
 572}
 573
 574static void prepare_read_tag(struct ceph_connection *con)
 575{
 576	dout("prepare_read_tag %p\n", con);
 577	con->v1.in_base_pos = 0;
 578	con->v1.in_tag = CEPH_MSGR_TAG_READY;
 579}
 580
 581static void prepare_read_keepalive_ack(struct ceph_connection *con)
 582{
 583	dout("prepare_read_keepalive_ack %p\n", con);
 584	con->v1.in_base_pos = 0;
 585}
 586
 587/*
 588 * Prepare to read a message.
 589 */
 590static int prepare_read_message(struct ceph_connection *con)
 591{
 592	dout("prepare_read_message %p\n", con);
 593	BUG_ON(con->in_msg != NULL);
 594	con->v1.in_base_pos = 0;
 595	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
 596	return 0;
 597}
 598
 599static int read_partial(struct ceph_connection *con,
 600			int end, int size, void *object)
 601{
 602	while (con->v1.in_base_pos < end) {
 603		int left = end - con->v1.in_base_pos;
 604		int have = size - left;
 605		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
 606		if (ret <= 0)
 607			return ret;
 608		con->v1.in_base_pos += ret;
 609	}
 610	return 1;
 611}
 612
 613/*
 614 * Read all or part of the connect-side handshake on a new connection
 615 */
 616static int read_partial_banner(struct ceph_connection *con)
 617{
 618	int size;
 619	int end;
 620	int ret;
 621
 622	dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
 623
 624	/* peer's banner */
 625	size = strlen(CEPH_BANNER);
 626	end = size;
 627	ret = read_partial(con, end, size, con->v1.in_banner);
 628	if (ret <= 0)
 629		goto out;
 630
 631	size = sizeof(con->v1.actual_peer_addr);
 632	end += size;
 633	ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
 634	if (ret <= 0)
 635		goto out;
 636	ceph_decode_banner_addr(&con->v1.actual_peer_addr);
 637
 638	size = sizeof(con->v1.peer_addr_for_me);
 639	end += size;
 640	ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
 641	if (ret <= 0)
 642		goto out;
 643	ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
 644
 645out:
 646	return ret;
 647}
 648
 649static int read_partial_connect(struct ceph_connection *con)
 650{
 651	int size;
 652	int end;
 653	int ret;
 654
 655	dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
 656
 657	size = sizeof(con->v1.in_reply);
 658	end = size;
 659	ret = read_partial(con, end, size, &con->v1.in_reply);
 660	if (ret <= 0)
 661		goto out;
 662
 663	if (con->v1.auth) {
 664		size = le32_to_cpu(con->v1.in_reply.authorizer_len);
 665		if (size > con->v1.auth->authorizer_reply_buf_len) {
 666			pr_err("authorizer reply too big: %d > %zu\n", size,
 667			       con->v1.auth->authorizer_reply_buf_len);
 668			ret = -EINVAL;
 669			goto out;
 670		}
 671
 672		end += size;
 673		ret = read_partial(con, end, size,
 674				   con->v1.auth->authorizer_reply_buf);
 675		if (ret <= 0)
 676			goto out;
 677	}
 678
 679	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
 680	     con, con->v1.in_reply.tag,
 681	     le32_to_cpu(con->v1.in_reply.connect_seq),
 682	     le32_to_cpu(con->v1.in_reply.global_seq));
 683out:
 684	return ret;
 685}
 686
 687/*
 688 * Verify the hello banner looks okay.
 689 */
 690static int verify_hello(struct ceph_connection *con)
 691{
 692	if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
 693		pr_err("connect to %s got bad banner\n",
 694		       ceph_pr_addr(&con->peer_addr));
 695		con->error_msg = "protocol error, bad banner";
 696		return -1;
 697	}
 698	return 0;
 699}
 700
 701static int process_banner(struct ceph_connection *con)
 702{
 703	struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
 704
 705	dout("process_banner on %p\n", con);
 706
 707	if (verify_hello(con) < 0)
 708		return -1;
 709
 710	/*
 711	 * Make sure the other end is who we wanted.  note that the other
 712	 * end may not yet know their ip address, so if it's 0.0.0.0, give
 713	 * them the benefit of the doubt.
 714	 */
 715	if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
 716		   sizeof(con->peer_addr)) != 0 &&
 717	    !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
 718	      con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
 719		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
 720			ceph_pr_addr(&con->peer_addr),
 721			le32_to_cpu(con->peer_addr.nonce),
 722			ceph_pr_addr(&con->v1.actual_peer_addr),
 723			le32_to_cpu(con->v1.actual_peer_addr.nonce));
 724		con->error_msg = "wrong peer at address";
 725		return -1;
 726	}
 727
 728	/*
 729	 * did we learn our address?
 730	 */
 731	if (ceph_addr_is_blank(my_addr)) {
 732		memcpy(&my_addr->in_addr,
 733		       &con->v1.peer_addr_for_me.in_addr,
 734		       sizeof(con->v1.peer_addr_for_me.in_addr));
 735		ceph_addr_set_port(my_addr, 0);
 736		ceph_encode_my_addr(con->msgr);
 737		dout("process_banner learned my addr is %s\n",
 738		     ceph_pr_addr(my_addr));
 739	}
 740
 741	return 0;
 742}
 743
 744static int process_connect(struct ceph_connection *con)
 745{
 746	u64 sup_feat = from_msgr(con->msgr)->supported_features;
 747	u64 req_feat = from_msgr(con->msgr)->required_features;
 748	u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
 749	int ret;
 750
 751	dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
 752
 753	if (con->v1.auth) {
 754		int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
 755
 756		/*
 757		 * Any connection that defines ->get_authorizer()
 758		 * should also define ->add_authorizer_challenge() and
 759		 * ->verify_authorizer_reply().
 760		 *
 761		 * See get_connect_authorizer().
 762		 */
 763		if (con->v1.in_reply.tag ==
 764				CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
 765			ret = con->ops->add_authorizer_challenge(
 766				con, con->v1.auth->authorizer_reply_buf, len);
 767			if (ret < 0)
 768				return ret;
 769
 770			con_out_kvec_reset(con);
 771			__prepare_write_connect(con);
 772			prepare_read_connect(con);
 773			return 0;
 774		}
 775
 776		if (len) {
 777			ret = con->ops->verify_authorizer_reply(con);
 778			if (ret < 0) {
 779				con->error_msg = "bad authorize reply";
 780				return ret;
 781			}
 782		}
 783	}
 784
 785	switch (con->v1.in_reply.tag) {
 786	case CEPH_MSGR_TAG_FEATURES:
 787		pr_err("%s%lld %s feature set mismatch,"
 788		       " my %llx < server's %llx, missing %llx\n",
 789		       ENTITY_NAME(con->peer_name),
 790		       ceph_pr_addr(&con->peer_addr),
 791		       sup_feat, server_feat, server_feat & ~sup_feat);
 792		con->error_msg = "missing required protocol features";
 793		return -1;
 794
 795	case CEPH_MSGR_TAG_BADPROTOVER:
 796		pr_err("%s%lld %s protocol version mismatch,"
 797		       " my %d != server's %d\n",
 798		       ENTITY_NAME(con->peer_name),
 799		       ceph_pr_addr(&con->peer_addr),
 800		       le32_to_cpu(con->v1.out_connect.protocol_version),
 801		       le32_to_cpu(con->v1.in_reply.protocol_version));
 802		con->error_msg = "protocol version mismatch";
 803		return -1;
 804
 805	case CEPH_MSGR_TAG_BADAUTHORIZER:
 806		con->v1.auth_retry++;
 807		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
 808		     con->v1.auth_retry);
 809		if (con->v1.auth_retry == 2) {
 810			con->error_msg = "connect authorization failure";
 811			return -1;
 812		}
 813		con_out_kvec_reset(con);
 814		ret = prepare_write_connect(con);
 815		if (ret < 0)
 816			return ret;
 817		prepare_read_connect(con);
 818		break;
 819
 820	case CEPH_MSGR_TAG_RESETSESSION:
 821		/*
 822		 * If we connected with a large connect_seq but the peer
 823		 * has no record of a session with us (no connection, or
 824		 * connect_seq == 0), they will send RESETSESION to indicate
 825		 * that they must have reset their session, and may have
 826		 * dropped messages.
 827		 */
 828		dout("process_connect got RESET peer seq %u\n",
 829		     le32_to_cpu(con->v1.in_reply.connect_seq));
 830		pr_info("%s%lld %s session reset\n",
 831			ENTITY_NAME(con->peer_name),
 832			ceph_pr_addr(&con->peer_addr));
 833		ceph_con_reset_session(con);
 834		con_out_kvec_reset(con);
 835		ret = prepare_write_connect(con);
 836		if (ret < 0)
 837			return ret;
 838		prepare_read_connect(con);
 839
 840		/* Tell ceph about it. */
 841		mutex_unlock(&con->mutex);
 842		if (con->ops->peer_reset)
 843			con->ops->peer_reset(con);
 844		mutex_lock(&con->mutex);
 845		if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
 846			return -EAGAIN;
 847		break;
 848
 849	case CEPH_MSGR_TAG_RETRY_SESSION:
 850		/*
 851		 * If we sent a smaller connect_seq than the peer has, try
 852		 * again with a larger value.
 853		 */
 854		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
 855		     le32_to_cpu(con->v1.out_connect.connect_seq),
 856		     le32_to_cpu(con->v1.in_reply.connect_seq));
 857		con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
 858		con_out_kvec_reset(con);
 859		ret = prepare_write_connect(con);
 860		if (ret < 0)
 861			return ret;
 862		prepare_read_connect(con);
 863		break;
 864
 865	case CEPH_MSGR_TAG_RETRY_GLOBAL:
 866		/*
 867		 * If we sent a smaller global_seq than the peer has, try
 868		 * again with a larger value.
 869		 */
 870		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
 871		     con->v1.peer_global_seq,
 872		     le32_to_cpu(con->v1.in_reply.global_seq));
 873		ceph_get_global_seq(con->msgr,
 874				    le32_to_cpu(con->v1.in_reply.global_seq));
 875		con_out_kvec_reset(con);
 876		ret = prepare_write_connect(con);
 877		if (ret < 0)
 878			return ret;
 879		prepare_read_connect(con);
 880		break;
 881
 882	case CEPH_MSGR_TAG_SEQ:
 883	case CEPH_MSGR_TAG_READY:
 884		if (req_feat & ~server_feat) {
 885			pr_err("%s%lld %s protocol feature mismatch,"
 886			       " my required %llx > server's %llx, need %llx\n",
 887			       ENTITY_NAME(con->peer_name),
 888			       ceph_pr_addr(&con->peer_addr),
 889			       req_feat, server_feat, req_feat & ~server_feat);
 890			con->error_msg = "missing required protocol features";
 891			return -1;
 892		}
 893
 894		WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
 895		con->state = CEPH_CON_S_OPEN;
 896		con->v1.auth_retry = 0;    /* we authenticated; clear flag */
 897		con->v1.peer_global_seq =
 898			le32_to_cpu(con->v1.in_reply.global_seq);
 899		con->v1.connect_seq++;
 900		con->peer_features = server_feat;
 901		dout("process_connect got READY gseq %d cseq %d (%d)\n",
 902		     con->v1.peer_global_seq,
 903		     le32_to_cpu(con->v1.in_reply.connect_seq),
 904		     con->v1.connect_seq);
 905		WARN_ON(con->v1.connect_seq !=
 906			le32_to_cpu(con->v1.in_reply.connect_seq));
 907
 908		if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
 909			ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
 910
 911		con->delay = 0;      /* reset backoff memory */
 912
 913		if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
 914			prepare_write_seq(con);
 915			prepare_read_seq(con);
 916		} else {
 917			prepare_read_tag(con);
 918		}
 919		break;
 920
 921	case CEPH_MSGR_TAG_WAIT:
 922		/*
 923		 * If there is a connection race (we are opening
 924		 * connections to each other), one of us may just have
 925		 * to WAIT.  This shouldn't happen if we are the
 926		 * client.
 927		 */
 928		con->error_msg = "protocol error, got WAIT as client";
 929		return -1;
 930
 931	default:
 932		con->error_msg = "protocol error, garbage tag during connect";
 933		return -1;
 934	}
 935	return 0;
 936}
 937
 938/*
 939 * read (part of) an ack
 940 */
 941static int read_partial_ack(struct ceph_connection *con)
 942{
 943	int size = sizeof(con->v1.in_temp_ack);
 944	int end = size;
 945
 946	return read_partial(con, end, size, &con->v1.in_temp_ack);
 947}
 948
 949/*
 950 * We can finally discard anything that's been acked.
 951 */
 952static void process_ack(struct ceph_connection *con)
 953{
 954	u64 ack = le64_to_cpu(con->v1.in_temp_ack);
 955
 956	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
 957		ceph_con_discard_sent(con, ack);
 958	else
 959		ceph_con_discard_requeued(con, ack);
 960
 961	prepare_read_tag(con);
 962}
 963
 964static int read_partial_message_chunk(struct ceph_connection *con,
 965				      struct kvec *section,
 966				      unsigned int sec_len, u32 *crc)
 967{
 968	int ret, left;
 969
 970	BUG_ON(!section);
 971
 972	while (section->iov_len < sec_len) {
 973		BUG_ON(section->iov_base == NULL);
 974		left = sec_len - section->iov_len;
 975		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
 976				       section->iov_len, left);
 977		if (ret <= 0)
 978			return ret;
 979		section->iov_len += ret;
 980	}
 981	if (section->iov_len == sec_len)
 982		*crc = crc32c(*crc, section->iov_base, section->iov_len);
 983
 984	return 1;
 985}
 986
 987static inline int read_partial_message_section(struct ceph_connection *con,
 988					       struct kvec *section,
 989					       unsigned int sec_len, u32 *crc)
 990{
 991	*crc = 0;
 992	return read_partial_message_chunk(con, section, sec_len, crc);
 993}
 994
 995static int read_partial_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
 996{
 997	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
 998	bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);
 999
1000	if (do_bounce && unlikely(!con->bounce_page)) {
1001		con->bounce_page = alloc_page(GFP_NOIO);
1002		if (!con->bounce_page) {
1003			pr_err("failed to allocate bounce page\n");
1004			return -ENOMEM;
1005		}
1006	}
1007
1008	while (cursor->sr_resid > 0) {
1009		struct page *page, *rpage;
1010		size_t off, len;
1011		int ret;
1012
1013		page = ceph_msg_data_next(cursor, &off, &len);
1014		rpage = do_bounce ? con->bounce_page : page;
1015
1016		/* clamp to what remains in extent */
1017		len = min_t(int, len, cursor->sr_resid);
1018		ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len);
1019		if (ret <= 0)
1020			return ret;
1021		*crc = ceph_crc32c_page(*crc, rpage, off, ret);
1022		ceph_msg_data_advance(cursor, (size_t)ret);
1023		cursor->sr_resid -= ret;
1024		if (do_bounce)
1025			memcpy_page(page, off, rpage, off, ret);
1026	}
1027	return 1;
1028}
1029
1030static int read_partial_sparse_msg_data(struct ceph_connection *con)
1031{
1032	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1033	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1034	u32 crc = 0;
1035	int ret = 1;
1036
1037	if (do_datacrc)
1038		crc = con->in_data_crc;
1039
1040	while (cursor->total_resid) {
1041		if (con->v1.in_sr_kvec.iov_base)
1042			ret = read_partial_message_chunk(con,
1043							 &con->v1.in_sr_kvec,
1044							 con->v1.in_sr_len,
1045							 &crc);
1046		else if (cursor->sr_resid > 0)
1047			ret = read_partial_sparse_msg_extent(con, &crc);
1048		if (ret <= 0)
1049			break;
1050
1051		memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
1052		ret = con->ops->sparse_read(con, cursor,
1053				(char **)&con->v1.in_sr_kvec.iov_base);
1054		if (ret <= 0) {
1055			ret = ret ? ret : 1;  /* must return > 0 to indicate success */
1056			break;
1057		}
1058		con->v1.in_sr_len = ret;
1059	}
1060
1061	if (do_datacrc)
1062		con->in_data_crc = crc;
1063
1064	return ret;
1065}
1066
1067static int read_partial_msg_data(struct ceph_connection *con)
1068{
1069	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
 
1070	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1071	struct page *page;
1072	size_t page_offset;
1073	size_t length;
1074	u32 crc = 0;
1075	int ret;
1076
 
 
 
1077	if (do_datacrc)
1078		crc = con->in_data_crc;
1079	while (cursor->total_resid) {
1080		if (!cursor->resid) {
1081			ceph_msg_data_advance(cursor, 0);
1082			continue;
1083		}
1084
1085		page = ceph_msg_data_next(cursor, &page_offset, &length);
1086		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
1087		if (ret <= 0) {
1088			if (do_datacrc)
1089				con->in_data_crc = crc;
1090
1091			return ret;
1092		}
1093
1094		if (do_datacrc)
1095			crc = ceph_crc32c_page(crc, page, page_offset, ret);
1096		ceph_msg_data_advance(cursor, (size_t)ret);
1097	}
1098	if (do_datacrc)
1099		con->in_data_crc = crc;
1100
1101	return 1;	/* must return > 0 to indicate success */
1102}
1103
1104static int read_partial_msg_data_bounce(struct ceph_connection *con)
1105{
1106	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1107	struct page *page;
1108	size_t off, len;
1109	u32 crc;
1110	int ret;
1111
1112	if (unlikely(!con->bounce_page)) {
1113		con->bounce_page = alloc_page(GFP_NOIO);
1114		if (!con->bounce_page) {
1115			pr_err("failed to allocate bounce page\n");
1116			return -ENOMEM;
1117		}
1118	}
1119
1120	crc = con->in_data_crc;
1121	while (cursor->total_resid) {
1122		if (!cursor->resid) {
1123			ceph_msg_data_advance(cursor, 0);
1124			continue;
1125		}
1126
1127		page = ceph_msg_data_next(cursor, &off, &len);
1128		ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
1129		if (ret <= 0) {
1130			con->in_data_crc = crc;
1131			return ret;
1132		}
1133
1134		crc = crc32c(crc, page_address(con->bounce_page), ret);
1135		memcpy_to_page(page, off, page_address(con->bounce_page), ret);
1136
1137		ceph_msg_data_advance(cursor, ret);
1138	}
1139	con->in_data_crc = crc;
1140
1141	return 1;	/* must return > 0 to indicate success */
1142}
1143
1144/*
1145 * read (part of) a message.
1146 */
1147static int read_partial_message(struct ceph_connection *con)
1148{
1149	struct ceph_msg *m = con->in_msg;
1150	int size;
1151	int end;
1152	int ret;
1153	unsigned int front_len, middle_len, data_len;
1154	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1155	bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
1156	u64 seq;
1157	u32 crc;
1158
1159	dout("read_partial_message con %p msg %p\n", con, m);
1160
1161	/* header */
1162	size = sizeof(con->v1.in_hdr);
1163	end = size;
1164	ret = read_partial(con, end, size, &con->v1.in_hdr);
1165	if (ret <= 0)
1166		return ret;
1167
1168	crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
1169	if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
1170		pr_err("read_partial_message bad hdr crc %u != expected %u\n",
1171		       crc, con->v1.in_hdr.crc);
1172		return -EBADMSG;
1173	}
1174
1175	front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1176	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1177		return -EIO;
1178	middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1179	if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1180		return -EIO;
1181	data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1182	if (data_len > CEPH_MSG_MAX_DATA_LEN)
1183		return -EIO;
1184
1185	/* verify seq# */
1186	seq = le64_to_cpu(con->v1.in_hdr.seq);
1187	if ((s64)seq - (s64)con->in_seq < 1) {
1188		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1189			ENTITY_NAME(con->peer_name),
1190			ceph_pr_addr(&con->peer_addr),
1191			seq, con->in_seq + 1);
1192		con->v1.in_base_pos = -front_len - middle_len - data_len -
1193				      sizeof_footer(con);
1194		con->v1.in_tag = CEPH_MSGR_TAG_READY;
1195		return 1;
1196	} else if ((s64)seq - (s64)con->in_seq > 1) {
1197		pr_err("read_partial_message bad seq %lld expected %lld\n",
1198		       seq, con->in_seq + 1);
1199		con->error_msg = "bad message sequence # for incoming message";
1200		return -EBADE;
1201	}
1202
1203	/* allocate message? */
1204	if (!con->in_msg) {
1205		int skip = 0;
1206
1207		dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
1208		     front_len, data_len);
1209		ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
1210		if (ret < 0)
1211			return ret;
1212
1213		BUG_ON((!con->in_msg) ^ skip);
1214		if (skip) {
1215			/* skip this message */
1216			dout("alloc_msg said skip message\n");
1217			con->v1.in_base_pos = -front_len - middle_len -
1218					      data_len - sizeof_footer(con);
1219			con->v1.in_tag = CEPH_MSGR_TAG_READY;
1220			con->in_seq++;
1221			return 1;
1222		}
1223
1224		BUG_ON(!con->in_msg);
1225		BUG_ON(con->in_msg->con != con);
1226		m = con->in_msg;
1227		m->front.iov_len = 0;    /* haven't read it yet */
1228		if (m->middle)
1229			m->middle->vec.iov_len = 0;
1230
1231		/* prepare for data payload, if any */
1232
1233		if (data_len)
1234			prepare_message_data(con->in_msg, data_len);
1235	}
1236
1237	/* front */
1238	ret = read_partial_message_section(con, &m->front, front_len,
1239					   &con->in_front_crc);
1240	if (ret <= 0)
1241		return ret;
1242
1243	/* middle */
1244	if (m->middle) {
1245		ret = read_partial_message_section(con, &m->middle->vec,
1246						   middle_len,
1247						   &con->in_middle_crc);
1248		if (ret <= 0)
1249			return ret;
1250	}
1251
1252	/* (page) data */
1253	if (data_len) {
1254		if (!m->num_data_items)
1255			return -EIO;
1256
1257		if (m->sparse_read_total)
1258			ret = read_partial_sparse_msg_data(con);
1259		else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
1260			ret = read_partial_msg_data_bounce(con);
1261		else
1262			ret = read_partial_msg_data(con);
1263		if (ret <= 0)
1264			return ret;
1265	}
1266
1267	/* footer */
1268	size = sizeof_footer(con);
1269	end += size;
1270	ret = read_partial(con, end, size, &m->footer);
1271	if (ret <= 0)
1272		return ret;
1273
1274	if (!need_sign) {
1275		m->footer.flags = m->old_footer.flags;
1276		m->footer.sig = 0;
1277	}
1278
1279	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1280	     m, front_len, m->footer.front_crc, middle_len,
1281	     m->footer.middle_crc, data_len, m->footer.data_crc);
1282
1283	/* crc ok? */
1284	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1285		pr_err("read_partial_message %p front crc %u != exp. %u\n",
1286		       m, con->in_front_crc, m->footer.front_crc);
1287		return -EBADMSG;
1288	}
1289	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1290		pr_err("read_partial_message %p middle crc %u != exp %u\n",
1291		       m, con->in_middle_crc, m->footer.middle_crc);
1292		return -EBADMSG;
1293	}
1294	if (do_datacrc &&
1295	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1296	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1297		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1298		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1299		return -EBADMSG;
1300	}
1301
1302	if (need_sign && con->ops->check_message_signature &&
1303	    con->ops->check_message_signature(m)) {
1304		pr_err("read_partial_message %p signature check failed\n", m);
1305		return -EBADMSG;
1306	}
1307
1308	return 1; /* done! */
1309}
1310
1311static int read_keepalive_ack(struct ceph_connection *con)
1312{
1313	struct ceph_timespec ceph_ts;
1314	size_t size = sizeof(ceph_ts);
1315	int ret = read_partial(con, size, size, &ceph_ts);
1316	if (ret <= 0)
1317		return ret;
1318	ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
1319	prepare_read_tag(con);
1320	return 1;
1321}
1322
1323/*
1324 * Read what we can from the socket.
1325 */
1326int ceph_con_v1_try_read(struct ceph_connection *con)
1327{
1328	int ret = -1;
1329
1330more:
1331	dout("try_read start %p state %d\n", con, con->state);
1332	if (con->state != CEPH_CON_S_V1_BANNER &&
1333	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1334	    con->state != CEPH_CON_S_OPEN)
1335		return 0;
1336
1337	BUG_ON(!con->sock);
1338
1339	dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
1340	     con->v1.in_base_pos);
1341
1342	if (con->state == CEPH_CON_S_V1_BANNER) {
1343		ret = read_partial_banner(con);
1344		if (ret <= 0)
1345			goto out;
1346		ret = process_banner(con);
1347		if (ret < 0)
1348			goto out;
1349
1350		con->state = CEPH_CON_S_V1_CONNECT_MSG;
1351
1352		/*
1353		 * Received banner is good, exchange connection info.
1354		 * Do not reset out_kvec, as sending our banner raced
1355		 * with receiving peer banner after connect completed.
1356		 */
1357		ret = prepare_write_connect(con);
1358		if (ret < 0)
1359			goto out;
1360		prepare_read_connect(con);
1361
1362		/* Send connection info before awaiting response */
1363		goto out;
1364	}
1365
1366	if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
1367		ret = read_partial_connect(con);
1368		if (ret <= 0)
1369			goto out;
1370		ret = process_connect(con);
1371		if (ret < 0)
1372			goto out;
1373		goto more;
1374	}
1375
1376	WARN_ON(con->state != CEPH_CON_S_OPEN);
1377
1378	if (con->v1.in_base_pos < 0) {
1379		/*
1380		 * skipping + discarding content.
1381		 */
1382		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
1383		if (ret <= 0)
1384			goto out;
1385		dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
1386		con->v1.in_base_pos += ret;
1387		if (con->v1.in_base_pos)
1388			goto more;
1389	}
1390	if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
1391		/*
1392		 * what's next?
1393		 */
1394		ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
1395		if (ret <= 0)
1396			goto out;
1397		dout("try_read got tag %d\n", con->v1.in_tag);
1398		switch (con->v1.in_tag) {
1399		case CEPH_MSGR_TAG_MSG:
1400			prepare_read_message(con);
1401			break;
1402		case CEPH_MSGR_TAG_ACK:
1403			prepare_read_ack(con);
1404			break;
1405		case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
1406			prepare_read_keepalive_ack(con);
1407			break;
1408		case CEPH_MSGR_TAG_CLOSE:
1409			ceph_con_close_socket(con);
1410			con->state = CEPH_CON_S_CLOSED;
1411			goto out;
1412		default:
1413			goto bad_tag;
1414		}
1415	}
1416	if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
1417		ret = read_partial_message(con);
1418		if (ret <= 0) {
1419			switch (ret) {
1420			case -EBADMSG:
1421				con->error_msg = "bad crc/signature";
1422				fallthrough;
1423			case -EBADE:
1424				ret = -EIO;
1425				break;
1426			case -EIO:
1427				con->error_msg = "io error";
1428				break;
1429			}
1430			goto out;
1431		}
1432		if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
1433			goto more;
1434		ceph_con_process_message(con);
1435		if (con->state == CEPH_CON_S_OPEN)
1436			prepare_read_tag(con);
1437		goto more;
1438	}
1439	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
1440	    con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
1441		/*
1442		 * the final handshake seq exchange is semantically
1443		 * equivalent to an ACK
1444		 */
1445		ret = read_partial_ack(con);
1446		if (ret <= 0)
1447			goto out;
1448		process_ack(con);
1449		goto more;
1450	}
1451	if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1452		ret = read_keepalive_ack(con);
1453		if (ret <= 0)
1454			goto out;
1455		goto more;
1456	}
1457
1458out:
1459	dout("try_read done on %p ret %d\n", con, ret);
1460	return ret;
1461
1462bad_tag:
1463	pr_err("try_read bad tag %d\n", con->v1.in_tag);
1464	con->error_msg = "protocol error, garbage tag";
1465	ret = -1;
1466	goto out;
1467}
1468
1469/*
1470 * Write something to the socket.  Called in a worker thread when the
1471 * socket appears to be writeable and we have something ready to send.
1472 */
1473int ceph_con_v1_try_write(struct ceph_connection *con)
1474{
1475	int ret = 1;
1476
1477	dout("try_write start %p state %d\n", con, con->state);
1478	if (con->state != CEPH_CON_S_PREOPEN &&
1479	    con->state != CEPH_CON_S_V1_BANNER &&
1480	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1481	    con->state != CEPH_CON_S_OPEN)
1482		return 0;
1483
1484	/* open the socket first? */
1485	if (con->state == CEPH_CON_S_PREOPEN) {
1486		BUG_ON(con->sock);
1487		con->state = CEPH_CON_S_V1_BANNER;
1488
1489		con_out_kvec_reset(con);
1490		prepare_write_banner(con);
1491		prepare_read_banner(con);
1492
1493		BUG_ON(con->in_msg);
1494		con->v1.in_tag = CEPH_MSGR_TAG_READY;
1495		dout("try_write initiating connect on %p new state %d\n",
1496		     con, con->state);
1497		ret = ceph_tcp_connect(con);
1498		if (ret < 0) {
1499			con->error_msg = "connect error";
1500			goto out;
1501		}
1502	}
1503
1504more:
1505	dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
1506	BUG_ON(!con->sock);
1507
1508	/* kvec data queued? */
1509	if (con->v1.out_kvec_left) {
1510		ret = write_partial_kvec(con);
1511		if (ret <= 0)
1512			goto out;
1513	}
1514	if (con->v1.out_skip) {
1515		ret = write_partial_skip(con);
1516		if (ret <= 0)
1517			goto out;
1518	}
1519
1520	/* msg pages? */
1521	if (con->out_msg) {
1522		if (con->v1.out_msg_done) {
1523			ceph_msg_put(con->out_msg);
1524			con->out_msg = NULL;   /* we're done with this one */
1525			goto do_next;
1526		}
1527
1528		ret = write_partial_message_data(con);
1529		if (ret == 1)
1530			goto more;  /* we need to send the footer, too! */
1531		if (ret == 0)
1532			goto out;
1533		if (ret < 0) {
1534			dout("try_write write_partial_message_data err %d\n",
1535			     ret);
1536			goto out;
1537		}
1538	}
1539
1540do_next:
1541	if (con->state == CEPH_CON_S_OPEN) {
1542		if (ceph_con_flag_test_and_clear(con,
1543				CEPH_CON_F_KEEPALIVE_PENDING)) {
1544			prepare_write_keepalive(con);
1545			goto more;
1546		}
1547		/* is anything else pending? */
1548		if (!list_empty(&con->out_queue)) {
1549			prepare_write_message(con);
1550			goto more;
1551		}
1552		if (con->in_seq > con->in_seq_acked) {
1553			prepare_write_ack(con);
1554			goto more;
1555		}
1556	}
1557
1558	/* Nothing to do! */
1559	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1560	dout("try_write nothing else to write.\n");
1561	ret = 0;
1562out:
1563	dout("try_write done on %p ret %d\n", con, ret);
1564	return ret;
1565}
1566
1567void ceph_con_v1_revoke(struct ceph_connection *con)
1568{
1569	struct ceph_msg *msg = con->out_msg;
1570
1571	WARN_ON(con->v1.out_skip);
1572	/* footer */
1573	if (con->v1.out_msg_done) {
1574		con->v1.out_skip += con_out_kvec_skip(con);
1575	} else {
1576		WARN_ON(!msg->data_length);
1577		con->v1.out_skip += sizeof_footer(con);
1578	}
1579	/* data, middle, front */
1580	if (msg->data_length)
1581		con->v1.out_skip += msg->cursor.total_resid;
1582	if (msg->middle)
1583		con->v1.out_skip += con_out_kvec_skip(con);
1584	con->v1.out_skip += con_out_kvec_skip(con);
1585
1586	dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
1587	     con->v1.out_kvec_bytes, con->v1.out_skip);
1588}
1589
1590void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
1591{
1592	unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1593	unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1594	unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1595
1596	/* skip rest of message */
1597	con->v1.in_base_pos = con->v1.in_base_pos -
1598			sizeof(struct ceph_msg_header) -
1599			front_len -
1600			middle_len -
1601			data_len -
1602			sizeof(struct ceph_msg_footer);
1603
1604	con->v1.in_tag = CEPH_MSGR_TAG_READY;
1605	con->in_seq++;
1606
1607	dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
1608}
1609
1610bool ceph_con_v1_opened(struct ceph_connection *con)
1611{
1612	return con->v1.connect_seq;
1613}
1614
1615void ceph_con_v1_reset_session(struct ceph_connection *con)
1616{
1617	con->v1.connect_seq = 0;
1618	con->v1.peer_global_seq = 0;
1619}
1620
1621void ceph_con_v1_reset_protocol(struct ceph_connection *con)
1622{
1623	con->v1.out_skip = 0;
1624}