在比较早的那些年,我曾经写了一个负载均衡调度算法模块,是基于应用层协议包任意偏移量开始的一段固定长度的数据计算一个值,然后将这个值hash到不同的服务器。那时觉得没啥用,就没有再继续,直到前一段时间的一段思考以及前几天的一次预研。我决定作文以记之,以后说不定能用得着。
1.UDP服务的负载均衡
以前使用UDP的服务很少,虽然HTTP并没有说一定要是TCP,但事实上几乎没有UDP上的HTTP。但是随着网络可靠性的增加,网络集中控制机制与分布式优化技术的日益成熟,使用UDP的场合越来越多。
使用UDP就意味着你必须在应用层做传输控制,其实这还不是主要的,主要的问题是现在没有什么熟知的UDP服务,比如你不能指望在负载均衡器上内置一个关于OpenVPN服务的负载均衡,但是对于基于TCP的HTTP服务几乎总是被内置于任何网关内部。因为作为一个著名的应用层协议,HTTP在各个层面都拥有自己的一套成熟的标准,大家均认可这些标准。使用UDP你必须实现一个符合常理的连接过期机制,由于在UDP层面根本就不可能识别一个"连接"的断开,这就意味着要么在应用层识别,比如发送一个特殊的UDP包表示要“断开”了,要么就是对一个UDP“连接”设置一个超时。
虽然存在这么多的问题,但是在移动时代,有些问题还真必须使用UDP作为传输协议才能解决。
2.移动网络的问题
如果使用手机或PAD访问服务,由于这些移动终端时刻处于移动中,其IP地址也会不断变化(请不要考虑LISP,这只是个理想),如果使用TCP作为服务的承载协议,那就意味着TCP会不断地断开再重连-TCP和IP是相关的,如果使用UDP,就没有这个问题,代价只是在应用层记录连接信息。这是一个会话层缺失的问题,虽然有人不太认同,但是毕竟键盘党喷子说再多也没有用,实现一个这样的机制跑出来效果才是王道。鉴于此,我给OpenVPN做了手术。
OpenVPN也是用5元组来识别一个特定客户端的,但是由于存在终端移动IP地址变化的问题,这会导致OpenVPN服务端频繁断开和客户端的连接然后等待重连,虽然这不是由于TCP导致的,但是却道出了一个问题的本质,只要是用5元组来识别连接,IP地址的变化都会导致连接断开。因此我在OpenVPN协议的头里面加了一个服务器内部唯一的4个字节的所谓sessionID用以补充缺失的会话层。以后OpenVPN服务端不再用5元组来识别到一个客户端的连接了,而是使用这个唯一的sessionID来识别,这样对于UDP的情况,即便是客户端的IP地址发生变化,服务端也不会断开连接,因为sessionID没有变化。注意,这个对于TCP模式的服务是没有用的,因为TCP处在传输层,在OpenVPN识别到sessionID以前,TCP本身就先断开了,除非在accept调用之上再封装一层,做到虽然TCP连接(TCP连接)在不断的断开/重连,但是OpenVPN连接(会话层连接)始终不会断。但是由于工作量比较大,作罢。
在强大的功能展现的效果面前,任何的唧唧歪歪都是苍白的。通过引入一个很小的字段(4字节或者2字节),完美解决了“UDP长连接”(还真不能用TCP,除非引入LISP)时IP地址切换的问题,这就是UDP的力量。OpenVPN如此,为何别的就不行。事实上,任何的应用层协议都可以用UDP来封装,将连接控制(连接,排序,重传,断开等)等操纵进行标准化置于上层即可。然而,如果客户端的IP地址不断变化,负载均衡器还能基于源IP做负载均衡吗?
很显然是可以的,但是却是有问题的。因为有可能在同一客户端变化了IP地址之后,负载均衡器会将其分发到不同的服务器上,然而实际上,它们的sessionID并没有变化,因为将不能再根据源IP地址做负载均衡了。那怎么办?答案就是基于sessionID做负载均衡。
3.基于UDP协议应用层的sessionID做负载均衡
一步一步地,我们就走到了这里,现在必须回答的问题是如何做。sessionID是什么?它并非标准协议的一部分。首先你必须保证数据包中一定要有这个字段,这个一般可以保证,我肯定知道我在配置什么东西,其次,问题是这个sessionID在什么地方?这决不能强行规定。事实上,所谓的sessionID就是在一次连接中,数据包中不会变化的那个部分,仅此。因此,最好的办法就是让配置者自己决定它在什么地方以及它的长度是多少。
有了相对应用层开始的偏移和长度,取字段和算HASH就犹如探囊取物了,几乎和取源IP一样,只是多了几个计算而已,IPVS的代码如下:
net/netfilter/ipvs/ip_vs_offh.c:
/*
* IPVS: Layer7 payload Hashing scheduling module
*
* Authors: ZHAOYA
* 基于ip_vs_sh/dh修改而来,详细注释请参见:
* net/netfilter/ipvs/ip_vs_sh.c
* net/netfilter/ipvs/ip_vs_dh.c
*/
#include <linux/ip.h>
#include <linux/tcp.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/skbuff.h>
#include <linux/ctype.h>
#include <net/ip.h>
#include <net/ip_vs.h>
struct ip_vs_offh_bucket {
struct ip_vs_dest *dest;
};
struct ip_vs_offh_data {
struct ip_vs_offh_bucket *tbl;
u32 offset;
u32 offlen;
};
#define IP_VS_OFFH_TAB_BITS 8
#define IP_VS_OFFH_TAB_SIZE (1 << IP_VS_OFFH_TAB_BITS)
#define IP_VS_OFFH_TAB_MASK (IP_VS_OFFH_TAB_SIZE - 1)
/*
* 全局变量
* offset:Layer7计算hash值的payload偏移量(相对于Layer7头)
* offlen:Layer7计算hash值的payload长度
*/
static u32 offset, offlen;
static int skip_atoi(char **s)
{
int i=0;
while (isdigit(**s))
i = i*10 + *((*s)++) - '0';
return i;
}
static inline struct ip_vs_dest *
ip_vs_offh_get(struct ip_vs_offh_bucket *tbl, const char *payload, u32 length)
{
__be32 v_fold = 0;
/* 算法有待优化 */
v_fold = (payload[0]^payload[length>>2]^payload[length])*2654435761UL;
return (tbl[v_fold & IP_VS_OFFH_TAB_MASK]).dest;
}
static int
ip_vs_offh_assign(struct ip_vs_offh_bucket *tbl, struct ip_vs_service *svc)
{
int i;
struct ip_vs_offh_bucket *b;
struct list_head *p;
struct ip_vs_dest *dest;
b = tbl;
p = &svc->destinations;
for (i=0; i<IP_VS_OFFH_TAB_SIZE; i++) {
if (list_empty(p)) {
b->dest = NULL;
} else {
if (p == &svc->destinations)
p = p->next;
dest = list_entry(p, struct ip_vs_dest, n_list);
atomic_inc(&dest->refcnt);
b->dest = dest;
p = p->next;
}
b++;
}
return 0;
}
static void ip_vs_offh_flush(struct ip_vs_offh_bucket *tbl)
{
int i;
struct ip_vs_offh_bucket *b;
b = tbl;
for (i=0; i<IP_VS_OFFH_TAB_SIZE; i++) {
if (b->dest) {
atomic_dec(&b->dest->refcnt);
b->dest = NULL;
}
b++;
}
}
static int ip_vs_offh_init_svc(struct ip_vs_service *svc)
{
struct ip_vs_offh_data *pdata;
struct ip_vs_offh_bucket *tbl;
pdata = kmalloc(sizeof(struct ip_vs_offh_data), GFP_ATOMIC);
if (pdata == NULL) {
pr_err("%s(): no memory\n", __func__);
return -ENOMEM;
}
tbl = kmalloc(sizeof(struct ip_vs_offh_bucket)*IP_VS_OFFH_TAB_SIZE,
GFP_ATOMIC);
if (tbl == NULL) {
kfree(pdata);
pr_err("%s(): no memory\n", __func__);
return -ENOMEM;
}
pdata->tbl = tbl;
pdata->offset = 0;
pdata->offlen = 0;
svc->sched_data = pdata;
ip_vs_offh_assign(tbl, svc);
return 0;
}
static int ip_vs_offh_done_svc(struct ip_vs_service *svc)
{
struct ip_vs_offh_data *pdata = svc->sched_data;
struct ip_vs_offh_bucket *tbl = pdata->tbl;
ip_vs_offh_flush(tbl);
kfree(tbl);
kfree(pdata);
return 0;
}
static int ip_vs_offh_update_svc(struct ip_vs_service *svc)
{
struct ip_vs_offh_bucket *tbl = svc->sched_data;
ip_vs_offh_flush(tbl);
ip_vs_offh_assign(tbl, svc);
return 0;
}
static inline int is_overloaded(struct ip_vs_dest *dest)
{
return dest->flags & IP_VS_DEST_F_OVERLOAD;
}
static struct ip_vs_dest *
ip_vs_offh_schedule(struct ip_vs_service *svc, const struct sk_buff *skb)
{
struct ip_vs_dest *dest;
struct ip_vs_offh_data *pdata;
struct ip_vs_offh_bucket *tbl;
struct iphdr *iph;
void *transport_hdr;
char *payload;
u32 hdrlen = 0;
u32 _offset = 0;
u32 _offlen = 0;
iph = ip_hdr(skb);
hdrlen = iph->ihl*4;
if (hdrlen > skb->len) {
return NULL;
}
transport_hdr = (void *)iph + hdrlen;
switch (iph->protocol) {
case IPPROTO_TCP:
hdrlen += (((struct tcphdr*)transport_hdr)->doff)*4;
break;
case IPPROTO_UDP:
hdrlen += sizeof(struct udphdr);
break;
default:
return NULL;
}
#if 0
{
int i = 0;
_offset = offset;
_offlen = offlen;
payload = (char *)iph + hdrlen + _offset;
printk("begin:iplen:%d \n", hdrlen);
for (i = 0; i < _offlen; i++) {
printk("%02X ", payload[i]);
}
printk("\nend\n");
return NULL;
}
#endif
pdata = (struct ip_vs_offh_datai *)svc->sched_data;
tbl = pdata->tbl;
_offset = offset;//pdata->offset;
_offlen = offlen;//pdata->offlen;
if (_offlen + _offset > skb->len - hdrlen) {
IP_VS_ERR_RL("OFFH: exceed\n");
return NULL;
}
payload = (char *)iph + hdrlen + _offset;
dest = ip_vs_offh_get(tbl, payload, _offlen);
if (!dest
|| !(dest->flags & IP_VS_DEST_F_AVAILABLE)
|| atomic_read(&dest->weight) <= 0
|| is_overloaded(dest)) {
IP_VS_ERR_RL("OFFH: no destination available\n");
return NULL;
}
return dest;
}
static struct ip_vs_scheduler ip_vs_offh_scheduler =
{
.name = "offh",
.refcnt = ATOMIC_INIT(0),
.module = THIS_MODULE,
.n_list = LIST_HEAD_INIT(ip_vs_offh_scheduler.n_list),
.init_service = ip_vs_offh_init_svc,
.done_service = ip_vs_offh_done_svc,
.update_service = ip_vs_offh_update_svc,
.schedule = ip_vs_offh_schedule,
};
static ssize_t ipvs_sch_offset_read(struct file *file, char __user *buf, size_t count, loff_t *ppos)
{
int ret = 0;
ret = sprintf(buf, "offset:%u;offlen:%u\n", offset, offlen);
return ret;
}
/*
* 设置offset/offset length
* echo offset:$value1 offlen:$value2 >/proc/net/ipvs_sch_offset
*/
static int ipvs_sch_offset_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos)
{
int ret = count;
char *p = buf, *pstart;
if ((p = strstr(p, "offset:")) == NULL) {
ret = -EINVAL;
goto out;
}
p += strlen("offset:");
pstart = p;
if ((p = strstr(p, " ")) == NULL) {
ret = -EINVAL;
goto out;
}
p[0] = 0;
offset = skip_atoi(&pstart);
if (offset == 0 && strcmp(pstart, "0")) {
ret = -EINVAL;
goto out;
}
p += strlen(";");
if ((p = strstr(p, "offlen:")) == NULL) {
ret = -EINVAL;
goto out;
}
p += strlen("offlen:");
pstart = p;
offlen = skip_atoi(&pstart);
if (offlen == 0 && strcmp(pstart, "0")) {
ret = -EINVAL;
goto out;
}
out:
return ret;
}
/*
* 由于不想修改用户态的配置接口,还是觉得procfs这种方式比较靠普
**/
static const struct file_operations ipvs_sch_offset_file_ops = {
.owner = THIS_MODULE,
.read = ipvs_sch_offset_read,
.write = ipvs_sch_offset_write,
};
struct net *net = &init_net;
static int __init ip_vs_offh_init(void)
{
int ret = -1;
if (!proc_create("ipvs_sch_offset", 0644, net->proc_net, &ipvs_sch_offset_file_ops)) {
printk("OFFH: create proc entry failed\n");
goto out;
}
return register_ip_vs_scheduler(&ip_vs_offh_scheduler);
out:
return ret;
}
static void __exit ip_vs_offh_cleanup(void)
{
remove_proc_entry("ipvs_sch_offset", net->proc_net);
unregister_ip_vs_scheduler(&ip_vs_offh_scheduler);
}
module_init(ip_vs_offh_init);
module_exit(ip_vs_offh_cleanup);
MODULE_LICENSE("GPL");