Linux Audio

Check our new training course

Loading...
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
  7**
  8**
  9*******************************************************************************
 10******************************************************************************/
 11
 12/*
 13 * midcomms.c
 14 *
 15 * This is the appallingly named "mid-level" comms layer.
 
 
 16 *
 17 * Its purpose is to take packets from the "real" comms layer,
 18 * split them up into packets and pass them to the interested
 19 * part of the locking mechanism.
 20 *
 21 * It also takes messages from the locking layer, formats them
 22 * into packets and sends them to the comms layer.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 23 */
 
 
 
 
 24
 25#include "dlm_internal.h"
 
 26#include "lowcomms.h"
 27#include "config.h"
 
 28#include "lock.h"
 
 29#include "midcomms.h"
 30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 31
 32static void copy_from_cb(void *dst, const void *base, unsigned offset,
 33			 unsigned len, unsigned limit)
 34{
 35	unsigned copy = len;
 
 
 36
 37	if ((copy + offset) > limit)
 38		copy = limit - offset;
 39	memcpy(dst, base + offset, copy);
 40	len -= copy;
 41	if (len)
 42		memcpy(dst + copy, base, len);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 43}
 44
 45/*
 46 * Called from the low-level comms layer to process a buffer of
 47 * commands.
 48 *
 49 * Only complete messages are processed here, any "spare" bytes from
 50 * the end of a buffer are saved and tacked onto the front of the next
 51 * message that comes in. I doubt this will happen very often but we
 52 * need to be able to cope with it and I don't want the task to be waiting
 53 * for packets to come in when there is useful work to be done.
 54 */
 55
 56int dlm_process_incoming_buffer(int nodeid, const void *base,
 57				unsigned offset, unsigned len, unsigned limit)
 58{
 59	union {
 60		unsigned char __buf[DLM_INBUF_LEN];
 61		/* this is to force proper alignment on some arches */
 62		union dlm_packet p;
 63	} __tmp;
 64	union dlm_packet *p = &__tmp.p;
 65	int ret = 0;
 66	int err = 0;
 67	uint16_t msglen;
 68	uint32_t lockspace;
 69
 70	while (len > sizeof(struct dlm_header)) {
 
 
 
 71
 72		/* Copy just the header to check the total length.  The
 73		   message may wrap around the end of the buffer back to the
 74		   start, so we need to use a temp buffer and copy_from_cb. */
 
 75
 76		copy_from_cb(p, base, offset, sizeof(struct dlm_header),
 77			     limit);
 
 
 78
 79		msglen = le16_to_cpu(p->header.h_length);
 80		lockspace = p->header.h_lockspace;
 
 
 
 
 
 
 
 
 
 81
 82		err = -EINVAL;
 83		if (msglen < sizeof(struct dlm_header))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 84			break;
 85		if (p->header.h_cmd == DLM_MSG) {
 86			if (msglen < sizeof(struct dlm_message))
 87				break;
 
 
 
 
 88		} else {
 89			if (msglen < sizeof(struct dlm_rcom))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 90				break;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 91		}
 92		err = -E2BIG;
 93		if (msglen > dlm_config.ci_buffer_size) {
 94			log_print("message size %d from %d too big, buf len %d",
 95				  msglen, nodeid, len);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 96			break;
 97		}
 98		err = 0;
 99
100		/* If only part of the full message is contained in this
101		   buffer, then do nothing and wait for lowcomms to call
102		   us again later with more data.  We return 0 meaning
103		   we've consumed none of the input buffer. */
104
105		if (msglen > len)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106			break;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
108		/* Allocate a larger temp buffer if the full message won't fit
109		   in the buffer on the stack (which should work for most
110		   ordinary messages). */
111
112		if (msglen > sizeof(__tmp) && p == &__tmp.p) {
113			p = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
114			if (p == NULL)
115				return ret;
 
 
 
 
 
 
 
 
 
 
 
 
 
116		}
117
118		copy_from_cb(p, base, offset, msglen, limit);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
120		BUG_ON(lockspace != p->header.h_lockspace);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
122		ret += msglen;
123		offset += msglen;
124		offset &= (limit - 1);
125		len -= msglen;
 
 
126
127		dlm_receive_buffer(p, nodeid);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128	}
129
130	if (p != &__tmp.p)
131		kfree(p);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
133	return err ? err : ret;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134}
135
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2021 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12/*
  13 * midcomms.c
  14 *
 * This is the appallingly named "mid-level" comms layer. It takes care of
 * delivering "reliable" application-layer communication on top of the
 * underlying lowcomms transport layer.
  18 *
  19 * How it works:
 
 
  20 *
 * Each node keeps track of all sent DLM messages in send_queue with a sequence
 * number. The receiver will send a DLM_ACK message back for every DLM message
 * received at the other side. If a reconnect happens in lowcomms we will send
 * all unacknowledged dlm messages again. The receiving side might drop any
 * already received message by comparing sequence numbers.
  26 *
  27 * How version detection works:
  28 *
 * Due to the fact that dlm has pre-configured node addresses on every side,
 * it is in its nature that every side connects at start to transmit
 * dlm messages, which ends in a race. However DLM_RCOM_NAMES, DLM_RCOM_STATUS
 * and their replies are the first messages which are exchanged. Due to
 * backwards compatibility these messages are not covered by the midcomms
 * re-transmission layer. These messages have their own re-transmission
 * handling in the dlm application layer. The version field of every node will
 * be set on these RCOM messages as soon as they arrive and the node isn't yet
 * part of the nodes hash. There also exists logic to detect a version mismatch
 * if something weird is going on or the first message isn't an expected one.
  39 *
  40 * Termination:
  41 *
 * The midcomms layer does a 4 way handshake for termination on DLM protocol
 * like TCP supports it with half-closed socket support. SCTP doesn't support
 * half-closed sockets, so we do it on the DLM layer. Also socket shutdown()
 * can be interrupted by e.g. a tcp reset itself. Additionally there exists
 * the othercon paradigm in lowcomms which cannot be easily removed without
 * breaking backwards compatibility. A node cannot send anything to another
 * node when a DLM_FIN message was sent. There exists additional logic to
 * print a warning if DLM wants to do it. There exists a state handling like
 * RFC 793 but reduced to termination only. The event "member removal event"
 * describes that the cluster manager removed the node from internal lists;
 * at this point DLM does not send any message to the other node. There exist
 * two cases:
  53 *
  54 * 1. The cluster member was removed and we received a FIN
  55 * OR
  56 * 2. We received a FIN but the member was not removed yet
  57 *
  58 * One of these cases will do the CLOSE_WAIT to LAST_ACK change.
  59 *
  60 *
  61 *                              +---------+
  62 *                              | CLOSED  |
  63 *                              +---------+
  64 *                                   | add member/receive RCOM version
  65 *                                   |            detection msg
  66 *                                   V
  67 *                              +---------+
  68 *                              |  ESTAB  |
  69 *                              +---------+
  70 *                       CLOSE    |     |    rcv FIN
  71 *                      -------   |     |    -------
  72 * +---------+          snd FIN  /       \   snd ACK          +---------+
  73 * |  FIN    |<-----------------           ------------------>|  CLOSE  |
  74 * | WAIT-1  |------------------                              |   WAIT  |
  75 * +---------+          rcv FIN  \                            +---------+
  76 * | rcv ACK of FIN   -------   |                            CLOSE  | member
  77 * | --------------   snd ACK   |                           ------- | removal
  78 * V        x                   V                           snd FIN V event
  79 * +---------+                  +---------+                   +---------+
  80 * |FINWAIT-2|                  | CLOSING |                   | LAST-ACK|
  81 * +---------+                  +---------+                   +---------+
  82 * |                rcv ACK of FIN |                 rcv ACK of FIN |
  83 * |  rcv FIN       -------------- |                 -------------- |
  84 * |  -------              x       V                        x       V
  85 *  \ snd ACK                 +---------+                   +---------+
  86 *   ------------------------>| CLOSED  |                   | CLOSED  |
  87 *                            +---------+                   +---------+
  88 *
 * NOTE: any state can be interrupted by midcomms_close() and the state will
 * be switched to CLOSED in case of fencing. There also exists some timeout
 * handling when we receive the version detection RCOM messages, which is
 * based on observation.
  93 *
  94 * Future improvements:
  95 *
  96 * There exists some known issues/improvements of the dlm handling. Some
  97 * of them should be done in a next major dlm version bump which makes
  98 * it incompatible with previous versions.
  99 *
 100 * Unaligned memory access:
 101 *
 102 * There exists cases when the dlm message buffer length is not aligned
 103 * to 8 byte. However seems nobody detected any problem with it. This
 104 * can be fixed in the next major version bump of dlm.
 105 *
 106 * Version detection:
 107 *
 108 * The version detection and how it's done is related to backwards
 109 * compatibility. There exists better ways to make a better handling.
 110 * However this should be changed in the next major version bump of dlm.
 111 *
 112 * Tail Size checking:
 113 *
 114 * There exists a message tail payload in e.g. DLM_MSG however we don't
 115 * check it against the message length yet regarding to the receive buffer
 116 * length. That need to be validated.
 117 *
 118 * Fencing bad nodes:
 119 *
 120 * At timeout places or weird sequence number behaviours we should send
 121 * a fencing request to the cluster manager.
 122 */
 123
/* Debug switch to enable a 5 second sleep while waiting for a termination.
 * This can be useful to test fencing while termination is running.
 * This requires a setup with only gfs2 as dlm user, so that the
 * last umount will terminate the connection.
 *
 * It has proven useful for testing: while umount blocks for the 5 seconds,
 * just press the reset button. With a lot of dropping the termination
 * process could take several seconds.
 */
 133#define DLM_DEBUG_FENCE_TERMINATION	0
 134
 135#include <trace/events/dlm.h>
 136#include <net/tcp.h>
 137
 138#include "dlm_internal.h"
 139#include "lockspace.h"
 140#include "lowcomms.h"
 141#include "config.h"
 142#include "memory.h"
 143#include "lock.h"
 144#include "util.h"
 145#include "midcomms.h"
 146
 147/* init value for sequence numbers for testing purpose only e.g. overflows */
 148#define DLM_SEQ_INIT		0
 149/* 3 minutes wait to sync ending of dlm */
 150#define DLM_SHUTDOWN_TIMEOUT	msecs_to_jiffies(3 * 60 * 1000)
 151#define DLM_VERSION_NOT_SET	0
 152
 153struct midcomms_node {
 154	int nodeid;
 155	uint32_t version;
 156	uint32_t seq_send;
 157	uint32_t seq_next;
 158	/* These queues are unbound because we cannot drop any message in dlm.
 159	 * We could send a fence signal for a specific node to the cluster
 160	 * manager if queues hits some maximum value, however this handling
 161	 * not supported yet.
 162	 */
 163	struct list_head send_queue;
 164	spinlock_t send_queue_lock;
 165	atomic_t send_queue_cnt;
 166#define DLM_NODE_FLAG_CLOSE	1
 167#define DLM_NODE_FLAG_STOP_TX	2
 168#define DLM_NODE_FLAG_STOP_RX	3
 169#define DLM_NODE_ULP_DELIVERED	4
 170	unsigned long flags;
 171	wait_queue_head_t shutdown_wait;
 172
 173	/* dlm tcp termination state */
 174#define DLM_CLOSED	1
 175#define DLM_ESTABLISHED	2
 176#define DLM_FIN_WAIT1	3
 177#define DLM_FIN_WAIT2	4
 178#define DLM_CLOSE_WAIT	5
 179#define DLM_LAST_ACK	6
 180#define DLM_CLOSING	7
 181	int state;
 182	spinlock_t state_lock;
 183
 184	/* counts how many lockspaces are using this node
 185	 * this refcount is necessary to determine if the
 186	 * node wants to disconnect.
 187	 */
 188	int users;
 189
 190	/* not protected by srcu, node_hash lifetime */
 191	void *debugfs;
 192
 193	struct hlist_node hlist;
 194	struct rcu_head rcu;
 195};
 196
 197struct dlm_mhandle {
 198	const union dlm_packet *inner_p;
 199	struct midcomms_node *node;
 200	struct dlm_opts *opts;
 201	struct dlm_msg *msg;
 202	bool committed;
 203	uint32_t seq;
 204
 205	void (*ack_rcv)(struct midcomms_node *node);
 206
 207	/* get_mhandle/commit srcu idx exchange */
 208	int idx;
 209
 210	struct list_head list;
 211	struct rcu_head rcu;
 212};
 213
 214static struct hlist_head node_hash[CONN_HASH_SIZE];
 215static DEFINE_SPINLOCK(nodes_lock);
 216DEFINE_STATIC_SRCU(nodes_srcu);
 217
 218/* This mutex prevents that midcomms_close() is running while
 219 * stop() or remove(). As I experienced invalid memory access
 220 * behaviours when DLM_DEBUG_FENCE_TERMINATION is enabled and
 221 * resetting machines. I will end in some double deletion in nodes
 222 * datastructure.
 223 */
 224static DEFINE_MUTEX(close_lock);
 225
 226struct kmem_cache *dlm_midcomms_cache_create(void)
 
 227{
 228	return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
 229				 0, 0, NULL);
 230}
 231
 232static inline const char *dlm_state_str(int state)
 233{
 234	switch (state) {
 235	case DLM_CLOSED:
 236		return "CLOSED";
 237	case DLM_ESTABLISHED:
 238		return "ESTABLISHED";
 239	case DLM_FIN_WAIT1:
 240		return "FIN_WAIT1";
 241	case DLM_FIN_WAIT2:
 242		return "FIN_WAIT2";
 243	case DLM_CLOSE_WAIT:
 244		return "CLOSE_WAIT";
 245	case DLM_LAST_ACK:
 246		return "LAST_ACK";
 247	case DLM_CLOSING:
 248		return "CLOSING";
 249	default:
 250		return "UNKNOWN";
 251	}
 252}
 253
 254const char *dlm_midcomms_state(struct midcomms_node *node)
 255{
 256	return dlm_state_str(node->state);
 257}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 258
 259unsigned long dlm_midcomms_flags(struct midcomms_node *node)
 260{
 261	return node->flags;
 262}
 263
 264int dlm_midcomms_send_queue_cnt(struct midcomms_node *node)
 265{
 266	return atomic_read(&node->send_queue_cnt);
 267}
 268
 269uint32_t dlm_midcomms_version(struct midcomms_node *node)
 270{
 271	return node->version;
 272}
 273
 274static struct midcomms_node *__find_node(int nodeid, int r)
 275{
 276	struct midcomms_node *node;
 277
 278	hlist_for_each_entry_rcu(node, &node_hash[r], hlist) {
 279		if (node->nodeid == nodeid)
 280			return node;
 281	}
 282
 283	return NULL;
 284}
 285
 286static void dlm_mhandle_release(struct rcu_head *rcu)
 287{
 288	struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);
 289
 290	dlm_lowcomms_put_msg(mh->msg);
 291	dlm_free_mhandle(mh);
 292}
 293
 294static void dlm_mhandle_delete(struct midcomms_node *node,
 295			       struct dlm_mhandle *mh)
 296{
 297	list_del_rcu(&mh->list);
 298	atomic_dec(&node->send_queue_cnt);
 299	call_rcu(&mh->rcu, dlm_mhandle_release);
 300}
 301
 302static void dlm_send_queue_flush(struct midcomms_node *node)
 303{
 304	struct dlm_mhandle *mh;
 305
 306	pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
 307
 308	rcu_read_lock();
 309	spin_lock_bh(&node->send_queue_lock);
 310	list_for_each_entry_rcu(mh, &node->send_queue, list) {
 311		dlm_mhandle_delete(node, mh);
 312	}
 313	spin_unlock_bh(&node->send_queue_lock);
 314	rcu_read_unlock();
 315}
 316
 317static void midcomms_node_reset(struct midcomms_node *node)
 318{
 319	pr_debug("reset node %d\n", node->nodeid);
 320
 321	node->seq_next = DLM_SEQ_INIT;
 322	node->seq_send = DLM_SEQ_INIT;
 323	node->version = DLM_VERSION_NOT_SET;
 324	node->flags = 0;
 325
 326	dlm_send_queue_flush(node);
 327	node->state = DLM_CLOSED;
 328	wake_up(&node->shutdown_wait);
 329}
 330
 331static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
 332{
 333	struct midcomms_node *node, *tmp;
 334	int r = nodeid_hash(nodeid);
 335
 336	node = __find_node(nodeid, r);
 337	if (node || !alloc)
 338		return node;
 339
 340	node = kmalloc(sizeof(*node), alloc);
 341	if (!node)
 342		return NULL;
 343
 344	node->nodeid = nodeid;
 345	spin_lock_init(&node->state_lock);
 346	spin_lock_init(&node->send_queue_lock);
 347	atomic_set(&node->send_queue_cnt, 0);
 348	INIT_LIST_HEAD(&node->send_queue);
 349	init_waitqueue_head(&node->shutdown_wait);
 350	node->users = 0;
 351	midcomms_node_reset(node);
 352
 353	spin_lock(&nodes_lock);
 354	/* check again if there was somebody else
 355	 * earlier here to add the node
 356	 */
 357	tmp = __find_node(nodeid, r);
 358	if (tmp) {
 359		spin_unlock(&nodes_lock);
 360		kfree(node);
 361		return tmp;
 362	}
 363
 364	hlist_add_head_rcu(&node->hlist, &node_hash[r]);
 365	spin_unlock(&nodes_lock);
 366
 367	node->debugfs = dlm_create_debug_comms_file(nodeid, node);
 368	return node;
 369}
 370
 371static int dlm_send_ack(int nodeid, uint32_t seq)
 372{
 373	int mb_len = sizeof(struct dlm_header);
 374	struct dlm_header *m_header;
 375	struct dlm_msg *msg;
 376	char *ppc;
 377
 378	msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_NOFS, &ppc,
 379				   NULL, NULL);
 380	if (!msg)
 381		return -ENOMEM;
 382
 383	m_header = (struct dlm_header *)ppc;
 384
 385	m_header->h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 386	m_header->h_nodeid = cpu_to_le32(dlm_our_nodeid());
 387	m_header->h_length = cpu_to_le16(mb_len);
 388	m_header->h_cmd = DLM_ACK;
 389	m_header->u.h_seq = cpu_to_le32(seq);
 390
 391	dlm_lowcomms_commit_msg(msg);
 392	dlm_lowcomms_put_msg(msg);
 393
 394	return 0;
 395}
 396
 397static int dlm_send_fin(struct midcomms_node *node,
 398			void (*ack_rcv)(struct midcomms_node *node))
 399{
 400	int mb_len = sizeof(struct dlm_header);
 401	struct dlm_header *m_header;
 402	struct dlm_mhandle *mh;
 403	char *ppc;
 404
 405	mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_NOFS, &ppc);
 406	if (!mh)
 407		return -ENOMEM;
 408
 409	mh->ack_rcv = ack_rcv;
 410
 411	m_header = (struct dlm_header *)ppc;
 412
 413	m_header->h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 414	m_header->h_nodeid = cpu_to_le32(dlm_our_nodeid());
 415	m_header->h_length = cpu_to_le16(mb_len);
 416	m_header->h_cmd = DLM_FIN;
 417
 418	pr_debug("sending fin msg to node %d\n", node->nodeid);
 419	dlm_midcomms_commit_mhandle(mh, NULL, 0);
 420	set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
 421
 422	return 0;
 423}
 424
 425static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
 426{
 427	struct dlm_mhandle *mh;
 428
 429	rcu_read_lock();
 430	list_for_each_entry_rcu(mh, &node->send_queue, list) {
 431		if (before(mh->seq, seq)) {
 432			if (mh->ack_rcv)
 433				mh->ack_rcv(node);
 434		} else {
 435			/* send queue should be ordered */
 436			break;
 437		}
 438	}
 439
 440	spin_lock_bh(&node->send_queue_lock);
 441	list_for_each_entry_rcu(mh, &node->send_queue, list) {
 442		if (before(mh->seq, seq)) {
 443			dlm_mhandle_delete(node, mh);
 444		} else {
 445			/* send queue should be ordered */
 446			break;
 447		}
 448	}
 449	spin_unlock_bh(&node->send_queue_lock);
 450	rcu_read_unlock();
 451}
 452
 453static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 454{
 455	spin_lock(&node->state_lock);
 456	pr_debug("receive passive fin ack from node %d with state %s\n",
 457		 node->nodeid, dlm_state_str(node->state));
 458
 459	switch (node->state) {
 460	case DLM_LAST_ACK:
 461		/* DLM_CLOSED */
 462		midcomms_node_reset(node);
 463		break;
 464	case DLM_CLOSED:
 465		/* not valid but somehow we got what we want */
 466		wake_up(&node->shutdown_wait);
 467		break;
 468	default:
 469		spin_unlock(&node->state_lock);
 470		log_print("%s: unexpected state: %d\n",
 471			  __func__, node->state);
 472		WARN_ON_ONCE(1);
 473		return;
 474	}
 475	spin_unlock(&node->state_lock);
 476}
 477
 478static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p)
 479{
 480	switch (p->header.h_cmd) {
 481	case DLM_MSG:
 482		trace_dlm_recv_message(dlm_our_nodeid(), seq, &p->message);
 483		break;
 484	case DLM_RCOM:
 485		trace_dlm_recv_rcom(dlm_our_nodeid(), seq, &p->rcom);
 486		break;
 487	default:
 488		break;
 489	}
 490}
 491
 492static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 493					struct midcomms_node *node,
 494					uint32_t seq)
 495{
 496	if (seq == node->seq_next) {
 497		node->seq_next++;
 498
 499		switch (p->header.h_cmd) {
 500		case DLM_FIN:
 501			/* send ack before fin */
 502			dlm_send_ack(node->nodeid, node->seq_next);
 503
 504			spin_lock(&node->state_lock);
 505			pr_debug("receive fin msg from node %d with state %s\n",
 506				 node->nodeid, dlm_state_str(node->state));
 507
 508			switch (node->state) {
 509			case DLM_ESTABLISHED:
 510				node->state = DLM_CLOSE_WAIT;
 511				pr_debug("switch node %d to state %s\n",
 512					 node->nodeid, dlm_state_str(node->state));
 513				/* passive shutdown DLM_LAST_ACK case 1
 514				 * additional we check if the node is used by
 515				 * cluster manager events at all.
 516				 */
 517				if (node->users == 0) {
 518					node->state = DLM_LAST_ACK;
 519					pr_debug("switch node %d to state %s case 1\n",
 520						 node->nodeid, dlm_state_str(node->state));
 521					spin_unlock(&node->state_lock);
 522					goto send_fin;
 523				}
 524				break;
 525			case DLM_FIN_WAIT1:
 526				node->state = DLM_CLOSING;
 527				pr_debug("switch node %d to state %s\n",
 528					 node->nodeid, dlm_state_str(node->state));
 529				break;
 530			case DLM_FIN_WAIT2:
 531				midcomms_node_reset(node);
 532				pr_debug("switch node %d to state %s\n",
 533					 node->nodeid, dlm_state_str(node->state));
 534				wake_up(&node->shutdown_wait);
 535				break;
 536			case DLM_LAST_ACK:
 537				/* probably remove_member caught it, do nothing */
 538				break;
 539			default:
 540				spin_unlock(&node->state_lock);
 541				log_print("%s: unexpected state: %d\n",
 542					  __func__, node->state);
 543				WARN_ON_ONCE(1);
 544				return;
 545			}
 546			spin_unlock(&node->state_lock);
 547
 548			set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
 549			break;
 550		default:
 551			WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 552			dlm_receive_buffer_3_2_trace(seq, p);
 553			dlm_receive_buffer(p, node->nodeid);
 554			set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
 555			break;
 556		}
 557	} else {
 558		/* retry to ack message which we already have by sending back
 559		 * current node->seq_next number as ack.
 560		 */
 561		if (seq < node->seq_next)
 562			dlm_send_ack(node->nodeid, node->seq_next);
 563
 564		log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
 565				      seq, node->seq_next, node->nodeid);
 566	}
 567
 568	return;
 569
 570send_fin:
 571	set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
 572	dlm_send_fin(node, dlm_pas_fin_ack_rcv);
 573}
 574
 575static struct midcomms_node *
 576dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
 577			      uint16_t msglen, int (*cb)(struct midcomms_node *node))
 578{
 579	struct midcomms_node *node = NULL;
 580	gfp_t allocation = 0;
 581	int ret;
 582
 583	switch (p->header.h_cmd) {
 584	case DLM_RCOM:
 585		if (msglen < sizeof(struct dlm_rcom)) {
 586			log_print("rcom msg too small: %u, will skip this message from node %d",
 587				  msglen, nodeid);
 588			return NULL;
 589		}
 590
 591		switch (p->rcom.rc_type) {
 592		case cpu_to_le32(DLM_RCOM_NAMES):
 593			fallthrough;
 594		case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
 595			fallthrough;
 596		case cpu_to_le32(DLM_RCOM_STATUS):
 597			fallthrough;
 598		case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
 599			node = nodeid2node(nodeid, 0);
 600			if (node) {
 601				spin_lock(&node->state_lock);
 602				if (node->state != DLM_ESTABLISHED)
 603					pr_debug("receive begin RCOM msg from node %d with state %s\n",
 604						 node->nodeid, dlm_state_str(node->state));
 605
 606				switch (node->state) {
 607				case DLM_CLOSED:
 608					node->state = DLM_ESTABLISHED;
 609					pr_debug("switch node %d to state %s\n",
 610						 node->nodeid, dlm_state_str(node->state));
 611					break;
 612				case DLM_ESTABLISHED:
 613					break;
 614				default:
 615					/* some invalid state passive shutdown
 616					 * was failed, we try to reset and
 617					 * hope it will go on.
 618					 */
 619					log_print("reset node %d because shutdown stuck",
 620						  node->nodeid);
 621
 622					midcomms_node_reset(node);
 623					node->state = DLM_ESTABLISHED;
 624					break;
 625				}
 626				spin_unlock(&node->state_lock);
 627			}
 628
 629			allocation = GFP_NOFS;
 630			break;
 631		default:
 632			break;
 633		}
 
 
 
 
 
 
 634
 635		break;
 636	default:
 637		break;
 638	}
 639
 640	node = nodeid2node(nodeid, allocation);
 641	if (!node) {
 642		switch (p->header.h_cmd) {
 643		case DLM_OPTS:
 644			if (msglen < sizeof(struct dlm_opts)) {
 645				log_print("opts msg too small: %u, will skip this message from node %d",
 646					  msglen, nodeid);
 647				return NULL;
 648			}
 649
 650			log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
 651					      p->opts.o_nextcmd, nodeid);
 652			break;
 653		default:
 654			log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
 655					      p->header.h_cmd, nodeid);
 656			break;
 657		}
 658
 659		return NULL;
 660	}
 661
 662	ret = cb(node);
 663	if (ret < 0)
 664		return NULL;
 665
 666	return node;
 667}
 668
 669static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
 670{
 671	switch (node->version) {
 672	case DLM_VERSION_NOT_SET:
 673		node->version = DLM_VERSION_3_2;
 674		log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
 675			  node->nodeid);
 676		break;
 677	case DLM_VERSION_3_2:
 678		break;
 679	default:
 680		log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
 681				      DLM_VERSION_3_2, node->nodeid, node->version);
 682		return -1;
 683	}
 684
 685	return 0;
 686}
 687
 688static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid)
 689{
 690	int len = msglen;
 691
 692	/* we only trust outer header msglen because
 693	 * it's checked against receive buffer length.
 694	 */
 695	if (len < sizeof(struct dlm_opts))
 696		return -1;
 697	len -= sizeof(struct dlm_opts);
 698
 699	if (len < le16_to_cpu(p->opts.o_optlen))
 700		return -1;
 701	len -= le16_to_cpu(p->opts.o_optlen);
 702
 703	switch (p->opts.o_nextcmd) {
 704	case DLM_FIN:
 705		if (len < sizeof(struct dlm_header)) {
 706			log_print("fin too small: %d, will skip this message from node %d",
 707				  len, nodeid);
 708			return -1;
 709		}
 710
 711		break;
 712	case DLM_MSG:
 713		if (len < sizeof(struct dlm_message)) {
 714			log_print("msg too small: %d, will skip this message from node %d",
 715				  msglen, nodeid);
 716			return -1;
 717		}
 718
 719		break;
 720	case DLM_RCOM:
 721		if (len < sizeof(struct dlm_rcom)) {
 722			log_print("rcom msg too small: %d, will skip this message from node %d",
 723				  len, nodeid);
 724			return -1;
 725		}
 726
 727		break;
 728	default:
 729		log_print("unsupported o_nextcmd received: %u, will skip this message from node %d",
 730			  p->opts.o_nextcmd, nodeid);
 731		return -1;
 732	}
 733
 734	return 0;
 735}
 736
 737static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
 738{
 739	uint16_t msglen = le16_to_cpu(p->header.h_length);
 740	struct midcomms_node *node;
 741	uint32_t seq;
 742	int ret, idx;
 743
 744	idx = srcu_read_lock(&nodes_srcu);
 745	node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
 746					     dlm_midcomms_version_check_3_2);
 747	if (!node)
 748		goto out;
 749
 750	switch (p->header.h_cmd) {
 751	case DLM_RCOM:
 752		/* these rcom message we use to determine version.
 753		 * they have their own retransmission handling and
 754		 * are the first messages of dlm.
 755		 *
 756		 * length already checked.
 757		 */
 758		switch (p->rcom.rc_type) {
 759		case cpu_to_le32(DLM_RCOM_NAMES):
 760			fallthrough;
 761		case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
 762			fallthrough;
 763		case cpu_to_le32(DLM_RCOM_STATUS):
 764			fallthrough;
 765		case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
 766			break;
 767		default:
 768			log_print("unsupported rcom type received: %u, will skip this message from node %d",
 769				  le32_to_cpu(p->rcom.rc_type), nodeid);
 770			goto out;
 771		}
 772
 773		WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 774		dlm_receive_buffer(p, nodeid);
 775		break;
 776	case DLM_OPTS:
 777		seq = le32_to_cpu(p->header.u.h_seq);
 778
 779		ret = dlm_opts_check_msglen(p, msglen, nodeid);
 780		if (ret < 0) {
 781			log_print("opts msg too small: %u, will skip this message from node %d",
 782				  msglen, nodeid);
 783			goto out;
 784		}
 785
 786		p = (union dlm_packet *)((unsigned char *)p->opts.o_opts +
 787					 le16_to_cpu(p->opts.o_optlen));
 788
 789		/* recheck inner msglen just if it's not garbage */
 790		msglen = le16_to_cpu(p->header.h_length);
 791		switch (p->header.h_cmd) {
 792		case DLM_RCOM:
 793			if (msglen < sizeof(struct dlm_rcom)) {
 794				log_print("inner rcom msg too small: %u, will skip this message from node %d",
 795					  msglen, nodeid);
 796				goto out;
 797			}
 798
 799			break;
 800		case DLM_MSG:
 801			if (msglen < sizeof(struct dlm_message)) {
 802				log_print("inner msg too small: %u, will skip this message from node %d",
 803					  msglen, nodeid);
 804				goto out;
 805			}
 806
 807			break;
 808		case DLM_FIN:
 809			if (msglen < sizeof(struct dlm_header)) {
 810				log_print("inner fin too small: %u, will skip this message from node %d",
 811					  msglen, nodeid);
 812				goto out;
 813			}
 814
 815			break;
 816		default:
 817			log_print("unsupported inner h_cmd received: %u, will skip this message from node %d",
 818				  msglen, nodeid);
 819			goto out;
 820		}
 821
 822		dlm_midcomms_receive_buffer(p, node, seq);
 823		break;
 824	case DLM_ACK:
 825		seq = le32_to_cpu(p->header.u.h_seq);
 826		dlm_receive_ack(node, seq);
 827		break;
 828	default:
 829		log_print("unsupported h_cmd received: %u, will skip this message from node %d",
 830			  p->header.h_cmd, nodeid);
 831		break;
 832	}
 833
 834out:
 835	srcu_read_unlock(&nodes_srcu, idx);
 836}
 837
 838static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
 839{
 840	switch (node->version) {
 841	case DLM_VERSION_NOT_SET:
 842		node->version = DLM_VERSION_3_1;
 843		log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
 844			  node->nodeid);
 845		break;
 846	case DLM_VERSION_3_1:
 847		break;
 848	default:
 849		log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
 850				      DLM_VERSION_3_1, node->nodeid, node->version);
 851		return -1;
 852	}
 853
 854	return 0;
 855}
 856
 857static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
 858{
 859	uint16_t msglen = le16_to_cpu(p->header.h_length);
 860	struct midcomms_node *node;
 861	int idx;
 862
 863	idx = srcu_read_lock(&nodes_srcu);
 864	node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
 865					     dlm_midcomms_version_check_3_1);
 866	if (!node) {
 867		srcu_read_unlock(&nodes_srcu, idx);
 868		return;
 869	}
 870	srcu_read_unlock(&nodes_srcu, idx);
 871
 872	switch (p->header.h_cmd) {
 873	case DLM_RCOM:
 874		/* length already checked */
 875		break;
 876	case DLM_MSG:
 877		if (msglen < sizeof(struct dlm_message)) {
 878			log_print("msg too small: %u, will skip this message from node %d",
 879				  msglen, nodeid);
 880			return;
 881		}
 882
 883		break;
 884	default:
 885		log_print("unsupported h_cmd received: %u, will skip this message from node %d",
 886			  p->header.h_cmd, nodeid);
 887		return;
 888	}
 889
 890	dlm_receive_buffer(p, nodeid);
 891}
 892
 893int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len)
 894{
 895	const unsigned char *ptr = buf;
 896	const struct dlm_header *hd;
 897	uint16_t msglen;
 898	int ret = 0;
 899
 900	while (len >= sizeof(struct dlm_header)) {
 901		hd = (struct dlm_header *)ptr;
 
 902
 903		/* no message should be more than DLM_MAX_SOCKET_BUFSIZE or
 904		 * less than dlm_header size.
 905		 *
 906		 * Some messages does not have a 8 byte length boundary yet
 907		 * which can occur in a unaligned memory access of some dlm
 908		 * messages. However this problem need to be fixed at the
 909		 * sending side, for now it seems nobody run into architecture
 910		 * related issues yet but it slows down some processing.
 911		 * Fixing this issue should be scheduled in future by doing
 912		 * the next major version bump.
 913		 */
 914		msglen = le16_to_cpu(hd->h_length);
 915		if (msglen > DLM_MAX_SOCKET_BUFSIZE ||
 916		    msglen < sizeof(struct dlm_header)) {
 917			log_print("received invalid length header: %u from node %d, will abort message parsing",
 918				  msglen, nodeid);
 919			return -EBADMSG;
 920		}
 921
 922		/* caller will take care that leftover
 923		 * will be parsed next call with more data
 924		 */
 925		if (msglen > len)
 926			break;
 927
 928		ret += msglen;
 929		len -= msglen;
 930		ptr += msglen;
 931	}
 932
 933	return ret;
 934}
 935
 936/*
 937 * Called from the low-level comms layer to process a buffer of
 938 * commands.
 939 */
 940int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 941{
 942	const unsigned char *ptr = buf;
 943	const struct dlm_header *hd;
 944	uint16_t msglen;
 945	int ret = 0;
 946
 947	while (len >= sizeof(struct dlm_header)) {
 948		hd = (struct dlm_header *)ptr;
 949
 950		msglen = le16_to_cpu(hd->h_length);
 951		if (msglen > len)
 952			break;
 953
 954		switch (hd->h_version) {
 955		case cpu_to_le32(DLM_VERSION_3_1):
 956			dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
 957			break;
 958		case cpu_to_le32(DLM_VERSION_3_2):
 959			dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
 960			break;
 961		default:
 962			log_print("received invalid version header: %u from node %d, will skip this message",
 963				  le32_to_cpu(hd->h_version), nodeid);
 964			break;
 965		}
 966
 967		ret += msglen;
 
 
 968		len -= msglen;
 969		ptr += msglen;
 970	}
 971
 972	return ret;
 973}
 974
 975void dlm_midcomms_receive_done(int nodeid)
 976{
 977	struct midcomms_node *node;
 978	int idx;
 979
 980	idx = srcu_read_lock(&nodes_srcu);
 981	node = nodeid2node(nodeid, 0);
 982	if (!node) {
 983		srcu_read_unlock(&nodes_srcu, idx);
 984		return;
 985	}
 986
 987	/* old protocol, we do nothing */
 988	switch (node->version) {
 989	case DLM_VERSION_3_2:
 990		break;
 991	default:
 992		srcu_read_unlock(&nodes_srcu, idx);
 993		return;
 994	}
 995
 996	/* do nothing if we didn't delivered stateful to ulp */
 997	if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
 998				&node->flags)) {
 999		srcu_read_unlock(&nodes_srcu, idx);
1000		return;
1001	}
1002
1003	spin_lock(&node->state_lock);
1004	/* we only ack if state is ESTABLISHED */
1005	switch (node->state) {
1006	case DLM_ESTABLISHED:
1007		spin_unlock(&node->state_lock);
1008		dlm_send_ack(node->nodeid, node->seq_next);
1009		break;
1010	default:
1011		spin_unlock(&node->state_lock);
1012		/* do nothing FIN has it's own ack send */
1013		break;
1014	}
1015	srcu_read_unlock(&nodes_srcu, idx);
1016}
1017
1018void dlm_midcomms_unack_msg_resend(int nodeid)
1019{
1020	struct midcomms_node *node;
1021	struct dlm_mhandle *mh;
1022	int idx, ret;
1023
1024	idx = srcu_read_lock(&nodes_srcu);
1025	node = nodeid2node(nodeid, 0);
1026	if (!node) {
1027		srcu_read_unlock(&nodes_srcu, idx);
1028		return;
1029	}
1030
1031	/* old protocol, we don't support to retransmit on failure */
1032	switch (node->version) {
1033	case DLM_VERSION_3_2:
1034		break;
1035	default:
1036		srcu_read_unlock(&nodes_srcu, idx);
1037		return;
1038	}
1039
1040	rcu_read_lock();
1041	list_for_each_entry_rcu(mh, &node->send_queue, list) {
1042		if (!mh->committed)
1043			continue;
1044
1045		ret = dlm_lowcomms_resend_msg(mh->msg);
1046		if (!ret)
1047			log_print_ratelimited("retransmit dlm msg, seq %u, nodeid %d",
1048					      mh->seq, node->nodeid);
1049	}
1050	rcu_read_unlock();
1051	srcu_read_unlock(&nodes_srcu, idx);
1052}
1053
1054static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len,
1055				 uint32_t seq)
1056{
1057	opts->o_header.h_cmd = DLM_OPTS;
1058	opts->o_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
1059	opts->o_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
1060	opts->o_header.h_length = cpu_to_le16(DLM_MIDCOMMS_OPT_LEN + inner_len);
1061	opts->o_header.u.h_seq = cpu_to_le32(seq);
1062}
1063
1064static void midcomms_new_msg_cb(void *data)
1065{
1066	struct dlm_mhandle *mh = data;
1067
1068	atomic_inc(&mh->node->send_queue_cnt);
1069
1070	spin_lock_bh(&mh->node->send_queue_lock);
1071	list_add_tail_rcu(&mh->list, &mh->node->send_queue);
1072	spin_unlock_bh(&mh->node->send_queue_lock);
1073
1074	mh->seq = mh->node->seq_send++;
1075}
1076
1077static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
1078						int len, gfp_t allocation, char **ppc)
1079{
1080	struct dlm_opts *opts;
1081	struct dlm_msg *msg;
1082
1083	msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
1084				   allocation, ppc, midcomms_new_msg_cb, mh);
1085	if (!msg)
1086		return NULL;
1087
1088	opts = (struct dlm_opts *)*ppc;
1089	mh->opts = opts;
1090
1091	/* add possible options here */
1092	dlm_fill_opts_header(opts, len, mh->seq);
1093
1094	*ppc += sizeof(*opts);
1095	mh->inner_p = (const union dlm_packet *)*ppc;
1096	return msg;
1097}
1098
1099/* avoid false positive for nodes_srcu, unlock happens in
1100 * dlm_midcomms_commit_mhandle which is a must call if success
1101 */
1102#ifndef __CHECKER__
1103struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
1104					     gfp_t allocation, char **ppc)
1105{
1106	struct midcomms_node *node;
1107	struct dlm_mhandle *mh;
1108	struct dlm_msg *msg;
1109	int idx;
1110
1111	idx = srcu_read_lock(&nodes_srcu);
1112	node = nodeid2node(nodeid, 0);
1113	if (!node) {
1114		WARN_ON_ONCE(1);
1115		goto err;
1116	}
1117
1118	/* this is a bug, however we going on and hope it will be resolved */
1119	WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
1120
1121	mh = dlm_allocate_mhandle(allocation);
1122	if (!mh)
1123		goto err;
1124
1125	mh->committed = false;
1126	mh->ack_rcv = NULL;
1127	mh->idx = idx;
1128	mh->node = node;
1129
1130	switch (node->version) {
1131	case DLM_VERSION_3_1:
1132		msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
1133					   NULL, NULL);
1134		if (!msg) {
1135			dlm_free_mhandle(mh);
1136			goto err;
1137		}
1138
1139		break;
1140	case DLM_VERSION_3_2:
1141		msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
1142					       ppc);
1143		if (!msg) {
1144			dlm_free_mhandle(mh);
1145			goto err;
1146		}
1147
1148		break;
1149	default:
1150		dlm_free_mhandle(mh);
1151		WARN_ON_ONCE(1);
1152		goto err;
1153	}
1154
1155	mh->msg = msg;
1156
1157	/* keep in mind that is a must to call
1158	 * dlm_midcomms_commit_msg() which releases
1159	 * nodes_srcu using mh->idx which is assumed
1160	 * here that the application will call it.
1161	 */
1162	return mh;
1163
1164err:
1165	srcu_read_unlock(&nodes_srcu, idx);
1166	return NULL;
1167}
1168#endif
1169
1170static void dlm_midcomms_commit_msg_3_2_trace(const struct dlm_mhandle *mh,
1171					      const void *name, int namelen)
1172{
1173	switch (mh->inner_p->header.h_cmd) {
1174	case DLM_MSG:
1175		trace_dlm_send_message(mh->node->nodeid, mh->seq,
1176				       &mh->inner_p->message,
1177				       name, namelen);
1178		break;
1179	case DLM_RCOM:
1180		trace_dlm_send_rcom(mh->node->nodeid, mh->seq,
1181				    &mh->inner_p->rcom);
1182		break;
1183	default:
1184		/* nothing to trace */
1185		break;
1186	}
1187}
1188
1189static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh,
1190					const void *name, int namelen)
1191{
1192	/* nexthdr chain for fast lookup */
1193	mh->opts->o_nextcmd = mh->inner_p->header.h_cmd;
1194	mh->committed = true;
1195	dlm_midcomms_commit_msg_3_2_trace(mh, name, namelen);
1196	dlm_lowcomms_commit_msg(mh->msg);
1197}
1198
1199/* avoid false positive for nodes_srcu, lock was happen in
1200 * dlm_midcomms_get_mhandle
1201 */
1202#ifndef __CHECKER__
1203void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh,
1204				 const void *name, int namelen)
1205{
1206
1207	switch (mh->node->version) {
1208	case DLM_VERSION_3_1:
1209		srcu_read_unlock(&nodes_srcu, mh->idx);
1210
1211		dlm_lowcomms_commit_msg(mh->msg);
1212		dlm_lowcomms_put_msg(mh->msg);
1213		/* mh is not part of rcu list in this case */
1214		dlm_free_mhandle(mh);
1215		break;
1216	case DLM_VERSION_3_2:
1217		dlm_midcomms_commit_msg_3_2(mh, name, namelen);
1218		srcu_read_unlock(&nodes_srcu, mh->idx);
1219		break;
1220	default:
1221		srcu_read_unlock(&nodes_srcu, mh->idx);
1222		WARN_ON_ONCE(1);
1223		break;
1224	}
1225}
1226#endif
1227
/* Start the comms stack; returns whatever dlm_lowcomms_start() returns. */
int dlm_midcomms_start(void)
{
	int ret;

	ret = dlm_lowcomms_start();
	return ret;
}
1232
/* Stop the comms stack by forwarding to dlm_lowcomms_stop(). */
void dlm_midcomms_stop(void)
{
	dlm_lowcomms_stop();
}
1237
1238void dlm_midcomms_init(void)
1239{
1240	int i;
1241
1242	for (i = 0; i < CONN_HASH_SIZE; i++)
1243		INIT_HLIST_HEAD(&node_hash[i]);
1244
1245	dlm_lowcomms_init();
1246}
1247
/* Tear down the comms stack by forwarding to dlm_lowcomms_exit(). */
void dlm_midcomms_exit(void)
{
	dlm_lowcomms_exit();
}
1252
1253static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
1254{
1255	spin_lock(&node->state_lock);
1256	pr_debug("receive active fin ack from node %d with state %s\n",
1257		 node->nodeid, dlm_state_str(node->state));
1258
1259	switch (node->state) {
1260	case DLM_FIN_WAIT1:
1261		node->state = DLM_FIN_WAIT2;
1262		pr_debug("switch node %d to state %s\n",
1263			 node->nodeid, dlm_state_str(node->state));
1264		break;
1265	case DLM_CLOSING:
1266		midcomms_node_reset(node);
1267		pr_debug("switch node %d to state %s\n",
1268			 node->nodeid, dlm_state_str(node->state));
1269		wake_up(&node->shutdown_wait);
1270		break;
1271	case DLM_CLOSED:
1272		/* not valid but somehow we got what we want */
1273		wake_up(&node->shutdown_wait);
1274		break;
1275	default:
1276		spin_unlock(&node->state_lock);
1277		log_print("%s: unexpected state: %d\n",
1278			  __func__, node->state);
1279		WARN_ON_ONCE(1);
1280		return;
1281	}
1282	spin_unlock(&node->state_lock);
1283}
1284
1285void dlm_midcomms_add_member(int nodeid)
1286{
1287	struct midcomms_node *node;
1288	int idx;
1289
1290	if (nodeid == dlm_our_nodeid())
1291		return;
1292
1293	idx = srcu_read_lock(&nodes_srcu);
1294	node = nodeid2node(nodeid, GFP_NOFS);
1295	if (!node) {
1296		srcu_read_unlock(&nodes_srcu, idx);
1297		return;
1298	}
1299
1300	spin_lock(&node->state_lock);
1301	if (!node->users) {
1302		pr_debug("receive add member from node %d with state %s\n",
1303			 node->nodeid, dlm_state_str(node->state));
1304		switch (node->state) {
1305		case DLM_ESTABLISHED:
1306			break;
1307		case DLM_CLOSED:
1308			node->state = DLM_ESTABLISHED;
1309			pr_debug("switch node %d to state %s\n",
1310				 node->nodeid, dlm_state_str(node->state));
1311			break;
1312		default:
1313			/* some invalid state passive shutdown
1314			 * was failed, we try to reset and
1315			 * hope it will go on.
1316			 */
1317			log_print("reset node %d because shutdown stuck",
1318				  node->nodeid);
1319
1320			midcomms_node_reset(node);
1321			node->state = DLM_ESTABLISHED;
1322			break;
1323		}
1324	}
1325
1326	node->users++;
1327	pr_debug("node %d users inc count %d\n", nodeid, node->users);
1328	spin_unlock(&node->state_lock);
1329
1330	srcu_read_unlock(&nodes_srcu, idx);
1331}
1332
1333void dlm_midcomms_remove_member(int nodeid)
1334{
1335	struct midcomms_node *node;
1336	int idx;
1337
1338	if (nodeid == dlm_our_nodeid())
1339		return;
1340
1341	idx = srcu_read_lock(&nodes_srcu);
1342	node = nodeid2node(nodeid, 0);
1343	if (!node) {
1344		srcu_read_unlock(&nodes_srcu, idx);
1345		return;
1346	}
1347
1348	spin_lock(&node->state_lock);
1349	node->users--;
1350	pr_debug("node %d users dec count %d\n", nodeid, node->users);
1351
1352	/* hitting users count to zero means the
1353	 * other side is running dlm_midcomms_stop()
1354	 * we meet us to have a clean disconnect.
1355	 */
1356	if (node->users == 0) {
1357		pr_debug("receive remove member from node %d with state %s\n",
1358			 node->nodeid, dlm_state_str(node->state));
1359		switch (node->state) {
1360		case DLM_ESTABLISHED:
1361			break;
1362		case DLM_CLOSE_WAIT:
1363			/* passive shutdown DLM_LAST_ACK case 2 */
1364			node->state = DLM_LAST_ACK;
1365			spin_unlock(&node->state_lock);
1366
1367			pr_debug("switch node %d to state %s case 2\n",
1368				 node->nodeid, dlm_state_str(node->state));
1369			goto send_fin;
1370		case DLM_LAST_ACK:
1371			/* probably receive fin caught it, do nothing */
1372			break;
1373		case DLM_CLOSED:
1374			/* already gone, do nothing */
1375			break;
1376		default:
1377			log_print("%s: unexpected state: %d\n",
1378				  __func__, node->state);
1379			break;
1380		}
1381	}
1382	spin_unlock(&node->state_lock);
1383
1384	srcu_read_unlock(&nodes_srcu, idx);
1385	return;
1386
1387send_fin:
1388	set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
1389	dlm_send_fin(node, dlm_pas_fin_ack_rcv);
1390	srcu_read_unlock(&nodes_srcu, idx);
1391}
1392
1393static void midcomms_node_release(struct rcu_head *rcu)
1394{
1395	struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
1396
1397	WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
1398	kfree(node);
1399}
1400
1401static void midcomms_shutdown(struct midcomms_node *node)
1402{
1403	int ret;
1404
1405	/* old protocol, we don't wait for pending operations */
1406	switch (node->version) {
1407	case DLM_VERSION_3_2:
1408		break;
1409	default:
1410		return;
1411	}
1412
1413	spin_lock(&node->state_lock);
1414	pr_debug("receive active shutdown for node %d with state %s\n",
1415		 node->nodeid, dlm_state_str(node->state));
1416	switch (node->state) {
1417	case DLM_ESTABLISHED:
1418		node->state = DLM_FIN_WAIT1;
1419		pr_debug("switch node %d to state %s case 2\n",
1420			 node->nodeid, dlm_state_str(node->state));
1421		break;
1422	case DLM_CLOSED:
1423		/* we have what we want */
1424		spin_unlock(&node->state_lock);
1425		return;
1426	default:
1427		/* busy to enter DLM_FIN_WAIT1, wait until passive
1428		 * done in shutdown_wait to enter DLM_CLOSED.
1429		 */
1430		break;
1431	}
1432	spin_unlock(&node->state_lock);
1433
1434	if (node->state == DLM_FIN_WAIT1) {
1435		dlm_send_fin(node, dlm_act_fin_ack_rcv);
1436
1437		if (DLM_DEBUG_FENCE_TERMINATION)
1438			msleep(5000);
1439	}
1440
1441	/* wait for other side dlm + fin */
1442	ret = wait_event_timeout(node->shutdown_wait,
1443				 node->state == DLM_CLOSED ||
1444				 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
1445				 DLM_SHUTDOWN_TIMEOUT);
1446	if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) {
1447		pr_debug("active shutdown timed out for node %d with state %s\n",
1448			 node->nodeid, dlm_state_str(node->state));
1449		midcomms_node_reset(node);
1450		dlm_lowcomms_shutdown_node(node->nodeid, true);
1451		return;
1452	}
1453
1454	pr_debug("active shutdown done for node %d with state %s\n",
1455		 node->nodeid, dlm_state_str(node->state));
1456	dlm_lowcomms_shutdown_node(node->nodeid, false);
1457}
1458
1459void dlm_midcomms_shutdown(void)
1460{
1461	struct midcomms_node *node;
1462	int i, idx;
1463
1464	dlm_lowcomms_shutdown();
1465
1466	mutex_lock(&close_lock);
1467	idx = srcu_read_lock(&nodes_srcu);
1468	for (i = 0; i < CONN_HASH_SIZE; i++) {
1469		hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1470			midcomms_shutdown(node);
1471
1472			dlm_delete_debug_comms_file(node->debugfs);
1473
1474			spin_lock(&nodes_lock);
1475			hlist_del_rcu(&node->hlist);
1476			spin_unlock(&nodes_lock);
1477
1478			call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
1479		}
1480	}
1481	srcu_read_unlock(&nodes_srcu, idx);
1482	mutex_unlock(&close_lock);
1483}
1484
1485int dlm_midcomms_close(int nodeid)
1486{
1487	struct midcomms_node *node;
1488	int idx, ret;
1489
1490	if (nodeid == dlm_our_nodeid())
1491		return 0;
1492
1493	dlm_stop_lockspaces_check();
1494
1495	idx = srcu_read_lock(&nodes_srcu);
1496	/* Abort pending close/remove operation */
1497	node = nodeid2node(nodeid, 0);
1498	if (node) {
1499		/* let shutdown waiters leave */
1500		set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
1501		wake_up(&node->shutdown_wait);
1502	}
1503	srcu_read_unlock(&nodes_srcu, idx);
1504
1505	synchronize_srcu(&nodes_srcu);
1506
1507	idx = srcu_read_lock(&nodes_srcu);
1508	mutex_lock(&close_lock);
1509	node = nodeid2node(nodeid, 0);
1510	if (!node) {
1511		mutex_unlock(&close_lock);
1512		srcu_read_unlock(&nodes_srcu, idx);
1513		return dlm_lowcomms_close(nodeid);
1514	}
1515
1516	ret = dlm_lowcomms_close(nodeid);
1517	spin_lock(&node->state_lock);
1518	midcomms_node_reset(node);
1519	spin_unlock(&node->state_lock);
1520	srcu_read_unlock(&nodes_srcu, idx);
1521	mutex_unlock(&close_lock);
1522
1523	return ret;
1524}
1525
1526/* debug functionality to send raw dlm msg from user space; this struct is the context handed to midcomms_new_rawmsg_cb() */
1527struct dlm_rawmsg_data {
1528	struct midcomms_node *node;	/* node whose seq_send counter the callback may consume */
1529	void *buf;			/* raw message; the callback reads and may patch it as a struct dlm_header */
1530};
1531
1532static void midcomms_new_rawmsg_cb(void *data)
1533{
1534	struct dlm_rawmsg_data *rd = data;
1535	struct dlm_header *h = rd->buf;
1536
1537	switch (h->h_version) {
1538	case cpu_to_le32(DLM_VERSION_3_1):
1539		break;
1540	default:
1541		switch (h->h_cmd) {
1542		case DLM_OPTS:
1543			if (!h->u.h_seq)
1544				h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
1545			break;
1546		default:
1547			break;
1548		}
1549		break;
1550	}
1551}
1552
1553int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
1554			     int buflen)
1555{
1556	struct dlm_rawmsg_data rd;
1557	struct dlm_msg *msg;
1558	char *msgbuf;
1559
1560	rd.node = node;
1561	rd.buf = buf;
1562
1563	msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
1564				   &msgbuf, midcomms_new_rawmsg_cb, &rd);
1565	if (!msg)
1566		return -ENOMEM;
1567
1568	memcpy(msgbuf, buf, buflen);
1569	dlm_lowcomms_commit_msg(msg);
1570	return 0;
1571}
1572