做网站送企业邮箱,查询网站备案信息,wordpress安装完无法登录,域名网站免费建站redsync使用教程前言redsync结构Pool结构Mutex结构acquire加锁操作release解锁操作redsync包的使用前言
在编程语言中锁可以理解为一个变量#xff0c;该变量在同一时刻只能有一个线程拥有#xff0c;以便保护共享数据在同一时刻只有一个线程去操作。对于高可用的分布式锁应…
redsync使用教程前言redsync结构Pool结构Mutex结构acquire加锁操作release解锁操作redsync包的使用前言
在编程语言中锁可以理解为一个变量该变量在同一时刻只能有一个线程拥有以便保护共享数据在同一时刻只有一个线程去操作。对于高可用的分布式锁应该满足以下条件 1.互斥在任意时间内只有一个客户能够获得一把锁具有排他性。 2.避免死锁即使客户端宕机或者从集群中分离了其它客户端仍然可以获取到该锁 3.容错只要大部分Redis节点存活客户端就能正确地获取锁和释放锁。即使锁住某个资源的客户端释放锁之前崩溃或者网络分区仍然能够获取锁和释放锁。 对于Redis高可用集群而言上述三个条件都非常容易满足所以适合做分布式锁。
redsync结构
redsync 的通用结构定义如下
Pool抽象连接池Conn抽象每个 Redis 连接ScriptRedis 脚本
Pool结构
redsync结构的Pools是一个redis.pool数组,每个 redis.Pool 都是上面的 Pool 实现它代表了一个 Redis 实例的连接池
Mutex结构
Mutex代表了一个分布式锁其成员多为 redlock 算法所需要的条件
// A Mutex is a distributed mutual exclusion lock.
type Mutex struct {name string // 名称expiry time.Duration // 锁的有效时间tries int // 尝试次数delayFunc DelayFunc // 失败尝试设置延迟factor float64 // 误差系数控制quorum int // 投票数 一般为节点数 / 21节点数为奇数genValueFunc func() (string, error) // 加密函数生成唯一随机串value string // 默认就是唯一随机串until time.Time // 过期时间pools []Pool // 连接池每个 Pool 指一个 Redis 实例
}获取锁的Lock方法实现了redLock的加锁接口具体实现如下
func (m *Mutex) LockContext(ctx context.Context) error {if ctx nil {ctx context.Background()}//生成随机串base64value, err : m.genValueFunc()if err ! nil {return err}//不超过tries次数进行加锁for i : 0; i m.tries; i {if i ! 0 {time.Sleep(m.delayFunc(i))}start : time.Now()n, err : func() (int, error) {ctx, cancel : context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))defer cancel()//尝试异步去获取锁return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.acquire(ctx, pool, value)})}()now : time.Now()// 过期时间 有效时间值 - 获取锁消耗的时间值 - 有效时间值 * 误差系数until : now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))//成功节点数节点数/21 未过期时判定加锁成功 if n m.quorum now.Before(until) {m.value valuem.until untilreturn nil}func() (int, error) {ctx, cancel : context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))defer cancel()//获取锁失败尝试异步去释放锁return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {return m.release(ctx, pool, value)})}()if i m.tries-1 err ! nil {return err}}return ErrFailed
}time.Sleep(m.delayFunc(i))的失败重试逻辑是当客户端无法获取锁时会设置一个随机值来重试。这个随机值应当和申请锁时间错开减少脑裂的可能性。此外还调用了actOnPoolsAsync来实现非阻塞方式同时向多个Redis实例发送set请求。我们来看下actOnPoolsAsync是如何定义的。
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {type result struct {Node intStatus boolErr error}ch : make(chan result)for node, pool : range m.pools {go func(node int, pool redis.Pool) {r : result{Node: node}r.Status, r.Err actFn(pool)ch - r}(node, pool)}n : 0var taken []intvar err errorfor range m.pools {r : -chif r.Status {n} else if r.Err ! nil {err multierror.Append(err, RedisError{Node: r.Node, Err: r.Err})} else {taken append(taken, r.Node)err multierror.Append(err, ErrNodeTaken{Node: r.Node})}}if len(taken) m.quorum {return n, ErrTaken{Nodes: taken}}return n, err
}
acquire加锁操作
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {conn, err : pool.Get(ctx)if err ! nil {return false, err}defer conn.Close()reply, err : conn.SetNX(m.name, value, m.expiry)if err ! nil {return false, err}return reply, nil
}release解锁操作
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {conn, err : pool.Get(ctx)if err ! nil {return false, err}defer conn.Close()//调用Eval,以脚本方式释放锁status, err : conn.Eval(deleteScript, m.name, value)if err ! nil {return false, err}return status ! int64(0), nil
}redsync包的使用
该包的使用很简单具体步骤如下
首先创建一个Redis的客户端连接将该客户端连接加入到Redis的Pool中redsync基于该Redis Pool进行实例化通过redsync实例的NewMutex就可以基于一个具体的key新建一个分布式锁 该包进行实例化时有基于Redis的单机模式和集群模式两种使用方式在使用上主要有两种区别Redis的客户端是以集群模式还是单机模式创建在导入redsync包时集群模式需要导入goredis/v8的版本 具体例子如下
func main() {//创建redis的客户端连接cli : goredislib.NewClient(goredislib.Options{Addr: localhost:6379,})pool : goredis.NewPool(cli)rs : redsync.New(pool)mutexname : test-global-mutexmutex : rs.NewMutex(mutexname)if err : mutex.Lock(); err ! nil {panic(err)}if ok, err : mutex.Unlock(); !ok || err nil {panic(unlock failed)}}