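overseer provides three main features:
1. Graceful connection close. 2. Smooth restart of connections. 3. Automatic restart when the binary file changes.
Each is covered in turn below.
I. Graceful connection close
Go's standard net package does not close connections gracefully on its own: when the main listening goroutine exits, it does not wait for the worker goroutines that are handling individual connections to finish.
Here is the relevant code from the Go standard library:
Go/src/net/http/server.go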
    func (srv *Server) Serve(l net.Listener) error {
        if fn := testHookServerServe; fn != nil {
            fn(srv, l) // call hook with unwrapped listener
        }

        origListener := l
        l = &onceCloseListener{Listener: l}
        defer l.Close()

        if err := srv.setupHTTP2_Serve(); err != nil {
            return err
        }

        if !srv.trackListener(&l, true) {
            return ErrServerClosed
        }
        defer srv.trackListener(&l, false)

        baseCtx := context.Background()
        if srv.BaseContext != nil {
            baseCtx = srv.BaseContext(origListener)
            if baseCtx == nil {
                panic("BaseContext returned a nil context")
            }
        }

        var tempDelay time.Duration // how long to sleep on accept failure

        ctx := context.WithValue(baseCtx, ServerContextKey, srv)
        for {
            rw, err := l.Accept()
            if err != nil {
                if srv.shuttingDown() {
                    return ErrServerClosed
                }
                if ne, ok := err.(net.Error); ok && ne.Temporary() {
                    if tempDelay == 0 {
                        tempDelay = 5 * time.Millisecond
                    } else {
                        tempDelay *= 2
                    }
                    if max := 1 * time.Second; tempDelay > max {
                        tempDelay = max
                    }
                    srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
                    time.Sleep(tempDelay)
                    continue
                }
                return err
            }
            connCtx := ctx
            if cc := srv.ConnContext; cc != nil {
                connCtx = cc(connCtx, rw)
                if connCtx == nil {
                    panic("ConnContext returned nil")
                }
            }
            tempDelay = 0
            c := srv.newConn(rw)
            c.setState(c.rwc, StateNew, runHooks) // before Serve can return
            go c.serve(connCtx)
        }
    }

When the listening socket is closed, l.Accept() returns an error and the loop exits, without waiting for the go c.serve(connCtx) goroutines to finish.
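overseer handles this by wrapping Go's listening socket and connection sockets, and uses sync.WaitGroup so that the main goroutine can wait (asynchronously) for the worker goroutines to finish.
The overseer code is as follows:
overseer-v1.1.6/graceful.go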
    func (l *overseerListener) Accept() (net.Conn, error) {
        conn, err := l.Listener.(*net.TCPListener).AcceptTCP()
        if err != nil {
            return nil, err
        }
        conn.SetKeepAlive(true)                  // see http.tcpKeepAliveListener
        conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
        uconn := overseerConn{
            Conn:   conn,
            wg:     &l.wg,
            closed: make(chan bool),
        }
        go func() {
            //connection watcher
            select {
            case <-l.closeByForce:
                uconn.Close()
            case <-uconn.closed:
                //closed manually
            }
        }()
        l.wg.Add(1)
        return uconn, nil
    }

    //non-blocking trigger close
    func (l *overseerListener) release(timeout time.Duration) {
        //stop accepting connections - release fd
        l.closeError = l.Listener.Close()
        //start timer, close by force if deadline not met
        waited := make(chan bool)
        go func() {
            l.wg.Wait()
            waited <- true
        }()
        go func() {
            select {
            case <-time.After(timeout):
                close(l.closeByForce)
            case <-waited:
                //no need to force close
            }
        }()
    }

    //blocking wait for close
    func (l *overseerListener) Close() error {
        l.wg.Wait()
        return l.closeError
    }

    func (o overseerConn) Close() error {
        err := o.Conn.Close()
        if err == nil {
            o.wg.Done()
            o.closed <- true
        }
        return err
    }

In (l *overseerListener) Accept, l.wg.Add(1) runs for every worker connection that is created; in (o overseerConn) Close, o.wg.Done() runs for every worker connection that is closed.
Both the asynchronous close path, (l *overseerListener) release, and the synchronous close path, (l *overseerListener) Close, call l.wg.Wait() to wait for the worker goroutines to finish.
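The Add/Done pairing described above can be tried in isolation. The following is only a minimal sketch, not overseer's implementation: trackedListener, trackedConn, drain and demo are hypothetical names, and the sync.Once guard against a double Close is an addition that overseer's own overseerConn does not have.

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// trackedListener is a hypothetical stand-in for overseerListener:
// it counts every accepted connection in a WaitGroup.
type trackedListener struct {
	net.Listener
	wg sync.WaitGroup
}

// trackedConn releases its WaitGroup slot exactly once when closed.
type trackedConn struct {
	net.Conn
	wg   *sync.WaitGroup
	once *sync.Once
}

// Accept wraps the accepted connection and registers it with the WaitGroup.
func (l *trackedListener) Accept() (net.Conn, error) {
	c, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	l.wg.Add(1)
	return trackedConn{Conn: c, wg: &l.wg, once: new(sync.Once)}, nil
}

// Close closes the connection and marks it as finished.
func (c trackedConn) Close() error {
	err := c.Conn.Close()
	c.once.Do(c.wg.Done)
	return err
}

// drain stops accepting and blocks until every accepted connection is closed.
func (l *trackedListener) drain() error {
	err := l.Listener.Close()
	l.wg.Wait()
	return err
}

// demo serves one loopback connection, then drains the listener.
// It returns how many connections were served.
func demo() (int, error) {
	inner, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	l := &trackedListener{Listener: inner}
	served := 0
	done := make(chan error, 1)
	go func() {
		c, err := l.Accept()
		if err != nil {
			done <- err
			return
		}
		served++
		done <- c.Close()
	}()
	client, err := net.Dial("tcp", inner.Addr().String())
	if err != nil {
		return 0, err
	}
	defer client.Close()
	if err := <-done; err != nil {
		return 0, err
	}
	return served, l.drain()
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```

Because the wrapper satisfies net.Listener, it could be handed to http.Serve in the same way overseer hands over its own wrapped listener.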
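The listening socket close flow:
1. The worker process receives the restart signal, either directly or forwarded by the master process after the master receives it.
2. The worker's signal handler calls (l *overseerListener) release.
3. release closes the listening socket and waits on l.wg.Wait() asynchronously.
4. In (srv *Server) Serve in the standard net/http/server.go, l.Accept() returns an error, the accept loop exits, and the deferred l.Close() runs, which is (l *overseerListener) Close.
5. (l *overseerListener) Close calls l.wg.Wait() synchronously and waits for the worker connections to finish.
6. When a worker connection finishes, (o overseerConn) Close() is called, which in turn calls o.wg.Done().
7. As soon as release has returned for every listener, the worker sends SIGUSR1 to the master process. As the proc_slave.go excerpt later in this article shows, this happens without waiting for in-flight connections, so steps 7 to 9 proceed in parallel with steps 4 to 6.
8. On receiving SIGUSR1, the master process writes true to the mp.descriptorsReleased channel.
9. (mp *master) fork in the master process receives from mp.descriptorsReleased, ends the current fork, and moves on to the next fork.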
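The "wait for connections, but force-close after a timeout" idea behind release can be sketched with nothing but a WaitGroup and a timer. This is an illustrative sketch under stated assumptions: waitOrForce, demoDrained and demoForced are hypothetical names, and unlike overseer's release, which returns immediately and does the waiting in goroutines, this variant blocks so that its result is easy to observe.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitOrForce waits for wg to drain. If that takes longer than timeout,
// it calls force once and reports true; otherwise it reports false.
func waitOrForce(wg *sync.WaitGroup, timeout time.Duration, force func()) bool {
	drained := make(chan struct{})
	go func() {
		wg.Wait()
		close(drained)
	}()
	select {
	case <-drained:
		return false
	case <-time.After(timeout):
		force()
		return true
	}
}

// demoDrained simulates a connection that finishes well before the deadline.
func demoDrained() bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		time.Sleep(10 * time.Millisecond)
		wg.Done()
	}()
	return waitOrForce(&wg, time.Second, func() {})
}

// demoForced simulates a connection that never finishes on its own;
// the force callback stands in for closing it by force.
func demoForced() bool {
	var wg sync.WaitGroup
	wg.Add(1)
	return waitOrForce(&wg, 20*time.Millisecond, wg.Done)
}

func main() {
	fmt.Println(demoDrained()) // false: drained before the timeout
	fmt.Println(demoForced())  // true: the timeout fired and force ran
}
```

In overseer the force action is close(l.closeByForce), which wakes every per-connection watcher goroutine so that each one closes its connection.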
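II. Smooth restart of connections
A smooth restart means restarting without disconnecting clients, so that clients notice nothing; for example, connections already queued on the listening socket are not dropped. For this reason the listening socket is passed between the old and new worker processes through the master process, rather than being recreated by the newly started worker process.
The listening sockets are created by the master process:
overseer-v1.1.6/proc_master.go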
    func (mp *master) retreiveFileDescriptors() error {
        mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
        for i, addr := range mp.Config.Addresses {
            a, err := net.ResolveTCPAddr("tcp", addr)
            if err != nil {
                return fmt.Errorf("Invalid address %s (%s)", addr, err)
            }
            l, err := net.ListenTCP("tcp", a)
            if err != nil {
                return err
            }
            f, err := l.File()
            if err != nil {
                return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
            }
            if err := l.Close(); err != nil {
                return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
            }
            mp.slaveExtraFiles[i] = f
        }
        return nil
    }

The function takes the addresses from mp.Config.Addresses, opens a listening socket for each, and stores the resulting file handles in mp.slaveExtraFiles.
This code does call (l *TCPListener) Close, but that has no effect on the worker processes: l.File() returns a duplicated descriptor, so closing l only means that the master itself can no longer use its own copy of the listening socket.
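That closing the original listener leaves the duplicated descriptor usable can be checked inside a single process. The sketch below is an assumption-laden illustration rather than overseer code: handOff and roundTrip are hypothetical names, the hand-off happens within one process instead of across a fork, and it assumes a Unix-like system where TCPListener.File is supported.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// handOff mimics, inside one process, what the master and worker do across
// processes: take the listener's duplicated descriptor, close the original
// listener, then rebuild a listener from the descriptor.
func handOff() (net.Listener, string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, "", err
	}
	addr := l.Addr().String()
	tl, ok := l.(*net.TCPListener)
	if !ok {
		l.Close()
		return nil, "", fmt.Errorf("not a TCP listener")
	}
	f, err := tl.File() // duplicate of the listening socket
	if err != nil {
		l.Close()
		return nil, "", err
	}
	defer f.Close() // FileListener keeps its own copy of the descriptor
	if err := l.Close(); err != nil {
		return nil, "", err
	}
	inherited, err := net.FileListener(f)
	if err != nil {
		return nil, "", err
	}
	return inherited, addr, nil
}

// roundTrip dials the original address and checks that the rebuilt
// listener still accepts the connection and receives the payload.
func roundTrip() (string, error) {
	l, addr, err := handOff()
	if err != nil {
		return "", err
	}
	defer l.Close()
	client, err := net.Dial("tcp", addr)
	if err != nil {
		return "", err
	}
	defer client.Close()
	if _, err := client.Write([]byte("ping")); err != nil {
		return "", err
	}
	server, err := l.Accept()
	if err != nil {
		return "", err
	}
	defer server.Close()
	buf := make([]byte, 4)
	if _, err := io.ReadFull(server, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	msg, err := roundTrip()
	fmt.Println(msg, err)
}
```

The port never stops listening during the hand-off, which is the property the smooth restart relies on.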
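For reference, here is the difference between close and shutdown on a network socket. close closes the socket descriptor in the current process only; the connection itself stays open, and other processes that hold the same socket can still read from or write to it. shutdown tears down the connection itself: a reader may see EOF, and a writer may receive a SIGPIPE signal, possibly only after the socket buffer has filled up. shutdown also takes a parameter that selects how to shut down: 0 disallows further reads, 1 disallows further writes, and 2 disallows both.
mp.slaveExtraFiles is then passed to the child process, that is, the worker process:
overseer-v1.1.6/proc_master.go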
    func (mp *master) fork() error {
        mp.debugf("starting %s", mp.binPath)
        cmd := exec.Command(mp.binPath)
        //mark this new process as the "active" slave process.
        //this process is assumed to be holding the socket files.
        mp.slaveCmd = cmd
        mp.slaveID++
        //provide the slave process with some state
        e := os.Environ()
        e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
        e = append(e, envBinPath+"="+mp.binPath)
        e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
        e = append(e, envIsSlave+"=1")
        e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
        cmd.Env = e
        //inherit master args/stdfiles
        cmd.Args = os.Args
        cmd.Stdin = os.Stdin
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
        //include socket files
        cmd.ExtraFiles = mp.slaveExtraFiles
        if err := cmd.Start(); err != nil {
            return fmt.Errorf("Failed to start slave process: %s", err)
        }
        //was scheduled to restart, notify success
        if mp.restarting {
            mp.restartedAt = time.Now()
            mp.restarting = false
            mp.restarted <- true
        }
        //convert wait into channel
        cmdwait := make(chan error)
        go func() {
            cmdwait <- cmd.Wait()
        }()
        //wait....
        select {
        case err := <-cmdwait:
            //program exited before releasing descriptors
            //proxy exit code out to master
            code := 0
            if err != nil {
                code = 1
                if exiterr, ok := err.(*exec.ExitError); ok {
                    if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
                        code = status.ExitStatus()
                    }
                }
            }
            mp.debugf("prog exited with %d", code)
            //if a restarts are disabled or if it was an
            //unexpected crash, proxy this exit straight
            //through to the main process
            if mp.NoRestart || !mp.restarting {
                os.Exit(code)
            }
        case <-mp.descriptorsReleased:
            //if descriptors are released, the program
            //has yielded control of its sockets and
            //a parallel instance of the program can be
            //started safely. it should serve state.Listeners
            //to ensure downtime is kept at <1sec. The previous
            //cmd.Wait() will still be consumed though the
            //result will be discarded.
        }
        return nil
    }

The sockets are handed to the child process through the statement cmd.ExtraFiles = mp.slaveExtraFiles. This field is ultimately used when the child is created with the fork system call, and the listed file descriptors are inherited by the child process.
The child process (the worker process, called the slave in the code) then handles the inherited sockets:
overseer-v1.1.6/proc_slave.go
    func (sp *slave) run() error {
        sp.id = os.Getenv(envSlaveID)
        sp.debugf("run")
        sp.state.Enabled = true
        sp.state.ID = os.Getenv(envBinID)
        sp.state.StartedAt = time.Now()
        sp.state.Address = sp.Config.Address
        sp.state.Addresses = sp.Config.Addresses
        sp.state.GracefulShutdown = make(chan bool, 1)
        sp.state.BinPath = os.Getenv(envBinPath)
        if err := sp.watchParent(); err != nil {
            return err
        }
        if err := sp.initFileDescriptors(); err != nil {
            return err
        }
        sp.watchSignal()
        //run program with state
        sp.debugf("start program")
        sp.Config.Program(sp.state)
        return nil
    }

    func (sp *slave) initFileDescriptors() error {
        //inspect file descriptors
        numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
        if err != nil {
            return fmt.Errorf("invalid %s integer", envNumFDs)
        }
        sp.listeners = make([]*overseerListener, numFDs)
        sp.state.Listeners = make([]net.Listener, numFDs)
        for i := 0; i < numFDs; i++ {
            f := os.NewFile(uintptr(3+i), "")
            l, err := net.FileListener(f)
            if err != nil {
                return fmt.Errorf("failed to inherit file descriptor: %d", i)
            }
            u := newOverseerListener(l)
            sp.listeners[i] = u
            sp.state.Listeners[i] = u
        }
        if len(sp.state.Listeners) > 0 {
            sp.state.Listener = sp.state.Listeners[0]
        }
        return nil
    }

The child process only rewraps the inherited sockets and does not create new listening sockets. Each one is wrapped with u := newOverseerListener(l), and the wrapped listeners are finally passed to sp.Config.Program(sp.state), which is the user's program entry point:
overseer-v1.1.6/example/main.go
    // convert your 'main()' into a 'prog(state)'
    // 'prog()' is run in a child process
    func prog(state overseer.State) {
        fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID)
        http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            d, _ := time.ParseDuration(r.URL.Query().Get("d"))
            time.Sleep(d)
            fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt)
        }))
        http.Serve(state.Listener, nil)
        fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID)
    }

    // then create another 'main' which runs the upgrades
    // 'main()' is run in the initial process
    func main() {
        overseer.Run(overseer.Config{
            Program:          prog,
            Address:          ":5001",
            Fetcher:          &fetcher.File{Path: "my_app_next"},
            Debug:            true, //display log of overseer actions
            TerminateTimeout: 10 * time.Minute,
        })
    }
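In the user program, the call http.Serve(state.Listener, nil) means that:
1. Connections are accepted through the wrapped (l *overseerListener) Accept().
2. The deferred l.Close() is also the wrapped (l *overseerListener) Close().
3. The worker connections created by (l *overseerListener) Accept() are all wrapped as overseerConn, so closing them calls (o overseerConn) Close().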
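III. Automatic restart on file change
overseer can watch for file changes and trigger the restart flow automatically when a change is detected.
When the master process starts, it checks the configuration; if mp.Config.Fetcher is set, it enters fetchLoop:
overseer-v1.1.6/proc_master.go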
    // fetchLoop is run in a goroutine
    func (mp *master) fetchLoop() {
        min := mp.Config.MinFetchInterval
        time.Sleep(min)
        for {
            t0 := time.Now()
            mp.fetch()
            //duration fetch of fetch
            diff := time.Now().Sub(t0)
            if diff < min {
                delay := min - diff
                //ensures at least MinFetchInterval delay.
                //should be throttled by the fetcher!
                time.Sleep(delay)
            }
        }
    }

mp.Config.MinFetchInterval defaults to 1 second, so changes are checked roughly once per second. Because it is a time.Duration, a finer granularity can be configured.
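The throttling rule in fetchLoop comes down to one subtraction: sleep only for whatever is left of the minimum interval after the fetch itself. A minimal sketch of that arithmetic follows; remainingDelay is a hypothetical helper name that does not exist in overseer.

```go
package main

import (
	"fmt"
	"time"
)

// remainingDelay returns how long to sleep after a fetch that took elapsed,
// so that consecutive fetches start at least min apart.
func remainingDelay(min, elapsed time.Duration) time.Duration {
	if elapsed < min {
		return min - elapsed
	}
	return 0
}

func main() {
	// A fetch that took 300ms leaves 700ms of a 1s interval to sleep.
	fmt.Println(remainingDelay(time.Second, 300*time.Millisecond))
	// A fetch slower than the interval is followed by no extra sleep.
	fmt.Println(remainingDelay(time.Second, 2*time.Second))
}
```

A slow fetcher therefore stretches the effective interval, while a fast one is capped at one fetch per MinFetchInterval.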
The fetchers already supported are fetcher_file.go, fetcher_github.go, fetcher_http.go and fetcher_s3.go.
fetcher_file.go is used as the example below.
1. Detecting a file change
overseer-v1.1.6/proc_master.go

        //tee off to sha1
        hash := sha1.New()
        reader = io.TeeReader(reader, hash)
        //write to a temp file
        _, err = io.Copy(tmpBin, reader)
        if err != nil {
            mp.warnf("failed to write temp binary: %s", err)
            return
        }
        //compare hash
        newHash := hash.Sum(nil)
        if bytes.Equal(mp.binHash, newHash) {
            mp.debugf("hash match - skip")
            return
        }

At this step the master compares the old and new SHA-1 hashes of the binary content; it does not look at file timestamps.
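The hash-while-copying pattern can be exercised on in-memory data. This is a small sketch, not overseer's code: copyAndHash and unchanged are hypothetical helpers, and strings stand in for the binary files.

```go
package main

import (
	"bytes"
	"crypto/sha1"
	"fmt"
	"io"
	"strings"
)

// copyAndHash copies src to dst and returns the SHA-1 of the copied bytes.
// io.TeeReader feeds the hash as the data streams through, so the content
// is read only once.
func copyAndHash(dst io.Writer, src io.Reader) ([]byte, error) {
	h := sha1.New()
	if _, err := io.Copy(dst, io.TeeReader(src, h)); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

// unchanged reports whether fetched has the same SHA-1 as current.
func unchanged(current, fetched string) (bool, error) {
	var a, b bytes.Buffer
	oldHash, err := copyAndHash(&a, strings.NewReader(current))
	if err != nil {
		return false, err
	}
	newHash, err := copyAndHash(&b, strings.NewReader(fetched))
	if err != nil {
		return false, err
	}
	return bytes.Equal(oldHash, newHash), nil
}

func main() {
	same, _ := unchanged("binary v1", "binary v1")
	other, _ := unchanged("binary v1", "binary v2")
	fmt.Println(same, other) // true false
}
```

Comparing content hashes means that rewriting the file with identical bytes does not count as a change at this step.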
2. Verifying that the file is executable and supports overseer
overseer-v1.1.6/proc_master.go

        tokenIn := token()
        cmd := exec.Command(tmpBinPath)
        cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...)
        cmd.Args = os.Args
        returned := false
        go func() {
            time.Sleep(5 * time.Second)
            if !returned {
                mp.warnf("sanity check against fetched executable timed-out, check overseer is running")
                if cmd.Process != nil {
                    cmd.Process.Kill()
                }
            }
        }()
        tokenOut, err := cmd.CombinedOutput()
        returned = true
        if err != nil {
            mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut)
            return
        }
        if tokenIn != string(tokenOut) {
            mp.warnf("sanity check failed")
            return
        }

This relies on code that overseer embeds in the program:
overseer-v1.1.6/overseer.go
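    //sanityCheck returns true if a check was performed
    func sanityCheck() bool {
        //sanity check
        if token := os.Getenv(envBinCheck); token != "" {
            fmt.Fprint(os.Stdout, token)
            return true
        }
        //legacy sanity check using old env var
        if token := os.Getenv(envBinCheckLegacy); token != "" {
            fmt.Fprint(os.Stdout, token)
            return true
        }
        return false
    }

This code is called from overseer.Run when main starts. The master passes a token in a fixed environment variable, and the check succeeds if the new binary prints that token back unchanged on its standard output.
3. Overwriting the old file and triggering a restart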
overseer-v1.1.6/proc_master.go

        //overwrite!
        if err := overwrite(mp.binPath, tmpBinPath); err != nil {
            mp.warnf("failed to overwrite binary: %s", err)
            return
        }
        mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
        mp.binHash = newHash
        //binary successfully replaced
        if !mp.Config.NoRestartAfterFetch {
            mp.triggerRestart()
        }

The restart flow is entered through (mp *master) triggerRestart:
overseer-v1.1.6/proc_master.go
    func (mp *master) triggerRestart() {
        if mp.restarting {
            mp.debugf("already graceful restarting")
            return //skip
        } else if mp.slaveCmd == nil || mp.restarting {
            mp.debugf("no slave process")
            return //skip
        }
        mp.debugf("graceful restart triggered")
        mp.restarting = true
        mp.awaitingUSR1 = true
        mp.signalledAt = time.Now()
        mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate
        select {
        case <-mp.restarted:
            //success
            mp.debugf("restart success")
        case <-time.After(mp.TerminateTimeout):
            //times up mr. process, we did ask nicely!
            mp.debugf("graceful timeout, forcing exit")
            mp.sendSignal(os.Kill)
        }
    }

The master sends mp.Config.RestartSignal to the child process. On receiving it, the child closes its listening sockets and then sends SIGUSR1 to the parent process:
overseer-v1.1.6/proc_slave.go

        if len(sp.listeners) > 0 {
            //perform graceful shutdown
            for _, l := range sp.listeners {
                l.release(sp.Config.TerminateTimeout)
            }
            //signal release of held sockets, allows master to start
            //a new process before this child has actually exited.
            //early restarts not supported with restarts disabled.
            if !sp.NoRestart {
                sp.masterProc.Signal(SIGUSR1)
            }
            //listeners should be waiting on connections to close...
        }

When the parent process receives SIGUSR1, it notifies the mp.descriptorsReleased channel that the listening sockets have been released:
overseer-v1.1.6/proc_master.go

        //**during a restart** a SIGUSR1 signals
        //to the master process that, the file
        //descriptors have been released
        if mp.awaitingUSR1 && s == SIGUSR1 {
            mp.debugf("signaled, sockets ready")
            mp.awaitingUSR1 = false
            mp.descriptorsReleased <- true
        } else

Control finally returns to (mp *master) fork. fork has been waiting for either a notification on mp.descriptorsReleased or the child process exiting via cmd.Wait. Once the channel notification arrives, fork returns and the next round of the fork loop begins.
overseer-v1.1.6/proc_master.go
    func (mp *master) fork() error {
        //... ...
        //... ...
        //... ...
        //convert wait into channel
        cmdwait := make(chan error)
        go func() {
            cmdwait <- cmd.Wait()
        }()
        //wait....
        select {
        case err := <-cmdwait:
            //program exited before releasing descriptors
            //proxy exit code out to master
            code := 0
            if err != nil {
                code = 1
                if exiterr, ok := err.(*exec.ExitError); ok {
                    if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
                        code = status.ExitStatus()
                    }
                }
            }
            mp.debugf("prog exited with %d", code)
            //if a restarts are disabled or if it was an
            //unexpected crash, proxy this exit straight
            //through to the main process
            if mp.NoRestart || !mp.restarting {
                os.Exit(code)
            }
        case <-mp.descriptorsReleased:
            //if descriptors are released, the program
            //has yielded control of its sockets and
            //a parallel instance of the program can be
            //started safely. it should serve state.Listeners
            //to ensure downtime is kept at <1sec. The previous
            //cmd.Wait() will still be consumed though the
            //result will be discarded.
        }
        return nil
    }