个人介绍网站怎么做,申请一个电子邮箱号,网站建设栏目层级,中国建设银行网站怎么登录不上Asynq[1]是一个Go实现的分布式任务队列和异步处理库#xff0c;基于redis#xff0c;类似Ruby的sidekiq[2]和Python的celery[3]。Go生态类似的还有machinery[4]和goworker 同时提供一个WebUI asynqmon[5]#xff0c;可以源码形式安装或使用Docker image, 还可以和Prometheus… Asynq[1]是一个Go实现的分布式任务队列和异步处理库基于redis类似Ruby的sidekiq[2]和Python的celery[3]。Go生态类似的还有machinery[4]和goworker 同时提供一个WebUI asynqmon[5]可以源码形式安装或使用Docker image, 还可以和Prometheus集成 docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon,如果使用的是主机上的redis还需加上 --redis-addrhost.docker.internal:6379,否则会报错[6] 即 docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addrhost.docker.internal:6379 ➜ asynq-demo git:(main) ✗ tree.├── client.go├── const.go├── go.mod├── go.sum└── server.go0 directories, 5 files 其中const.go: package mainconst ( redisAddr 127.0.0.1:6379 redisPasswd )const ( TypeExampleTask shuang:asynq-task:example) client.go: package mainimport ( encoding/json fmt log time github.com/hibiken/asynq)type ExampleTaskPayload struct { UserID string Msg string // 业务需要的其他字段}func NewExampleTask(userID string, msg string) (*asynq.Task, error) { payload, err : json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg}) if err ! nil { return nil, err } return asynq.NewTask(TypeExampleTask, payload), nil}var client *asynq.Clientfunc main() { client asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0}) defer client.Close() //go startExampleTask() startExampleTask() //startGithubUpdate() // 定时触发}func startExampleTask() { fmt.Println(开始执行一次性的任务) // 立刻执行 task1, err : NewExampleTask(10001, mashangzhixing!) if err ! nil { log.Fatalf(could not create task: %v, err) } info, err : client.Enqueue(task1) if err ! nil { log.Fatalf(could not enqueue task: %v, err) } log.Printf(task1 - enqueued task: id%s queue%s, info.ID, info.Queue) // 10秒后执行(定时执行) task2, err : NewExampleTask(10002, 10s houzhixing) if err ! nil { log.Fatalf(could not create task: %v, err) } info, err client.Enqueue(task2, asynq.ProcessIn(10*time.Second)) if err ! nil { log.Fatalf(could not enqueue task: %v, err) } log.Printf(task2 - enqueued task: id%s queue%s, info.ID, info.Queue) // 30s后执行(定时执行) task3, err : NewExampleTask(10003, 30s houzhixing) if err ! nil { log.Fatalf(could not create task: %v, err) } theTime : time.Now().Add(30 * time.Second) info, err client.Enqueue(task3, asynq.ProcessAt(theTime)) if err ! nil { log.Fatalf(could not enqueue task: %v, err) } log.Printf(task3 - enqueued task: id%s queue%s, info.ID, info.Queue)} server.go: package mainimport ( context encoding/json fmt time github.com/davecgh/go-spew/spew github.com/hibiken/asynq)var AsynqServer *asynq.Server // 异步任务serverfunc initTaskServer() error { // 初始化异步任务服务端 AsynqServer asynq.NewServer( asynq.RedisClientOpt{ Addr: redisAddr, Password: redisPasswd, //与client对应 DB: 0, }, asynq.Config{ // Specify how many concurrent workers to use Concurrency: 100, // Optionally specify multiple queues with different priority. Queues: map[string]int{ critical: 6, default: 3, low: 1, }, // See the godoc for other configuration options }, ) return nil}func main() { initTaskServer() mux : asynq.NewServeMux() mux.HandleFunc(TypeExampleTask, HandleExampleTask) // ...register other handlers... if err : AsynqServer.Run(mux); err ! nil { fmt.Printf(could not run asynq server: %v, err) }}func HandleExampleTask(ctx context.Context, t *asynq.Task) error { res : make(map[string]string) spew.Dump(t.Payload() is:, t.Payload()) err : json.Unmarshal(t.Payload(), res) if err ! nil { fmt.Printf(rum session, can not parse payload: %s, err: %v, t.Payload(), err) return nil } //-----------具体处理逻辑------------ spew.Println(拿到的入参为:, res, 接下来将进行具体处理) fmt.Println() // 模拟具体的处理 time.Sleep(5 * time.Second) fmt.Println(--------------处理了5s处理完成-----------------) return nil} 执行redis-server 清除redis中所有的key 执行docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addrhost.docker.internal:6379 执行 go run client.go const.go (生产者产生消息放入队列) 此时能看到redis中多个几个key 同时管理后台能看到队列的信息 执行 go run server.go const.go (消费者消费队列中的消息) 可以看到都被处理了 此时redis中的key 此处的业务处理为模拟实际可能是某个被触发后不需要马上执行的操作 实际试一下。通过一个定时器(24h执行一次)触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高机器资源紧张)可以先放入队列延迟30min后实际去执行。 完整Demo[7] push github的功能没有完全实现 另外可以配置队列的优先级asynq队列如何配置队列优先级[8] // 初始化异步任务服务端 AsynqServer asynq.NewServer( asynq.RedisClientOpt{ Addr: redisAddr, Password: redisPasswd, //与client对应 DB: 0, }, asynq.Config{ // Specify how many concurrent workers to use Concurrency: 100, // Optionally specify multiple queues with different priority. Queues: map[string]int{ critical: 6,//关键队列中的任务将被处理 60% 的时间 default: 3,//默认队列中的任务将被处理 30% 的时间 low: 1,//低队列中的任务将被处理 10% 的时间 }, // See the godoc for other configuration options }, ) go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误[9] 参考资料 [1] Asynq: https://github.com/hibiken/asynq [2] sidekiq: https://github.com/sidekiq/sidekiq [3] celery: https://github.com/celery/celery [4] machinery: https://blog.csdn.net/weixin_42681866/article/details/123334654 [5] asynqmon: https://github.com/hibiken/asynqmon [6] 报错: https://github.com/hibiken/asynqmon/issues/214 [7] 完整Demo: https://github.com/cuishuang/asynq-demo [8] asynq队列如何配置队列优先级: https://blog.csdn.net/itopit/article/details/126123626 [9] go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误: https://my.oschina.net/randolphcyg/blog/5539676 本文由 mdnice 多平台发布