Redis Part 20: The Gossip Protocol
The gossip protocol is a peer-to-peer (P2P) communication protocol. Nodes continuously exchange information with one another, and after some time every node knows the complete state of the cluster.
The name comes from office gossip: as soon as one person starts a rumor, everyone in the office knows it within a limited amount of time.
The algorithm works as follows: a node broadcasts its own information, some nodes receive it, and those nodes in turn keep spreading it through the cluster; after a while the whole cluster holds that node's information. In practice most nodes run this exchange all the time, which is why the cluster's state converges after a period of time.
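To see why convergence is fast, here is a minimal standalone simulation (toy code, not from Redis; N, FANOUT and the fixed seed are assumptions for the demo): each round, every informed node contacts a few random peers, and the rumor reaches all nodes in a handful of rounds.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define N 100      /* cluster size (assumption for the demo) */
#define FANOUT 3   /* peers contacted per round (assumption) */

int main(void) {
    int informed[N] = {0};
    informed[0] = 1;                 /* node 0 starts with the news */
    int known = 1, rounds = 0;
    srand(42);                       /* fixed seed for a repeatable run */
    while (known < N) {
        int snapshot[N];
        memcpy(snapshot, informed, sizeof(snapshot));
        for (int i = 0; i < N; i++) {
            if (!snapshot[i]) continue;          /* only informed nodes gossip */
            for (int j = 0; j < FANOUT; j++) {
                int peer = rand() % N;           /* pick a random peer */
                if (!informed[peer]) { informed[peer] = 1; known++; }
            }
        }
        rounds++;
    }
    printf("All %d nodes informed after %d rounds\n", N, rounds);
    return 0;
}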
Communication process
- Every node listens on two TCP ports: one for client access and one for inter-node communication (the cluster bus). The bus port equals the client port plus 10000 (see the sketch after this list).
- At a fixed interval, each node picks a few nodes according to specific rules and sends them PING messages.
- A node that receives a PING replies with a PONG.
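A trivial sketch of the port rule (6379 is just an example client port; CLUSTER_PORT_INCR really is defined as 10000 in the Redis source):

#include <stdio.h>

#define CLUSTER_PORT_INCR 10000  /* as in the Redis source */

int main(void) {
    int client_port = 6379;      /* example client-facing port */
    printf("client port %d -> cluster bus port %d\n",
           client_port, client_port + CLUSTER_PORT_INCR);
    return 0;
}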
Protocol
The message types are: ping, pong, meet, fail.
A gossip protocol message consists of a message header plus a message body.
Message header:
typedef struct {
    char sig[4];            // message signature: "RCmb"
    uint32_t totlen;        // total message length
    uint16_t ver;           // protocol version, currently 1
    uint16_t port;          // base port (the client <-> server port)
    uint16_t type;          // message type
    uint16_t count;         // for PING/PONG, the number of entries in the body
    uint64_t currentEpoch;  // the sender's current config epoch
    uint64_t configEpoch;   // config epoch of the sender's master (or its own, if a master)
    uint64_t offset;        // replication offset
    char sender[CLUSTER_NAMELEN];           // sender's nodeId
    unsigned char myslots[CLUSTER_SLOTS/8]; // slots served by the sender
    char slaveof[CLUSTER_NAMELEN];  // if the sender is a slave, the nodeId of its master
    char myip[NET_IP_STR_LEN];      // sender's IP
    char notused1[34];              // unused
    uint16_t cport;         // cluster bus (inter-node) port
    uint16_t flags;         // sender flags: master/slave, failure state, etc.
    unsigned char state;    // cluster state from the sender's point of view
    unsigned char mflags[3];    // message flags
    union clusterMsgData data;  // message body
} clusterMsg;
Message types
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
Message body:
union clusterMsgData {
    /* PING, MEET and PONG */
    struct {
        // an array carrying info about multiple nodes
        clusterMsgDataGossip gossip[1];
    } ping;
    /* FAIL: info about the failed node */
    struct {
        clusterMsgDataFail about;
    } fail;
    /* PUBLISH */
    struct {
        clusterMsgDataPublish msg;
    } publish;
    /* UPDATE */
    struct {
        clusterMsgDataUpdate nodecfg;
    } update;
};
Body entry structure for PING/PONG messages
typedef struct {
    char nodename[CLUSTER_NAMELEN]; // nodeId
    uint32_t ping_sent;             // time of the last PING sent to this node
    uint32_t pong_received;         // time of the last PONG received from it
    char ip[NET_IP_STR_LEN];        // node IP
    uint16_t port;                  // node base (client) port
    uint16_t cport;                 // node cluster bus port
    uint16_t flags;                 // node flags
    uint32_t notused1;
} clusterMsgDataGossip;
Body structure for FAIL messages
typedef struct {
    char nodename[CLUSTER_NAMELEN]; // nodeId of the failed node
} clusterMsgDataFail;
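To make the wire sizes tangible, here is a simplified mock (field lists trimmed, names invented; not the real Redis structs) of the sizing pattern the code below uses again and again: the header length is sizeof(clusterMsg) minus the union, and the body length is added per message type and entry count.

#include <stdio.h>
#include <stdint.h>

#define CLUSTER_NAMELEN 40

/* trimmed stand-ins for the real types */
typedef struct {
    char nodename[CLUSTER_NAMELEN];
    uint32_t ping_sent, pong_received;
} mockGossip;

typedef struct {
    char nodename[CLUSTER_NAMELEN];
} mockFail;

union mockMsgData {
    struct { mockGossip gossip[1]; } ping;
    struct { mockFail about; } fail;
};

typedef struct {
    char sig[4];
    uint32_t totlen;
    uint16_t type, count;
    union mockMsgData data;
} mockMsg;

int main(void) {
    int gossipcount = 3; /* e.g. a PING carrying 3 gossip entries */
    size_t hdrlen  = sizeof(mockMsg) - sizeof(union mockMsgData);
    size_t pinglen = hdrlen + sizeof(mockGossip) * gossipcount;
    size_t faillen = hdrlen + sizeof(mockFail);
    printf("header=%zu ping(3 entries)=%zu fail=%zu\n",
           hdrlen, pinglen, faillen);
    return 0;
}

The gossip[1] member effectively acts as a variable-length array: the sender allocates header + count * entry bytes and fixes totlen afterwards, exactly as clusterSendPing() does below.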
ping —> pong: communication between nodes
A PING message encapsulates the sender's own state plus the state of a subset of other nodes.
// Call chain
clusterSendPing() --> clusterSetGossipEntry() --> clusterSendMessage()
clusterSendPing
void clusterSendPing(clusterLink *link, int type) {
    unsigned char *buf;
    // the message to send
    clusterMsg *hdr;
    // number of gossip entries actually added to the body
    int gossipcount = 0;
    // number of gossip entries we want to attach
    int wanted;
    // total message length
    int totlen;
    int freshnodes = dictSize(server.cluster->nodes)-2;
    // attach 1/10 of the cluster's nodes, but at least 3
    wanted = floor(dictSize(server.cluster->nodes)/10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;
    /* Include all the nodes in PFAIL state, so that failure reports are
     * faster to propagate to go from PFAIL to FAIL state. */
    int pfail_wanted = server.cluster->stats_pfail_nodes;
    /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
     * later according to the number of gossip sections we really were able
     * to put inside the packet. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
    /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
     * sizeof(clusterMsg) or more. */
    if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
    buf = zcalloc(totlen);
    hdr = (clusterMsg*) buf;
    /* Populate the header. */
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    // initialize the message header
    clusterBuildMessageHdr(hdr,type);
    /* Populate the gossip fields */
    int maxiterations = wanted*3;
    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        // pick a random node
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);
        /* Don't include this node: the whole packet header is about us
         * already, so we just gossip about other nodes. */
        // the header already describes the current node, so skip it here
        if (this == myself) continue;
        /* PFAIL nodes will be added later. */
        // nodes in PFAIL state are skipped here and appended afterwards
        if (this->flags & CLUSTER_NODE_PFAIL) continue;
        /* In the gossip section don't include:
         * 1) Nodes in HANDSHAKE state.
         * 2) Nodes with the NOADDR flag set.
         * 3) Disconnected nodes if they don't have configured slots. */
        if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
            freshnodes--; /* Technically not correct, but saves CPU. */
            continue;
        }
        /* Do not add a node we already have. */
        if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
        // add the node to the message body
        clusterSetGossipEntry(hdr,gossipcount,this);
        freshnodes--;
        gossipcount++;
    }
    /* If there are PFAIL nodes, add them at the end. */
    if (pfail_wanted) {
        dictIterator *di;
        dictEntry *de;
        di = dictGetSafeIterator(server.cluster->nodes);
        while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
            clusterNode *node = dictGetVal(de);
            if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
            if (node->flags & CLUSTER_NODE_NOADDR) continue;
            if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
            clusterSetGossipEntry(hdr,gossipcount,node);
            freshnodes--;
            gossipcount++;
            /* We take the count of the slots we allocated, since the
             * PFAIL stats may not match perfectly with the current number
             * of PFAIL nodes. */
            pfail_wanted--;
        }
        dictReleaseIterator(di);
    }
    /* Ready to send... fix the totlen field and queue the message in the
     * output buffer. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
    hdr->count = htons(gossipcount);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}
clusterBroadcastPong
#define CLUSTER_BROADCAST_ALL 0
#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
void clusterBroadcastPong(int target) {
    dictIterator *di;
    dictEntry *de;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        // skip nodes we cannot reach
        if (!node->link) continue;
        if (node == myself || nodeInHandshake(node)) continue;
        if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
            // only slaves of myself, or slaves sharing my master
            int local_slave =
                nodeIsSlave(node) && node->slaveof &&
                (node->slaveof == myself || node->slaveof == myself->slaveof);
            if (!local_slave) continue;
        }
        clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
    }
    dictReleaseIterator(di);
}
clusterSetGossipEntry
// build a single node's entry in the message body
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
    clusterMsgDataGossip *gossip;
    // the node becomes the i-th entry of the body
    gossip = &(hdr->data.ping.gossip[i]);
    memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
    gossip->ping_sent = htonl(n->ping_sent/1000);
    gossip->pong_received = htonl(n->pong_received/1000);
    memcpy(gossip->ip,n->ip,sizeof(n->ip));
    gossip->port = htons(n->port);
    gossip->cport = htons(n->cport);
    gossip->flags = htons(n->flags);
    gossip->notused1 = 0;
}
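Note the /1000 and the htonl() calls: timestamps are 64-bit millisecond values in memory, but travel as 32-bit seconds in network byte order. A minimal sketch of the conversion (the timestamp value is made up):

#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>  /* htonl/ntohl (POSIX) */

int main(void) {
    int64_t ping_sent_ms = 1700000000123; /* hypothetical mstime() value */
    /* divide by 1000 so the value fits a uint32_t, then swap to network order */
    uint32_t wire = htonl((uint32_t)(ping_sent_ms / 1000));
    printf("ms=%lld -> seconds on the wire=%u\n",
           (long long)ping_sent_ms, (unsigned)ntohl(wire));
    return 0;
}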
meet —> pong: a new node joins
A joining node sends a MEET message to any node already in the cluster (say node B), triggered by the CLUSTER MEET <ip> <port> command, announcing that a new node is joining. Node B adds the new node to its own node table and then exchanges ping/pong messages with the rest of the cluster, so that eventually every node in the cluster stores the new node's information.
pong —> other nodes: broadcasting a node's own state
A PONG message encapsulates the node's own state. It is sent as the reply to MEET and PING messages, confirming to the sender that communication is working. A node can also broadcast PONG messages to the cluster (see clusterBroadcastPong() above) to make the whole cluster refresh its view of the node's state.
fail —> other nodes: broadcasting a node failure
When a node decides that another node in the cluster is down, it broadcasts a FAIL message to the cluster; every node that receives it marks the corresponding node as failed.
// Call chain:
markNodeAsFailingIfNeeded() ---> clusterSendFail() --> clusterBuildMessageHdr() --> clusterBroadcastMessage() --> clusterSendMessage()
markNodeAsFailingIfNeeded --> clusterNodeFailureReportsCount --> clusterNodeCleanupFailureReports
markNodeAsFailingIfNeeded
void markNodeAsFailingIfNeeded(clusterNode *node) {
    // number of votes in the cluster that consider node failed
    int failures;
    // number of votes needed to mark node as down
    int needed_quorum = (server.cluster->size / 2) + 1;
    // do nothing if the node has not timed out yet
    if (!nodeTimedOut(node)) return;
    // do nothing if the node is already marked as failed
    // #define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
    if (nodeFailed(node)) return;
    // count the votes that consider node failed
    failures = clusterNodeFailureReportsCount(node);
    // if the current node is a master, it gets a vote too
    if (nodeIsMaster(myself)) failures++;
    // without a quorum, do nothing; otherwise broadcast FAIL below
    if (failures < needed_quorum) return;
    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);
    // switch the node's state from PFAIL to FAIL
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();
    // if the current node is a master, broadcast the failure to the cluster
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
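The quorum arithmetic is worth making concrete. A minimal standalone sketch (the cluster sizes are illustrative; server.cluster->size counts masters serving at least one slot):

#include <stdio.h>

int main(void) {
    for (int size = 3; size <= 7; size++) {
        /* strict majority of the voting masters */
        int needed_quorum = (size / 2) + 1;
        printf("masters=%d -> votes needed to mark FAIL=%d\n",
               size, needed_quorum);
    }
    return 0;
}

With three masters, two must agree; failures are never declared by a single node's opinion alone.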
clusterNodeFailureReportsCount
int clusterNodeFailureReportsCount(clusterNode *node) {
    // purge stale failure reports first
    clusterNodeCleanupFailureReports(node);
    // number of nodes that consider node failed
    return listLength(node->fail_reports);
}
/* A failure report older than maxtime is considered stale and removed. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    // list of nodes that reported node as failing
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    // maximum validity = node timeout * 2
    mstime_t maxtime = server.cluster_node_timeout *
                       CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        // removing a stale report decreases the vote count by one
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}
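A hypothetical timeline for the expiry rule (timestamps are made up; 15000 ms is the default cluster-node-timeout, and CLUSTER_FAIL_REPORT_VALIDITY_MULT is 2 in the source):

#include <stdio.h>

#define CLUSTER_FAIL_REPORT_VALIDITY_MULT 2

int main(void) {
    long long cluster_node_timeout = 15000;      /* default 15 s */
    long long maxtime = cluster_node_timeout * CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    long long now = 100000, report_time = 60000; /* hypothetical mstime() values */
    if (now - report_time > maxtime)
        printf("report is stale, dropped\n");    /* 40000 ms > 30000 ms */
    else
        printf("report still valid for %lld ms\n",
               maxtime - (now - report_time));
    return 0;
}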
clusterSendFail
// node nodename has failed: notify the other nodes in the cluster
void clusterSendFail(char *nodename) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;
    // build the message header
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
    // set the failed node's id in the message body
    memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
    // broadcast to the reachable nodes of the cluster
    clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}
clusterBuildMessageHdr
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    int totlen = 0;
    uint64_t offset;
    clusterNode *master;
    // if the current node is a slave, master is its master node;
    // if it is a master, master is the node itself
    master = (nodeIsSlave(myself) && myself->slaveof) ?
              myself->slaveof : myself;
    memset(hdr,0,sizeof(*hdr));
    // initialize the protocol version, signature and type
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->type = htons(type);
    // set the current node's id in the header
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
    // set the current node's ip in the header
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }
    // base (client) port and cluster bus port
    int announced_port = server.cluster_announce_port ?
                         server.cluster_announce_port : server.port;
    int announced_cport = server.cluster_announce_bus_port ?
                          server.cluster_announce_bus_port :
                          (server.port + CLUSTER_PORT_INCR);
    // slots served by the current node (or by its master)
    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(announced_port);
    hdr->cport = htons(announced_cport);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;
    /* Set the currentEpoch and configEpochs. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);
    // set the replication offset
    if (nodeIsSlave(myself))
        offset = replicationGetSlaveOffset();
    else
        offset = server.master_repl_offset;
    hdr->offset = htonu64(offset);
    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
    // compute and set the total message length
    if (type == CLUSTERMSG_TYPE_FAIL) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
    }
    hdr->totlen = htonl(totlen);
}
clusterBroadcastMessage
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;
    // create an iterator over the cluster's nodes
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        // get one node of the cluster
        clusterNode *node = dictGetVal(de);
        // skip nodes we cannot send to
        if (!node->link) continue;
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        // send the message
        clusterSendMessage(node->link,buf,len);
    }
    // release the iterator
    dictReleaseIterator(di);
}
clusterSendMessage
// send a message. link: connection to the receiving node; msg: message content; msglen: message length
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
    if (sdslen(link->sndbuf) == 0 && msglen != 0)
        aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
                          clusterWriteHandler,link);
    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
    /* Populate sent messages stats. */
    clusterMsg *hdr = (clusterMsg*) msg;
    uint16_t type = ntohs(hdr->type);
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_sent[type]++;
}
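On the receiving side, a peer validates the signature and converts fields back from network byte order, mirroring the ntohs(hdr->type) seen above. A simplified sketch with mock types (not the real clusterMsg; only the first few fields):

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <arpa/inet.h>

/* trimmed-down stand-in for clusterMsg's leading fields */
typedef struct { char sig[4]; uint32_t totlen; uint16_t ver, port, type; } mockHdr;

int main(void) {
    /* pretend this arrived from the network: a v1 PING (type 0) */
    mockHdr hdr = { {'R','C','m','b'}, 0, htons(1), 0, htons(0) };
    if (memcmp(hdr.sig, "RCmb", 4) != 0) {
        printf("not a cluster bus packet\n");
        return 1;
    }
    printf("proto ver=%u type=%u\n", ntohs(hdr.ver), ntohs(hdr.type));
    return 0;
}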
How the nodes to ping are selected
// Call chain
serverCron() --> clusterCron() --> clusterSendPing()
Redis maintains a periodic task that runs 10 times per second. It triggers PINGs in two ways:
- Once per second, sample 5 random nodes and ping the one that has gone longest without being heard from (oldest pong_received).
- Ping any node whose last communication is older than node_timeout/2.
// runs every 100 ms, i.e. 10 times per second
run_with_period(100) {
    if (server.cluster_enabled) clusterCron();
}
// once per second, sample 5 random nodes and ping the least recently seen
if (!(iteration % 10)) {
    int j;
    /* Check a few random nodes and ping the one with the oldest
     * pong_received time. */
    for (j = 0; j < 5; j++) {
        de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);
        /* Don't ping nodes disconnected or with a ping currently active. */
        if (this->link == NULL || this->ping_sent != 0) continue;
        if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        if (min_pong_node == NULL || min_pong > this->pong_received) {
            min_pong_node = this;
            min_pong = this->pong_received;
        }
    }
    if (min_pong_node) {
        serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
        clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
    }
}
// ping nodes whose last communication is older than node_timeout/2
if (node->link && node->ping_sent == 0 &&
    (now - node->pong_received) > server.cluster_node_timeout/2)
{
    clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
    continue;
}
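A quick back-of-the-envelope check of this rule, assuming the default cluster-node-timeout of 15000 ms:

#include <stdio.h>

int main(void) {
    int node_timeout_ms = 15000;                  /* default value */
    int max_ping_gap_ms = node_timeout_ms / 2;    /* re-ping after 7.5 s */
    int pings_per_window = node_timeout_ms / max_ping_gap_ms;  /* = 2 */
    int packets_per_window = 2 * pings_per_window; /* each PING earns a PONG */
    printf("re-ping after at most %d ms; >= %d packets from each peer per node_timeout\n",
           max_ping_gap_ms, packets_per_window);
    return 0;
}

So any two nodes exchange at least 4 packets (2 PINGs plus 2 PONGs) per node_timeout window, a figure the 1/10 analysis below relies on.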
Each PING carries gossip entries for at most 1/10 of the cluster's nodes, but no fewer than 3 entries (e.g. with server.cluster->nodes = 6, three masters plus three slaves, a PING carries 3 entries).
// excerpt from clusterSendPing()
int freshnodes = dictSize(server.cluster->nodes)-2;
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
// wanted is the number of gossip entries carried in the message
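A worked example of the formula, for a few illustrative cluster sizes (compile with -lm for floor()):

#include <stdio.h>
#include <math.h>

int main(void) {
    int sizes[] = {6, 30, 100};
    for (int i = 0; i < 3; i++) {
        int nodes = sizes[i];
        int freshnodes = nodes - 2;            /* exclude myself and the receiver */
        int wanted = (int)floor(nodes / 10.0); /* the 1/10 rule */
        if (wanted < 3) wanted = 3;
        if (wanted > freshnodes) wanted = freshnodes;
        printf("nodes=%3d -> gossip entries per PING=%d\n", nodes, wanted);
    }
    return 0;
}

For 6, 30 and 100 nodes this yields 3, 3 and 10 entries respectively.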
Why carry 1/10 of the nodes?
The comment in the Redis source explains it as follows:
How many gossip sections we want to add? 1/10 of the number of nodes
and anyway at least 3. Why 1/10?
If we have N masters, with N/10 entries, and we consider that in
node_timeout we exchange with each other node at least 4 packets
(we ping in the worst case in node_timeout/2 time, and we also
receive two pings from the host), we have a total of 8 packets
in the node_timeout*2 failure reports validity time. So we have
that, for a single PFAIL node, we can expect to receive the following
number of failure reports (in the specified window of time):
PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
PROB = probability of being featured in a single gossip entry,
which is 1 / NUM_OF_NODES.
ENTRIES = 10.
TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
If we assume we have just masters (so num of nodes and num of masters
is the same), with 1/10 we always get over the majority, and specifically
80% of the number of nodes, to account for many masters failing at the
same time.
Since we have non-voting slaves that lower the probability of an entry
to feature our node, we set the number of entries per packet as
10% of the total nodes we have.
In plain terms: compute the number of gossip entries to attach; it should be 1/10 of the total node count, and at least 3. The reason for including 1/10 of all nodes is that, within the failure-detection window of 2 * node_timeout, a node that goes down should have its down reports reach the current node from a majority of the cluster. The 1/10 figure is derived as follows:
With N cluster nodes, within node_timeout the current node receives at least 4 heartbeat packets from any other node: a node sends a PING to each peer at least once every node_timeout/2, and every PING is answered with a PONG. So within node_timeout, the current node receives two PINGs from node A, plus the two PONGs replying to the PINGs it sent to A.
Over the detection window of node_timeout * 2, that makes 8 heartbeat packets from each peer, i.e. 8N packets in total. Each heartbeat features the down node with probability roughly 1/10 (N/10 entries drawn from N nodes), so the expected number of down reports is 8N * (1/10) = 0.8N, i.e. 80% of the nodes, which comfortably covers a majority of the cluster.
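The same arithmetic as a tiny standalone program (N = 60 masters is an arbitrary example):

#include <stdio.h>

int main(void) {
    double n = 60;                 /* example: 60 master nodes */
    double prob = 1.0 / n;         /* chance one gossip entry features the node */
    double entries = n / 10;       /* gossip entries per packet (the 1/10 rule) */
    double packets = 2 * 4 * n;    /* 8 heartbeats from each peer in 2*node_timeout */
    double expected = prob * entries * packets;
    printf("expected reports: %.0f (= %.0f%% of %g nodes)\n",
           expected, 100 * expected / n, n);
    return 0;
}

For N = 60 this prints 48 expected reports, i.e. 80% of the nodes, matching the 0.8N result above.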
All source code above is from redis-4.0.6.