站牛网,网站设计展示,大理网站开发,开发一款短视频app软件需要多少钱线程通讯
在程序中不可避免的出现并发或者并行#xff0c;一般来说对于一个程序大多数是遵循开发语言的启动顺序。例如#xff0c;对于go语言来说#xff0c;一般入口为main#xff0c;main中依次导入import导入的包#xff0c;并按顺序执行init方法#xff0c;之后在按…线程通讯
在程序中不可避免的出现并发或者并行一般来说对于一个程序大多数是遵循开发语言的启动顺序。例如对于go语言来说一般入口为mainmain中依次导入import导入的包并按顺序执行init方法之后在按照调用顺序执行程序。所以一般情况下程序是串行的。如下所示
在很多时候串行并不满足要求程序同时需要满足很多客户访问例如web程序必须要设置为并发的才能满足众多请求。
go语言通过go关键字实现新线程如下·
go func(){fmt.Println(-------开启一个新线程-----)
}()线程通讯go语言实现线程通讯是通过进程通讯实现内存共享。go语言内置chan数据结构实现线程通讯与数据共享。
package mainimport (fmtsynctime
)var ch make(chan int)
var wati sync.WaitGroupfunc main() {wati.Add(2)go producter()go customer()wati.Wait()
}func producter() {for i : 0; i 10; i {ch - i * 10fmt.Println(-----等待1秒)time.Sleep(time.Second * 1)}wati.Done()close(ch)
}
func customer() {for {x, ok : -chfmt.Println(-----------------custome, x, ok)if ok {fmt.Println(-----, x)} else {fmt.Println(----no data)break}}wati.Done()
}
chan本身在设计上是并发安全的。这意味着多个协程可以同时安全地对一个chan进行发送和接收操作而无需额外的同步措施。Go 语言的运行时系统会自动处理这些操作的并发安全性保证数据的正确传递和协程的正确同步。
线程等待
在多线程中各个线程是是独立的并不知道线程完成的次序所以线程等待也很重要如下代码
package mainimport (ossync
)var Filestream make(chan []byte)func main() {var str []byte(hello)go Read(str)go Write()
}func Read(by []byte) {Filestream - byclose(Filestream)
}func Write() error {file, err : os.Create(test.txt)if err ! nil {return err}by : -Filestream_, err file.Write(by)if err ! nil {return err}return nil
}上述代码通过chan实现了多线程创建文件但是实际上执行代码并不能成功这是由于主线程没有等待其他线程导致主线程在过早结束程序结束通过sync.WaitGroup库实现线程等待改造后的代码如下
package mainimport (ossync
)var Filestream make(chan []byte)
var wait sync.WaitGroupfunc main() {var str []byte(hello)wait.Add(2)go Read(str)go Write()wait.Wait()
}func Read(by []byte) {Filestream - byclose(Filestream)wait.Done()
}func Write() error {file, err : os.Create(test.txt)if err ! nil {return err}by : -Filestream_, err file.Write(by)if err ! nil {return err}wait.Done()return nil
}
并发安全
数据锁
在并发时就会出现资源被竞争的情况这就是涉及到并发安全了例如对于一个数组var list []string在对这个数组插入数据时对于同一时刻进行的操作数据就可能会丢失因此在操作数据时一定要保证操作的数据结构是并发安全的例如sync.Map就是线程安全的go语言底层实现了。
那么如何对自定义的数据结构体实现并发安全呢就要用到互斥锁了。互斥锁是一种用于多线程或多协程编程中的同步原语其主要目的是保护共享资源防止多个线程或协程同时访问和修改这些资源从而避免数据竞争和并发冲突。
在并发编程中多个线程或协程可能会同时访问和修改同一个共享变量。如果不加以控制就可能导致数据竞争即多个操作同时对同一个数据进行读写从而导致数据的不一致性或错误结果。
互斥锁通过提供一种互斥访问的机制确保在任何时刻只有一个线程或协程能够访问被保护的共享资源。当一个线程或协程获取了互斥锁后其他试图获取该锁的线程或协程就会被阻塞直到锁被释放。
Go语言的互斥锁被sync.Mutex实现。在go语言中提供了sync.Mapmap类型是并发安全的。要实现自己的并发安全需要借助sync.Mutex如下
并发安装的结构体
type safeArr[T any] struct {sync.Mutexdata []T
}func (self *safeArr[T]) Add(item T) {self.Lock()self.data append(self.data, item)self.Unlock()
}func (self *safeArr[T]) Remove(index int) {self.Lock()if index 0 index len(self.data) {self.data append(self.data[:index], self.data[index1:]...)}self.Unlock()
}func (self *safeArr[T]) Data() []T {self.Lock()self.Unlock()return self.data
}注意data 是不能直接向外部暴露的不可以使用append直接操作必须通过并发安装的Add方法。 测试
var arr safeArr[string]func Test(c *fiber.Ctx) error {arr.Add(sss)return c.JSON(fiber.Map{data: arr.data,})
}func main() {app : fiber.New()app.Get(/test, Test)go func() {for {if requestCount 0 {wg.Done()}}}()app.Listen(:8081)
}通过上述代码测试代码测试不用路由请求进来后的线程对并发安全的 safeArr的处理是否满足要求。如下所示 同步的路由线程进来之后对全局的变量可以实现操作这里并不能体现它是并发安全的我们将Add方法的self.Unlock()注释掉再次启动服务请求接口如下 如上图所示在注释掉解锁代码后再次请求会一直堵塞等待解锁可以看出上述定义的结构体就是并发安全的了。 互斥锁会使线程变为阻塞线程等待解锁而不是直接停掉。 方法锁
如何实现对方法枷锁同一时刻只允许一个线程使用该方法。对方法加锁和对数据结构加锁一样再方法内部加锁。
var arr []stringvar wu sync.Mutexfunc Test(c *fiber.Ctx) error {wu.Lock()defer wu.Unlock()arr append(arr, ssss)return c.JSON(fiber.Map{data: arr,})
}func main() {app : fiber.New()app.Get(/test, Test)go func() {for {if requestCount 0 {wg.Done()}}}()app.Listen(:8081)
}如果将defer wu.Unlock()注释掉也会是线程进入等待状态如下 但是又会有新的问题由于线程是独立的此时存在一个线程处于阻塞状态但是却可以再次发送新的请求再阻塞的这段时间内可以一直发送请求多次请求会也会造成数据的错误如下所示 如上图可以发现互斥锁可以是方法变成并发安全的但是在线程等待的过程中仍然可以发送请求。
Redis互斥锁
在上一节的方法互斥锁中需要额外的需求就是如果当前的线程占用某个资源时新的线程不会处于阻塞状态而是直接停止这时就需要有外部的标识记录资源的占用情况就需要借助r内存数据库如redis了。如下
redis初始化
var lock sync.Mutexvar Client *CustomRedisClienttype CustomRedisClient struct {redis.Conn
}func Init() {conn, err : redis.Dial(tcp, config.RedisVar.Host)if err ! nil {panic(errors.New(conn redis failed))}_, err conn.Do(auth, config.RedisVar.Password)if err ! nil {panic(errors.New(redis auth failed))}c : CustomRedisClient{conn}Client cfmt.Println(conn redis success)
}func (c *CustomRedisClient) Cmd(commandName string, args ...interface{}) (reply interface{}, err error) {lock.Lock()defer lock.Unlock()return c.Do(commandName, args...)
}redis互斥锁 func Lock(name string) (error, *string) {apply, err : Client.Cmd(GET, name)if apply ! nil || err ! nil {return errors.New(locking), nil}uid : utils.UUID()fmt.Printf(name: %v, uid: %v \n, name, uid)_, err Client.Cmd(SET, name, uid, EX, 5)if err ! nil {Client.Cmd(DEL, name)return errors.New(redis set err), nil}return nil, uid
}func Unlock(name string, uid string) error {reply, err : Client.Cmd(GET, name)if reply nil err nil {return nil}s : string(reply.([]byte))if s ! uid {Client.Cmd(DEL, name)return errors.New(can not unlock)}_, err Client.Cmd(DEL, name)if err ! nil {Client.Cmd(DEL, name)return err}return nil
}使用上述的redis互斥锁就可以显示当某个资源被线程占用时另一个资源进来会直接停掉。
对代码改造使用新的互斥锁如下
func Test(c *fiber.Ctx) error {err, s : redis_util.Lock(test)if err ! nil {if errors.Is(err, errors.New(locking)) {return c.JSON(fiber.Map{data: locking,})} else {return c.JSON(fiber.Map{data: err.Error(),})}}defer redis_util.Unlock(test, *s)time.Sleep(time.Second * 15)arr append(arr, ssss)return c.JSON(fiber.Map{data: arr,})
} 如上图所示当资源被占用是会返回locking需要注意的是redis存储的互斥标识的时间一定要大于或等于程序的执行时间不然程序还未执行玩redis占用标识就销毁了导致错误。