奔驰宝马游戏网站建设,互联网专属保险什么意思,网站建设网络推广代理公司,黄冈seoRaft
Raft提供了一种在计算系统集群中分布状态机的通用方法#xff0c;确保集群中的每个节点都同意一系列相同的状态转换。 一个Raft集群包含若干个服务器节点#xff0c;通常为5个#xff0c;这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一#xff1a; …Raft
Raft提供了一种在计算系统集群中分布状态机的通用方法确保集群中的每个节点都同意一系列相同的状态转换。 一个Raft集群包含若干个服务器节点通常为5个这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一
follower跟随者所有节点都以follower的状态开始。如果没有收到leader的消息则会变成candidate状态。candidate候选人会向其他节点“拉选票”如果得到大部分的票则成为leader这个过程叫做Leader选举Leader Election。未当选Leader的节点会将状态转换为follower。leader领导者所有对系统的修改都会先经过leader。
raft 是 etcd 和consoul的核心算法。
Raft 一致性算法
Raft通过选出一个leader来简化日志副本的管理例如日志项log entry)只允许从leader流向follower。基于leader的方法Raft算法可以分解成三个子问题。 Leader election领导选举原来的leader挂掉后必须选出一个新的leader。Log replication日志复制leader从客户端接收日志并复制到整个集群中。Safety安全性如果有任意的server将日志项回放到状态机中了那么其他的server只会回放相同的日志项。
动画演示
地址http://thesecretlivesofdata.com/raft/ 网络节点必须是奇数个。 动画主要包含三部分内容
第一部分介绍简单版的领导者选举和日志复制的过程第二部分介绍详细版的领导者选举和日志复制过程第三部分介绍如果遇到网络分区脑裂raft算法是如何回复网络一致的。
Leader election领导选举
Raft 使用一种心跳机制来触发领导人选举当服务器程序启动时节点都是 follower(跟随者) 身份如果一个跟随者在一段时间里没有接收到任何消息也就是选举超时然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者要开始一次选举过程follower 会给当前term加1并且转换成candidate状态然后它会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。候选人的状态维持直到发生以下任何一个条件发生的时候 他自己赢得了这次的选举其他的服务器成为领导者一段时间之后没有任何一个获胜的人
Log replication日志复制
当选出 leader 后它会开始接收客户端请求每个请求会带有一个指令可以被回放到状态机中leader 把指令追加成一个log entry然后通过AppendEntries RPC并行地发送给其他的server当该entry被多数server复制后leader 会把该entry回放到状态机中然后把结果返回给客户端当 follower 宕机或者运行较慢时leader 会无限地重发AppendEntries给这些follower直到所有的follower都复制了该log entryraft的log replication要保证如果两个log entry有相同的index和term那么它们存储相同的指令leader在一个特定的term和index下只会创建一个log entry
代码
package mainimport (osstrconvfmtsyncnet/rpcnet/httpmath/randtimelog
)//对每个节点id和端口的封装类型
type nodeInfo struct {id stringport string
}//声明节点对象类型Raft
type Raft struct {node nodeInfomu sync.Mutex//当前节点编号me intcurrentTerm intvotedFor intstate inttimeout intcurrentLeader int//该节点最后一次处理数据的时间lastMessageTime int64message chan booleclectCh chan boolheartbeat chan bool//子节点给主节点返回心跳信号heartbeatRe chan bool
}//声明leader对象
type Leader struct {//任期Term int//leader 编号LeaderId int
}//设置节点个数
const raftCount 2var leader Leader{0, -1}
//存储缓存信息
var bufferMessage make(map[string]string)
//处理数据库信息
var mysqlMessage make(map[string]string)
//操作消息数组下标
var messageId 1
//用nodeTable存储每个节点中的键值对
var nodeTable map[string]stringfunc main() {//终端接收来的是数组if len(os.Args) 1 {//接收终端输入的信息userId : os.Args[1]//字符串转换整型id, _ : strconv.Atoi(userId)fmt.Println(id)//定义节点id和端口号nodeTable map[string]string{1: :8000,2: :8001,}//封装nodeInfo对象node : nodeInfo{id: userId, port: nodeTable[userId]}//创建节点对象rf : Make(id)//确保每个新建立的节点都有端口对应//127.0.0.1:8000rf.node node//注册rpcgo func() {//注册rpc为了实现远程链接rf.raftRegisterRPC(node.port)}()if userId 1 {go func() {//回调方法http.HandleFunc(/req, rf.getRequest)fmt.Println(监听8080)if err : http.ListenAndServe(:8080, nil); err ! nil {fmt.Println(err)return}}()}}for {;}
}var clientWriter http.ResponseWriterfunc (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {request.ParseForm()if len(request.Form[age]) 0 {clientWriter writerfmt.Println(主节点广播客户端请求age:, request.Form[age][0])param : Param{Msg: request.Form[age][0], MsgId: strconv.Itoa(messageId)}messageIdif leader.LeaderId rf.me {rf.sendMessageToOtherNodes(param)} else {//将消息转发给leaderleaderId : nodeTable[strconv.Itoa(leader.LeaderId)]//连接远程rpc服务rpc, err : rpc.DialHTTP(tcp, 127.0.0.1leaderId)if err ! nil {log.Fatal(\nrpc转发连接server错误:, leader.LeaderId, err)}var bo false//首先给leader传递err rpc.Call(Raft.ForwardingMessage, param, bo)if err ! nil {log.Fatal(\nrpc转发调用server错误:, leader.LeaderId, err)}}}
}func (rf *Raft) sendMessageToOtherNodes(param Param) {bufferMessage[param.MsgId] param.Msg// 只有领导才能给其它服务器发送消息if rf.currentLeader rf.me {var success_count 0fmt.Printf(领导者发送数据中 。。。\n)go func() {rf.broadcast(param, Raft.LogDataCopy, func(ok bool) {//需要其它服务端回应rf.message - ok})}()for i : 0; i raftCount-1; i {fmt.Println(等待其它服务端回应)select {case ok : -rf.message:if ok {success_countif success_count raftCount/2 {rf.mu.Lock()rf.lastMessageTime milliseconds()mysqlMessage[param.MsgId] bufferMessage[param.MsgId]delete(bufferMessage, param.MsgId)if clientWriter ! nil {fmt.Fprintf(clientWriter, OK)}fmt.Printf(\n领导者发送数据结束\n)rf.mu.Unlock()}}}}}
}//注册Raft对象注册后的目的为确保每个节点raft) 可以远程接收
func (node *Raft) raftRegisterRPC(port string) {//注册一个服务器rpc.Register(node)//把服务绑定到http协议上rpc.HandleHTTP()err : http.ListenAndServe(port, nil)if err ! nil {fmt.Println(注册rpc服务失败, err)}
}//创建节点对象
func Make(me int) *Raft {rf : Raft{}rf.me merf.votedFor -1//0 follower ,1 candidate ,2 leaderrf.state 0rf.timeout 0rf.currentLeader -1rf.setTerm(0)//初始化通道rf.message make(chan bool)rf.heartbeat make(chan bool)rf.heartbeatRe make(chan bool)rf.eclectCh make(chan bool)//每个节点都有选举权go rf.election()//每个节点都有心跳功能go rf.sendLeaderHeartBeat()return rf
}//选举成功后应该广播所有的节点本节点成为了leader
func (rf *Raft) sendLeaderHeartBeat() {for {select {case -rf.heartbeat:rf.sendAppendEntriesImpl()}}
}func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader rf.me {var success_count 0go func() {param : Param{Msg: leader heartbeat,Arg: Leader{rf.currentTerm, rf.me}}rf.broadcast(param, Raft.Heartbeat, func(ok bool) {rf.heartbeatRe - ok})}()for i : 0; i raftCount-1; i {select {case ok : -rf.heartbeatRe:if ok {success_countif success_count raftCount/2 {rf.mu.Lock()rf.lastMessageTime milliseconds()fmt.Println(接收到了子节点们的返回信息)rf.mu.Unlock()}}}}}
}func randomRange(min, max int64) int64 {//设置随机时间rand.Seed(time.Now().UnixNano())return rand.Int63n(max-min) min
}//获得当前时间毫秒
func milliseconds() int64 {return time.Now().UnixNano() / int64(time.Millisecond)
}func (rf *Raft) election() {var result bool//每隔一段时间发一次心跳for {//延时时间timeout : randomRange(1500, 3000)//设置该节点最有一次处理消息的时间rf.lastMessageTime milliseconds()select {//间隔时间为1500-3000ms的随机值case -time.After(time.Duration(timeout) * time.Millisecond):}result falsefor !result {//选择leaderresult rf.election_one_round(leader)}}
}func (rf *Raft) election_one_round(args *Leader) bool {//已经有了leader并且不是自己那么returnif args.LeaderId -1 args.LeaderId ! rf.me {fmt.Printf(%d已是leader终止%d选举\n, args.LeaderId, rf.me)return true}var timeout int64var vote intvar triggerHeartbeat booltimeout 2000last : milliseconds()success : falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()fmt.Printf(candidate%d start electing leader\n, rf.me)for {fmt.Printf(candidate%d send request vote to server\n, rf.me)go func() {rf.broadcast(Param{Msg: send request vote}, Raft.ElectingLeader, func(ok bool) {//无论成功失败都需要发送到通道 避免堵塞rf.eclectCh - ok})}()vote 0triggerHeartbeat falsefor i : 0; i raftCount-1; i {fmt.Printf(candidate%d waiting for select for i%d\n, rf.me, i)select {case ok : -rf.eclectCh:if ok {votesuccess vote raftCount/2 || rf.currentLeader -1if success !triggerHeartbeat {fmt.Println(okok, args)triggerHeartbeat truerf.mu.Lock()rf.becomeLeader()args.Term rf.currentTerm 1args.LeaderId rf.merf.mu.Unlock()fmt.Printf(candidate%d becomes leader\n, rf.currentLeader)rf.heartbeat - true}}}fmt.Printf(candidate%d complete for select for i%d\n, rf.me, i)}if (timeoutlast milliseconds()) || (vote raftCount/2 || rf.currentLeader -1) {break} else {select {case -time.After(time.Duration(5000) * time.Millisecond):}}}fmt.Printf(candidate%d receive votes status%t\n, rf.me, success)return success
}func (rf *Raft) becomeLeader() {rf.state 2fmt.Println(rf.me, 成为了leader)rf.currentLeader rf.me
}//设置发送参数的数据类型
type Param struct {Msg stringMsgId stringArg Leader
}func (rf *Raft) broadcast(msg Param, path string, fun func(ok bool)) {//设置不要自己给自己广播for nodeID, port : range nodeTable {if nodeID rf.node.id {continue;}//链接远程rpcrp, err : rpc.DialHTTP(tcp, 127.0.0.1port)if err ! nil {fun(false)continue}var bo falseerr rp.Call(path, msg, bo)if err ! nil {fun(false)continue}fun(bo)}}func (rf *Raft) becomeCandidate() {if rf.state 0 || rf.currentLeader -1 {//候选人状态rf.state 1rf.votedFor rf.merf.setTerm(rf.currentTerm 1)rf.currentLeader -1}
}func (rf *Raft) setTerm(term int) {rf.currentTerm term
}//Rpc处理
func (rf *Raft) ElectingLeader(param Param, a *bool) error {//给leader投票*a truerf.lastMessageTime milliseconds()return nil
}func (rf *Raft) Heartbeat(param Param, a *bool) error {fmt.Println(\nrpc:heartbeat:, rf.me, param.Msg)if param.Arg.Term rf.currentTerm {*a false} else {leader param.Argfmt.Printf(%d收到leader%d心跳\n, rf.currentLeader, leader.LeaderId)*a truerf.mu.Lock()rf.currentLeader leader.LeaderIdrf.votedFor leader.LeaderIdrf.state 0rf.lastMessageTime milliseconds()fmt.Printf(server %d learned that leader %d\n, rf.me, rf.currentLeader)rf.mu.Unlock()}return nil
}//连接到leader节点
func (rf *Raft) ForwardingMessage(param Param, a *bool) error {fmt.Println(\nrpc:forwardingMessage:, rf.me, param.Msg)rf.sendMessageToOtherNodes(param)*a truerf.lastMessageTime milliseconds()return nil
}//接收leader传过来的日志
func (r *Raft) LogDataCopy(param Param, a *bool) error {fmt.Println(\nrpc:LogDataCopy:, r.me, param.Msg)bufferMessage[param.MsgId] param.Msg*a truereturn nil
}
Reference 老男孩 Go 5期