/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005, 2012 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/slab.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/configfs.h>

#include "tcp.h"
#include "nodemanager.h"
#include "heartbeat.h"
#include "masklog.h"

/* for now we operate under the assertion that there can be only one
 * cluster active at a time.  Changing this will require trickling
 * cluster references throughout where nodes are looked up */
struct r2nm_cluster *r2nm_single_cluster;

char *r2nm_fence_method_desc[R2NM_FENCE_METHODS] = {
		"reset",	/* R2NM_FENCE_RESET */
		"panic",	/* R2NM_FENCE_PANIC */
};

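/*
 * Look up a configured node by number.  On success a configfs
 * reference is taken on the node's item; the caller must drop it with
 * r2nm_node_put().  Returns NULL if the number is out of range or no
 * cluster has been created yet.
 */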
struct r2nm_node *r2nm_get_node_by_num(u8 node_num)
{
	struct r2nm_node *node = NULL;

	if (node_num >= R2NM_MAX_NODES || r2nm_single_cluster == NULL)
		goto out;

	read_lock(&r2nm_single_cluster->cl_nodes_lock);
	node = r2nm_single_cluster->cl_nodes[node_num];
	if (node)
		config_item_get(&node->nd_item);
	read_unlock(&r2nm_single_cluster->cl_nodes_lock);
out:
	return node;
}
EXPORT_SYMBOL_GPL(r2nm_get_node_by_num);

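/*
 * Copy the cluster's bitmap of configured nodes into @map.  The
 * snapshot is taken under cl_nodes_lock so it is internally
 * consistent; @bytes must be large enough to hold the whole bitmap.
 */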
int r2nm_configured_node_map(unsigned long *map, unsigned bytes)
{
	struct r2nm_cluster *cluster = r2nm_single_cluster;

	BUG_ON(bytes < sizeof(cluster->cl_nodes_bitmap));

	if (cluster == NULL)
		return -EINVAL;

	read_lock(&cluster->cl_nodes_lock);
	memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
	read_unlock(&cluster->cl_nodes_lock);

	return 0;
}
EXPORT_SYMBOL_GPL(r2nm_configured_node_map);

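/*
 * Search the red-black tree of nodes keyed by big-endian IPv4 address.
 * The caller must hold cl_nodes_lock.  When the address is not found,
 * *ret_p and *ret_parent are set to the link point where a new node
 * can be spliced in with rb_link_node().
 */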
static struct r2nm_node *r2nm_node_ip_tree_lookup(struct r2nm_cluster *cluster,
						  __be32 ip_needle,
						  struct rb_node ***ret_p,
						  struct rb_node **ret_parent)
{
	struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
	struct rb_node *parent = NULL;
	struct r2nm_node *node, *ret = NULL;

	while (*p) {
		int cmp;

		parent = *p;
		node = rb_entry(parent, struct r2nm_node, nd_ip_node);

		cmp = memcmp(&ip_needle, &node->nd_ipv4_address,
				sizeof(ip_needle));
		if (cmp < 0)
			p = &(*p)->rb_left;
		else if (cmp > 0)
			p = &(*p)->rb_right;
		else {
			ret = node;
			break;
		}
	}

	if (ret_p != NULL)
		*ret_p = p;
	if (ret_parent != NULL)
		*ret_parent = parent;

	return ret;
}

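/*
 * Look up a node by its big-endian IPv4 address.  As with
 * r2nm_get_node_by_num(), the returned node carries a configfs
 * reference that the caller must drop with r2nm_node_put().
 */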
struct r2nm_node *r2nm_get_node_by_ip(__be32 addr)
{
	struct r2nm_node *node = NULL;
	struct r2nm_cluster *cluster = r2nm_single_cluster;

	if (cluster == NULL)
		goto out;

	read_lock(&cluster->cl_nodes_lock);
	node = r2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
	if (node)
		config_item_get(&node->nd_item);
	read_unlock(&cluster->cl_nodes_lock);

out:
	return node;
}
EXPORT_SYMBOL_GPL(r2nm_get_node_by_ip);

void r2nm_node_put(struct r2nm_node *node)
{
	config_item_put(&node->nd_item);
}
EXPORT_SYMBOL_GPL(r2nm_node_put);

void r2nm_node_get(struct r2nm_node *node)
{
	config_item_get(&node->nd_item);
}
EXPORT_SYMBOL_GPL(r2nm_node_get);

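/*
 * Return the number of the node marked "local" in configfs, or
 * R2NM_MAX_NODES if no local node has been configured yet.
 */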
u8 r2nm_this_node(void)
{
	u8 node_num = R2NM_MAX_NODES;

	if (r2nm_single_cluster && r2nm_single_cluster->cl_has_local)
		node_num = r2nm_single_cluster->cl_local_node;

	return node_num;
}
EXPORT_SYMBOL_GPL(r2nm_this_node);

/* node configfs bits */

static struct r2nm_cluster *to_r2nm_cluster(struct config_item *item)
{
	return item ?
		container_of(to_config_group(item), struct r2nm_cluster,
			     cl_group)
		: NULL;
}

static struct r2nm_node *to_r2nm_node(struct config_item *item)
{
	return item ? container_of(item, struct r2nm_node, nd_item) : NULL;
}

static void r2nm_node_release(struct config_item *item)
{
	struct r2nm_node *node = to_r2nm_node(item);
	kfree(node);
}

static ssize_t r2nm_node_num_read(struct r2nm_node *node, char *page)
{
	return sprintf(page, "%d\n", node->nd_num);
}

static struct r2nm_cluster *to_r2nm_cluster_from_node(struct r2nm_node *node)
{
	/* through the first node_set .parent
	 * mycluster/nodes/mynode == r2nm_cluster->r2nm_node_group->r2nm_node */
	return to_r2nm_cluster(node->nd_item.ci_parent->ci_parent);
}

enum {
	R2NM_NODE_ATTR_NUM = 0,
	R2NM_NODE_ATTR_PORT,
	R2NM_NODE_ATTR_ADDRESS,
	R2NM_NODE_ATTR_LOCAL,
};

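/*
 * Set this node's number.  The address and port attributes must have
 * been written first, since the node becomes visible to the networking
 * code as soon as it is linked into cl_nodes.  Fails with -EEXIST if
 * another node already owns the number.
 */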
static ssize_t r2nm_node_num_write(struct r2nm_node *node, const char *page,
				   size_t count)
{
	struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
	unsigned long tmp;
	char *p = (char *)page;
	int err;

	err = kstrtoul(p, 10, &tmp);
	if (err)
		return err;

	if (tmp >= R2NM_MAX_NODES)
		return -ERANGE;

	/* once we're in the cl_nodes tree networking can look us up by
	 * node number and try to use our address and port attributes
	 * to connect to this node, so make sure that they've been set
	 * before the node number is written */
	if (!test_bit(R2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
	    !test_bit(R2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
		return -EINVAL; /* XXX */

	write_lock(&cluster->cl_nodes_lock);
	if (cluster->cl_nodes[tmp])
		p = NULL;
	else  {
		cluster->cl_nodes[tmp] = node;
		node->nd_num = tmp;
		set_bit(tmp, cluster->cl_nodes_bitmap);
	}
	write_unlock(&cluster->cl_nodes_lock);
	if (p == NULL)
		return -EEXIST;

	return count;
}

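/*
 * The port is stored big-endian in nd_ipv4_port; reads convert back to
 * host order.  Writes accept 1-65534: zero is rejected with -EINVAL
 * and values of 65535 and above with -ERANGE.
 */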
static ssize_t r2nm_node_ipv4_port_read(struct r2nm_node *node, char *page)
{
	return sprintf(page, "%u\n", ntohs(node->nd_ipv4_port));
}

static ssize_t r2nm_node_ipv4_port_write(struct r2nm_node *node,
					 const char *page, size_t count)
{
	unsigned long tmp;
	char *p = (char *)page;
	int err;

	err = kstrtoul(p, 10, &tmp);
	if (err)
		return err;

	if (tmp == 0)
		return -EINVAL;
	if (tmp >= (u16)-1)
		return -ERANGE;

	node->nd_ipv4_port = htons(tmp);

	return count;
}

static ssize_t r2nm_node_ipv4_address_read(struct r2nm_node *node, char *page)
{
	return sprintf(page, "%pI4\n", &node->nd_ipv4_address);
}

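/*
 * Parse a dotted-quad IPv4 address and link the node into the
 * cluster's IP tree.  The octets are scanned into reversed array slots
 * so the be32_add_cpu() loop below assembles a big-endian address.
 * Fails with -EEXIST if another node already claimed the address.
 */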
static ssize_t r2nm_node_ipv4_address_write(struct r2nm_node *node,
					    const char *page,
					    size_t count)
{
	struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
	int ret, i;
	struct rb_node **p, *parent;
	unsigned int octets[4];
	__be32 ipv4_addr = 0;

	ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
		     &octets[1], &octets[0]);
	if (ret != 4)
		return -EINVAL;

	for (i = 0; i < ARRAY_SIZE(octets); i++) {
		if (octets[i] > 255)
			return -ERANGE;
		be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
	}

	ret = 0;
	write_lock(&cluster->cl_nodes_lock);
	if (r2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
		ret = -EEXIST;
	else {
		rb_link_node(&node->nd_ip_node, parent, p);
		rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
	}
	write_unlock(&cluster->cl_nodes_lock);
	if (ret)
		return ret;

	memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));

	return count;
}

static ssize_t r2nm_node_local_read(struct r2nm_node *node, char *page)
{
	return sprintf(page, "%d\n", node->nd_local);
}

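/*
 * Mark or clear this node as the local node.  Writing 1 starts the
 * r2net listener, so num, address and port must all be set first;
 * writing 0 on the current local node stops the listener.  Only one
 * node may be local at a time.
 */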
static ssize_t r2nm_node_local_write(struct r2nm_node *node, const char *page,
				     size_t count)
{
	struct r2nm_cluster *cluster = to_r2nm_cluster_from_node(node);
	unsigned long tmp;
	char *p = (char *)page;
	ssize_t ret;
	int err;

	err = kstrtoul(p, 10, &tmp);
	if (err)
		return err;

	tmp = !!tmp; /* boolean of whether this node wants to be local */

	/* setting local turns on networking rx for now so we require having
	 * set everything else first */
	if (!test_bit(R2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
	    !test_bit(R2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
	    !test_bit(R2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
		return -EINVAL; /* XXX */

	/* the only failure case is trying to set a new local node
	 * when a different one is already set */
	if (tmp && tmp == cluster->cl_has_local &&
	    cluster->cl_local_node != node->nd_num)
		return -EBUSY;

	/* bring up the rx thread if we're setting the new local node. */
	if (tmp && !cluster->cl_has_local) {
		ret = r2net_start_listening(node);
		if (ret)
			return ret;
	}

	if (!tmp && cluster->cl_has_local &&
	    cluster->cl_local_node == node->nd_num) {
		r2net_stop_listening(node);
		cluster->cl_local_node = R2NM_INVALID_NODE_NUM;
	}

	node->nd_local = tmp;
	if (node->nd_local) {
		cluster->cl_has_local = tmp;
		cluster->cl_local_node = node->nd_num;
	}

	return count;
}

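/*
 * Glue between the generic configfs attribute callbacks and the typed
 * per-attribute show/store handlers above.
 */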
struct r2nm_node_attribute {
	struct configfs_attribute attr;
	ssize_t (*show)(struct r2nm_node *, char *);
	ssize_t (*store)(struct r2nm_node *, const char *, size_t);
};

static struct r2nm_node_attribute r2nm_node_attr_num = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "num",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= r2nm_node_num_read,
	.store	= r2nm_node_num_write,
};

static struct r2nm_node_attribute r2nm_node_attr_ipv4_port = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "ipv4_port",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= r2nm_node_ipv4_port_read,
	.store	= r2nm_node_ipv4_port_write,
};

static struct r2nm_node_attribute r2nm_node_attr_ipv4_address = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "ipv4_address",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= r2nm_node_ipv4_address_read,
	.store	= r2nm_node_ipv4_address_write,
};

static struct r2nm_node_attribute r2nm_node_attr_local = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "local",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= r2nm_node_local_read,
	.store	= r2nm_node_local_write,
};

static struct configfs_attribute *r2nm_node_attrs[] = {
	[R2NM_NODE_ATTR_NUM] = &r2nm_node_attr_num.attr,
	[R2NM_NODE_ATTR_PORT] = &r2nm_node_attr_ipv4_port.attr,
	[R2NM_NODE_ATTR_ADDRESS] = &r2nm_node_attr_ipv4_address.attr,
	[R2NM_NODE_ATTR_LOCAL] = &r2nm_node_attr_local.attr,
	NULL,
};

static int r2nm_attr_index(struct configfs_attribute *attr)
{
	int i;
	for (i = 0; i < ARRAY_SIZE(r2nm_node_attrs); i++) {
		if (attr == r2nm_node_attrs[i])
			return i;
	}
	BUG();
	return 0;
}

static ssize_t r2nm_node_show(struct config_item *item,
			      struct configfs_attribute *attr,
			      char *page)
{
	struct r2nm_node *node = to_r2nm_node(item);
	struct r2nm_node_attribute *r2nm_node_attr =
		container_of(attr, struct r2nm_node_attribute, attr);
	ssize_t ret = 0;

	if (r2nm_node_attr->show)
		ret = r2nm_node_attr->show(node, page);
	return ret;
}

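/*
 * Node attributes are write-once: once an attribute's bit is set in
 * nd_set_attributes, further writes return -EBUSY.  The bit is only
 * set when the store handler consumed the entire write.
 */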
static ssize_t r2nm_node_store(struct config_item *item,
			       struct configfs_attribute *attr,
			       const char *page, size_t count)
{
	struct r2nm_node *node = to_r2nm_node(item);
	struct r2nm_node_attribute *r2nm_node_attr =
		container_of(attr, struct r2nm_node_attribute, attr);
	ssize_t ret;
	int attr_index = r2nm_attr_index(attr);

	if (r2nm_node_attr->store == NULL) {
		ret = -EINVAL;
		goto out;
	}

	if (test_bit(attr_index, &node->nd_set_attributes))
		return -EBUSY;

	ret = r2nm_node_attr->store(node, page, count);
	if (ret < count)
		goto out;

	set_bit(attr_index, &node->nd_set_attributes);
out:
	return ret;
}

static struct configfs_item_operations r2nm_node_item_ops = {
	.release		= r2nm_node_release,
	.show_attribute		= r2nm_node_show,
	.store_attribute	= r2nm_node_store,
};

static struct config_item_type r2nm_node_type = {
	.ct_item_ops	= &r2nm_node_item_ops,
	.ct_attrs	= r2nm_node_attrs,
	.ct_owner	= THIS_MODULE,
};

/* node set */

struct r2nm_node_group {
	struct config_group ns_group;
	/* some stuff? */
};

#if 0
static struct r2nm_node_group *to_r2nm_node_group(struct config_group *group)
{
	return group ?
		container_of(group, struct r2nm_node_group, ns_group)
		: NULL;
}
#endif

struct r2nm_cluster_attribute {
	struct configfs_attribute attr;
	ssize_t (*show)(struct r2nm_cluster *, char *);
	ssize_t (*store)(struct r2nm_cluster *, const char *, size_t);
};

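/*
 * Shared store helper for the cluster's numeric attributes: accept a
 * strictly positive decimal value that fits in a u32.
 */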
static ssize_t r2nm_cluster_attr_write(const char *page, ssize_t count,
					unsigned int *val)
{
	unsigned long tmp;
	char *p = (char *)page;
	int err;

	err = kstrtoul(p, 10, &tmp);
	if (err)
		return err;

	if (tmp == 0)
		return -EINVAL;
	if (tmp >= (u32)-1)
		return -ERANGE;

	*val = tmp;

	return count;
}

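/*
 * The idle timeout and keepalive delay are agreed upon with peers when
 * they connect, so neither may be changed while any peer is connected,
 * and the idle timeout must always exceed the keepalive delay.
 */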
static ssize_t r2nm_cluster_attr_idle_timeout_ms_read(
	struct r2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
}

static ssize_t r2nm_cluster_attr_idle_timeout_ms_write(
	struct r2nm_cluster *cluster, const char *page, size_t count)
{
	ssize_t ret;
	unsigned int val = 0;

	ret = r2nm_cluster_attr_write(page, count, &val);

	if (ret > 0) {
		if (cluster->cl_idle_timeout_ms != val
			&& r2net_num_connected_peers()) {
			mlog(ML_NOTICE,
			     "r2net: cannot change idle timeout after "
			     "the first peer has agreed to it."
			     "  %d connected peers\n",
			     r2net_num_connected_peers());
			ret = -EINVAL;
		} else if (val <= cluster->cl_keepalive_delay_ms) {
			mlog(ML_NOTICE, "r2net: idle timeout must be larger "
			     "than keepalive delay\n");
			ret = -EINVAL;
		} else {
			cluster->cl_idle_timeout_ms = val;
		}
	}

	return ret;
}

static ssize_t r2nm_cluster_attr_keepalive_delay_ms_read(
	struct r2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
}

static ssize_t r2nm_cluster_attr_keepalive_delay_ms_write(
	struct r2nm_cluster *cluster, const char *page, size_t count)
{
	ssize_t ret;
	unsigned int val = 0;

	ret = r2nm_cluster_attr_write(page, count, &val);

	if (ret > 0) {
		if (cluster->cl_keepalive_delay_ms != val
		    && r2net_num_connected_peers()) {
			mlog(ML_NOTICE,
			     "r2net: cannot change keepalive delay after"
			     " the first peer has agreed to it."
			     "  %d connected peers\n",
			     r2net_num_connected_peers());
			ret = -EINVAL;
		} else if (val >= cluster->cl_idle_timeout_ms) {
			mlog(ML_NOTICE, "r2net: keepalive delay must be "
			     "smaller than idle timeout\n");
			ret = -EINVAL;
		} else {
			cluster->cl_keepalive_delay_ms = val;
		}
	}

	return ret;
}

static ssize_t r2nm_cluster_attr_reconnect_delay_ms_read(
	struct r2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
}

static ssize_t r2nm_cluster_attr_reconnect_delay_ms_write(
	struct r2nm_cluster *cluster, const char *page, size_t count)
{
	return r2nm_cluster_attr_write(page, count,
					&cluster->cl_reconnect_delay_ms);
}

static ssize_t r2nm_cluster_attr_fence_method_read(
	struct r2nm_cluster *cluster, char *page)
{
	ssize_t ret = 0;

	if (cluster)
		ret = sprintf(page, "%s\n",
			      r2nm_fence_method_desc[cluster->cl_fence_method]);
	return ret;
}

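/*
 * Select the fence method by name: the write must match one of the
 * strings in r2nm_fence_method_desc ("reset" or "panic"), compared
 * case-insensitively, followed by a newline.
 */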
static ssize_t r2nm_cluster_attr_fence_method_write(
	struct r2nm_cluster *cluster, const char *page, size_t count)
{
	unsigned int i;

	if (page[count - 1] != '\n')
		goto bail;

	for (i = 0; i < R2NM_FENCE_METHODS; ++i) {
		if (count != strlen(r2nm_fence_method_desc[i]) + 1)
			continue;
		if (strncasecmp(page, r2nm_fence_method_desc[i], count - 1))
			continue;
		if (cluster->cl_fence_method != i) {
			printk(KERN_INFO "ramster: Changing fence method to %s\n",
			       r2nm_fence_method_desc[i]);
			cluster->cl_fence_method = i;
		}
		return count;
	}

bail:
	return -EINVAL;
}

static struct r2nm_cluster_attribute r2nm_cluster_attr_idle_timeout_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "idle_timeout_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= r2nm_cluster_attr_idle_timeout_ms_read,
	.store	= r2nm_cluster_attr_idle_timeout_ms_write,
};

static struct r2nm_cluster_attribute r2nm_cluster_attr_keepalive_delay_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "keepalive_delay_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= r2nm_cluster_attr_keepalive_delay_ms_read,
	.store	= r2nm_cluster_attr_keepalive_delay_ms_write,
};

static struct r2nm_cluster_attribute r2nm_cluster_attr_reconnect_delay_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "reconnect_delay_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= r2nm_cluster_attr_reconnect_delay_ms_read,
	.store	= r2nm_cluster_attr_reconnect_delay_ms_write,
};

static struct r2nm_cluster_attribute r2nm_cluster_attr_fence_method = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "fence_method",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= r2nm_cluster_attr_fence_method_read,
	.store	= r2nm_cluster_attr_fence_method_write,
};

static struct configfs_attribute *r2nm_cluster_attrs[] = {
	&r2nm_cluster_attr_idle_timeout_ms.attr,
	&r2nm_cluster_attr_keepalive_delay_ms.attr,
	&r2nm_cluster_attr_reconnect_delay_ms.attr,
	&r2nm_cluster_attr_fence_method.attr,
	NULL,
};

static ssize_t r2nm_cluster_show(struct config_item *item,
					struct configfs_attribute *attr,
					char *page)
{
	struct r2nm_cluster *cluster = to_r2nm_cluster(item);
	struct r2nm_cluster_attribute *r2nm_cluster_attr =
		container_of(attr, struct r2nm_cluster_attribute, attr);
	ssize_t ret = 0;

	if (r2nm_cluster_attr->show)
		ret = r2nm_cluster_attr->show(cluster, page);
	return ret;
}

static ssize_t r2nm_cluster_store(struct config_item *item,
					struct configfs_attribute *attr,
					const char *page, size_t count)
{
	struct r2nm_cluster *cluster = to_r2nm_cluster(item);
	struct r2nm_cluster_attribute *r2nm_cluster_attr =
		container_of(attr, struct r2nm_cluster_attribute, attr);
	ssize_t ret;

	if (r2nm_cluster_attr->store == NULL) {
		ret = -EINVAL;
		goto out;
	}

	ret = r2nm_cluster_attr->store(cluster, page, count);
out:
	return ret;
}

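/*
 * configfs mkdir(2) hook: allocate a new node item under the cluster's
 * node group.  The node stays inert until its attributes are written.
 */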
static struct config_item *r2nm_node_group_make_item(struct config_group *group,
						     const char *name)
{
	struct r2nm_node *node = NULL;

	if (strlen(name) > R2NM_MAX_NAME_LEN)
		return ERR_PTR(-ENAMETOOLONG);

	node = kzalloc(sizeof(struct r2nm_node), GFP_KERNEL);
	if (node == NULL)
		return ERR_PTR(-ENOMEM);

	strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
	config_item_init_type_name(&node->nd_item, name, &r2nm_node_type);
	spin_lock_init(&node->nd_lock);

	mlog(ML_CLUSTER, "r2nm: Registering node %s\n", name);

	return &node->nd_item;
}

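/*
 * configfs rmdir(2) hook: disconnect the node from the network layer,
 * stop the listener if this was the local node, and unlink the node
 * from the cluster's lookup structures before dropping the final
 * reference.
 */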
static void r2nm_node_group_drop_item(struct config_group *group,
				      struct config_item *item)
{
	struct r2nm_node *node = to_r2nm_node(item);
	struct r2nm_cluster *cluster =
				to_r2nm_cluster(group->cg_item.ci_parent);

	r2net_disconnect_node(node);

	if (cluster->cl_has_local &&
	    (cluster->cl_local_node == node->nd_num)) {
		cluster->cl_has_local = 0;
		cluster->cl_local_node = R2NM_INVALID_NODE_NUM;
		r2net_stop_listening(node);
	}

	/* XXX call into net to stop this node from trading messages */

	write_lock(&cluster->cl_nodes_lock);

	/* XXX sloppy */
	if (node->nd_ipv4_address)
		rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);

	/* nd_num might be 0 if the node number hasn't been set yet */
	if (cluster->cl_nodes[node->nd_num] == node) {
		cluster->cl_nodes[node->nd_num] = NULL;
		clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
	}
	write_unlock(&cluster->cl_nodes_lock);

	mlog(ML_CLUSTER, "r2nm: Unregistered node %s\n",
	     config_item_name(&node->nd_item));

	config_item_put(item);
}

static struct configfs_group_operations r2nm_node_group_group_ops = {
	.make_item	= r2nm_node_group_make_item,
	.drop_item	= r2nm_node_group_drop_item,
};

static struct config_item_type r2nm_node_group_type = {
	.ct_group_ops	= &r2nm_node_group_group_ops,
	.ct_owner	= THIS_MODULE,
};

/* cluster */

static void r2nm_cluster_release(struct config_item *item)
{
	struct r2nm_cluster *cluster = to_r2nm_cluster(item);

	kfree(cluster->cl_group.default_groups);
	kfree(cluster);
}

static struct configfs_item_operations r2nm_cluster_item_ops = {
	.release		= r2nm_cluster_release,
	.show_attribute		= r2nm_cluster_show,
	.store_attribute	= r2nm_cluster_store,
};

static struct config_item_type r2nm_cluster_type = {
	.ct_item_ops	= &r2nm_cluster_item_ops,
	.ct_attrs	= r2nm_cluster_attrs,
	.ct_owner	= THIS_MODULE,
};

/* cluster set */

struct r2nm_cluster_group {
	struct configfs_subsystem cs_subsys;
	/* some stuff? */
};

#if 0
static struct r2nm_cluster_group *
to_r2nm_cluster_group(struct config_group *group)
{
	return group ?
		container_of(to_configfs_subsystem(group),
				struct r2nm_cluster_group, cs_subsys)
	       : NULL;
}
#endif

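/*
 * configfs mkdir(2) hook at the subsystem root: create the single
 * permitted cluster along with its default "node" and heartbeat
 * sub-groups.  A second mkdir fails with -ENOSPC.
 */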
static struct config_group *
r2nm_cluster_group_make_group(struct config_group *group,
			      const char *name)
{
	struct r2nm_cluster *cluster = NULL;
	struct r2nm_node_group *ns = NULL;
	struct config_group *r2hb_group = NULL, *ret = NULL;
	void *defs = NULL;

	/* this runs under the parent dir's i_mutex; there can be only
	 * one caller in here at a time */
	if (r2nm_single_cluster)
		return ERR_PTR(-ENOSPC);

	cluster = kzalloc(sizeof(struct r2nm_cluster), GFP_KERNEL);
	ns = kzalloc(sizeof(struct r2nm_node_group), GFP_KERNEL);
	defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
	r2hb_group = r2hb_alloc_hb_set();
	if (cluster == NULL || ns == NULL || r2hb_group == NULL || defs == NULL)
		goto out;

	config_group_init_type_name(&cluster->cl_group, name,
				    &r2nm_cluster_type);
	config_group_init_type_name(&ns->ns_group, "node",
				    &r2nm_node_group_type);

	cluster->cl_group.default_groups = defs;
	cluster->cl_group.default_groups[0] = &ns->ns_group;
	cluster->cl_group.default_groups[1] = r2hb_group;
	cluster->cl_group.default_groups[2] = NULL;
	rwlock_init(&cluster->cl_nodes_lock);
	cluster->cl_node_ip_tree = RB_ROOT;
	cluster->cl_reconnect_delay_ms = R2NET_RECONNECT_DELAY_MS_DEFAULT;
	cluster->cl_idle_timeout_ms    = R2NET_IDLE_TIMEOUT_MS_DEFAULT;
	cluster->cl_keepalive_delay_ms = R2NET_KEEPALIVE_DELAY_MS_DEFAULT;
	cluster->cl_fence_method       = R2NM_FENCE_RESET;

	ret = &cluster->cl_group;
	r2nm_single_cluster = cluster;

out:
	if (ret == NULL) {
		kfree(cluster);
		kfree(ns);
		r2hb_free_hb_set(r2hb_group);
		kfree(defs);
		ret = ERR_PTR(-ENOMEM);
	}

	return ret;
}

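/*
 * Tear down the cluster: drop the default groups first, then the
 * cluster item itself.  Clearing r2nm_single_cluster allows a new
 * cluster to be created afterwards.
 */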
static void r2nm_cluster_group_drop_item(struct config_group *group,
						struct config_item *item)
{
	struct r2nm_cluster *cluster = to_r2nm_cluster(item);
	int i;
	struct config_item *killme;

	BUG_ON(r2nm_single_cluster != cluster);
	r2nm_single_cluster = NULL;

	for (i = 0; cluster->cl_group.default_groups[i]; i++) {
		killme = &cluster->cl_group.default_groups[i]->cg_item;
		cluster->cl_group.default_groups[i] = NULL;
		config_item_put(killme);
	}

	config_item_put(item);
}

static struct configfs_group_operations r2nm_cluster_group_group_ops = {
	.make_group	= r2nm_cluster_group_make_group,
	.drop_item	= r2nm_cluster_group_drop_item,
};

static struct config_item_type r2nm_cluster_group_type = {
	.ct_group_ops	= &r2nm_cluster_group_group_ops,
	.ct_owner	= THIS_MODULE,
};

static struct r2nm_cluster_group r2nm_cluster_group = {
	.cs_subsys = {
		.su_group = {
			.cg_item = {
				.ci_namebuf = "cluster",
				.ci_type = &r2nm_cluster_group_type,
			},
		},
	},
};

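/*
 * configfs dependency wrappers: while an item is depended upon,
 * userspace cannot remove it with rmdir(2).
 */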
int r2nm_depend_item(struct config_item *item)
{
	return configfs_depend_item(&r2nm_cluster_group.cs_subsys, item);
}

void r2nm_undepend_item(struct config_item *item)
{
	configfs_undepend_item(&r2nm_cluster_group.cs_subsys, item);
}

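/*
 * Pin the local node's configfs item so it cannot be removed while the
 * caller depends on it; balanced by r2nm_undepend_this_node().
 */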
int r2nm_depend_this_node(void)
{
	int ret = 0;
	struct r2nm_node *local_node;

	local_node = r2nm_get_node_by_num(r2nm_this_node());
	if (!local_node) {
		ret = -EINVAL;
		goto out;
	}

	ret = r2nm_depend_item(&local_node->nd_item);
	r2nm_node_put(local_node);

out:
	return ret;
}

void r2nm_undepend_this_node(void)
{
	struct r2nm_node *local_node;

	local_node = r2nm_get_node_by_num(r2nm_this_node());
	BUG_ON(!local_node);

	r2nm_undepend_item(&local_node->nd_item);
	r2nm_node_put(local_node);
}

static void __exit exit_r2nm(void)
{
	/* XXX sync with hb callbacks and shut down hb? */
	r2net_unregister_hb_callbacks();
	configfs_unregister_subsystem(&r2nm_cluster_group.cs_subsys);

	r2net_exit();
	r2hb_exit();
}

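/*
 * Bring the stack up in dependency order: heartbeat, then networking,
 * then the heartbeat callbacks, and finally the configfs subsystem
 * that exposes the cluster to userspace.
 */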
static int __init init_r2nm(void)
{
	int ret;

	ret = r2hb_init();
	if (ret)
		goto out;

	ret = r2net_init();
	if (ret)
		goto out_r2hb;

	ret = r2net_register_hb_callbacks();
	if (ret)
		goto out_r2net;

	config_group_init(&r2nm_cluster_group.cs_subsys.su_group);
	mutex_init(&r2nm_cluster_group.cs_subsys.su_mutex);
	ret = configfs_register_subsystem(&r2nm_cluster_group.cs_subsys);
	if (ret) {
		printk(KERN_ERR "nodemanager: Registration returned %d\n", ret);
		goto out_callbacks;
	}

	/* success */
	goto out;

out_callbacks:
	r2net_unregister_hb_callbacks();
out_r2net:
	r2net_exit();
out_r2hb:
	r2hb_exit();
out:
	return ret;
}

MODULE_AUTHOR("Oracle");
MODULE_LICENSE("GPL");

module_init(init_r2nm)
module_exit(exit_r2nm)