Go基于协程池的延迟任务调度器

原理

通过用一个goroutine以及堆来存储要待调度的延迟任务,当达到调度时间后,将其添加到协程池中去执行。
主要是使用了chan、Mutex、atomic及ants协程池来实现。

用途

主要是用于高并发及大量定时任务要处理的情况,如果使用Go协程来实现每次延迟任务的调度,那么数量极大的goroutine将会占用内存,导致性能下降,使用协程池实现延迟任务的调度,会改善该情况。
如在物联网设备中,当连接数量达到几十万时,如果使用goroutine来处理心跳或者活跃检测,频繁的创建销毁goroutine会影响性能。

特色

在常见的cron等开源框架中使用的是数组存储待调度的任务,每次循环时都要排序,并且要删除某个任务则时间复杂度是O(n)。

本文通过使用堆及双重Map优化存储待调度的任务,使得添加任务时间复杂度为O(log n),获取任务时间复杂度为O(1),删除时间复杂度为O(1)。

API

创建

NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) //创建协程数是1的延迟任务调度器
s, _ := NewSchedule(1)

创建一个延迟调度任务器,workerNum是协程数量,options是ants协程池的配置,除了WithMaxBlockingTasks不能配置,别的都可以,具体参考:https://github.com/panjf2000/ants

调度一次

func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (TaskId, error) //1秒后打印一次时间
taskId, _ := s.ScheduleOne(func() {fmt.Println(time.Now())
}, time.Second)

重复调度

func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次时间
taskId, _ := s.Schedule(func() {fmt.Println(time.Now())
}, time.Second)

取消调度

func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次时间
taskId, _ := s.Schedule(func() {fmt.Println(time.Now())
}, time.Second)
//休眠3秒后,取消调度
time.Sleep(3 * time.Second)
s.CancelTask(taskId)

停止调度

func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次时间
taskId, _ := s.Schedule(func() {fmt.Println(time.Now())
}, time.Second)
//休眠3秒后,停用延迟任务调度器
time.Sleep(3 * time.Second)
s.Shutdown()

代码

package scheduleimport ("container/heap""errors""github.com/panjf2000/ants/v2""math""sync""sync/atomic""time"
)var (// ErrScheduleShutdown 延迟任务调度器已关闭错误ErrScheduleShutdown = errors.New("schedule: schedule is already in shutdown")
)const InvalidTaskId = 0// Schedule 延迟调度的结构体,提供延迟调度任务的全部方法
// 通过NewSchedule方法创建Schedule,通过Schedule、ScheduleOne方法添加延迟调度任务,通过CancelTask方法取消任务,通过Shutdown停止延迟任务
type Schedule struct {//任务堆,按时间排序taskHeap taskHeap//可执行的任务Map,key是当前的任务id,value是任务的第一次原始id,用于优化取消任务时需要遍历堆去删除executeTaskMap map[TaskId]TaskId//任务id的Map,key是任务的第一次原始id,value是当前的任务id,用于优化取消任务时需要遍历堆去删除taskIdMap map[TaskId]TaskId//锁 保证并发安全mux sync.Mutex//调度器是否运行中running bool//任务运行池pool *ants.Pool//下一个任务idnextTaskId TaskId//添加任务ChanaddTaskChan chan *Task//删除任务ChanstopTaskChan chan struct{}//取消任务ChancancelTaskChan chan TaskId
}// NewSchedule 构建一个Schedule
// workerNum 工作的协程数量,options ants协程池的配置,除了WithMaxBlockingTasks不能配置,别的都可以,具体参考:https://github.com/panjf2000/ants
func NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) {//延迟任务的最大任务数量必须不限制options = append(options, ants.WithMaxBlockingTasks(0))//创建一个协程池pool, err := ants.NewPool(workerNum)if err != nil {return nil, err}//创建一个延迟调度结构体s := &Schedule{taskHeap:       make(taskHeap, 0),executeTaskMap: make(map[TaskId]TaskId),taskIdMap:      make(map[TaskId]TaskId),running:        true,nextTaskId:     0,mux:            sync.Mutex{},pool:           pool,addTaskChan:    make(chan *Task),stopTaskChan:   make(chan struct{}),cancelTaskChan: make(chan TaskId),}//启动调度 会开启一个协程去将即将要调度的任务添加到协程池中运行s.start()return s, nil
}// ScheduleOne 添加延迟调度任务,只调度一次
// job 执行的方法 duration 周期间隔,如果是负数立马执行,如果是负数立马且只执行一次
func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (TaskId, error) {return s.doSchedule(job, duration, true)
}// Schedule 添加延迟调度任务,重复调度
// job 执行的方法 duration 周期间隔,如果是负数立马且只执行一次
func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) {return s.doSchedule(job, duration, false)
}// doSchedule 添加延迟调度任务的具体实现
func (s *Schedule) doSchedule(job func(), duration time.Duration, onlyOne bool) (TaskId, error) {s.mux.Lock()defer s.mux.Unlock()//如果是负数 只执行一次if duration <= 0 {onlyOne = true}if s.running {task := &Task{job:         job,executeTime: time.Now().Add(duration),onlyOne:     onlyOne,duration:    duration,}//设置任务id并添加到任务堆中s.setTaskId(task, true)s.addTaskChan <- taskreturn task.originalId, nil} else {return InvalidTaskId, ErrScheduleShutdown}
}// CancelTask 取消延迟调度任务
// taskId 任务id
func (s *Schedule) CancelTask(taskId TaskId) {s.mux.Lock()defer s.mux.Unlock()if s.running {s.cancelTaskChan <- taskId}
}// Shutdown 结束延迟任务调度
func (s *Schedule) Shutdown() {s.mux.Lock()defer s.mux.Unlock()if s.running {s.running = falses.stopTaskChan <- struct{}{}}
}// IsShutdown 延迟任务调度是否关闭
func (s *Schedule) IsShutdown() bool {s.mux.Lock()defer s.mux.Unlock()return !s.running
}// start 启动延迟任务调度
func (s *Schedule) start() {go func() {for {now := time.Now()var timer *time.Timer//如果没有任务提交,睡眠等待任务if s.taskHeap.Len() == 0 {timer = time.NewTimer(math.MaxUint16 * time.Hour)} else {//查看第一个要执行的任务是否是被取消的task := s.taskHeap.Peek().(*Task)_, ok := s.executeTaskMap[task.id]if !ok {//是被取消的任务,移除后continues.taskHeap.Pop()continue} else {//设置执行间隔timer = time.NewTimer(task.executeTime.Sub(now))}}select {case <-timer.C://到达第一个任务执行时间task := s.taskHeap.Pop().(*Task)//提交到线程池执行,返回的error不需要处理,因为任务池是无限大_ = s.pool.Submit(task.job)//单次执行则删除,多次执行,则更新if task.onlyOne {s.removeTask(&task.originalId, task.id)} else {s.updateTask(task)}case originalTaskId := <-s.cancelTaskChan:timer.Stop()//如果取消的任务id在待执行任务列表中,则删除任务if taskId, ok := s.taskIdMap[originalTaskId]; ok {s.removeTask(&originalTaskId, taskId)}case task := <-s.addTaskChan:timer.Stop()//添加任务s.addTask(task)case <-s.stopTaskChan:timer.Stop()//关闭资源s.close()return}}}()
}// updateTask 更新延迟调度任务
func (s *Schedule) updateTask(executedTask *Task) {//拷贝 并设置新的执行时间和IDtask := *executedTasktask.executeTime = time.Now().Add(task.duration)//设置任务IDs.setTaskId(&task, false)//把已执行的任务删除s.removeTask(nil, executedTask.id)//添加新的任务s.addTask(&task)
}// removeTask 移除任务
func (s *Schedule) removeTask(originalId *TaskId, taskId TaskId) {//如果原始的任务ID不为空,则为使用者取消的,从任务Map中也删除if originalId != nil {delete(s.taskIdMap, *originalId)}delete(s.executeTaskMap, taskId)
}// addTask 添加任务
func (s *Schedule) addTask(task *Task) {s.taskIdMap[task.originalId] = task.ids.executeTaskMap[task.id] = task.originalIdheap.Push(&s.taskHeap, task)
}// setTaskId 设置任务id
func (s *Schedule) setTaskId(task *Task, addOriginalId bool) {//为了兼容低版本,使用旧版本的CAS去设值,保证可见性atomic.AddInt32((*int32)(&s.nextTaskId), 1)if atomic.LoadInt32((*int32)(&s.nextTaskId)) == InvalidTaskId {atomic.AddInt32((*int32)(&s.nextTaskId), 1)}task.id = TaskId(atomic.LoadInt32((*int32)(&s.nextTaskId)))if addOriginalId {task.originalId = task.id}
}// close 关闭Schedule资源和协程池的资源
func (s *Schedule) close() {s.taskHeap = nils.taskIdMap = nils.executeTaskMap = nils.pool.Release()close(s.addTaskChan)close(s.cancelTaskChan)close(s.stopTaskChan)s.pool = nils.addTaskChan = nils.cancelTaskChan = nils.stopTaskChan = nil
}// TaskId 任务ID
type TaskId int32// Task 调度任务结构体,是一个调度任务的实体信息
type Task struct {// 原始id,用于Schedule本身的删除使用,用两层Map的方式优化数组删除的O(n)时间复杂度originalId TaskId// 任务idid TaskId// 执行的时间,每次执行完,如果重复调度就重新计算executeTime time.Time// 周期间隔duration time.Duration// 执行的任务job func()// 是否只执行一次onlyOne bool
}// 任务的堆,使用队只需要在添加的时候进行排序,堆顶是最先要执行的任务
type taskHeap []*Task// 下面都是堆接口的实现func (t *taskHeap) Len() int {return len(*t)
}
func (t *taskHeap) Less(i, j int) bool {return (*t)[i].executeTime.After((*t)[j].executeTime)
}func (t *taskHeap) Swap(i, j int) {(*t)[i], (*t)[j] = (*t)[j], (*t)[i]
}func (t *taskHeap) Push(x interface{}) {*t = append(*t, x.(*Task))
}func (t *taskHeap) Pop() interface{} {old := *tn := len(old)x := old[n-1]*t = old[:n-1]return x
}// Peek 查看堆顶元素,非堆接口的实现
func (t *taskHeap) Peek() interface{} {return (*t)[len(*t)-1]
}

代码加上详细的中文注解,一共300行。
github地址:
https://github.com/xzc-coder/go-schedule

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/24001.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

6. grafana的graph简介

1. Settings功能 2. Visualization功能 &#xff08;可视化的方式&#xff0c;后续会写一些&#xff09; 3. Display 功能&#xff08;显示方面的设置&#xff09; bars 柱状图方式显示 lines&#xff08;不选不会出功能&#xff09; line width 线条的粗细 staircase 会让折…

前缀和代码解析

前缀和是指数组一定范围的数的总和,常见的有两种,一维和二维,我会用两道题来分别解析 一维 DP34 【模板】前缀和 题目: 题目解析: 暴力解法 直接遍历数组,遍历到下标为 l 时,开始进行相加,直到遍历到下标为 r ,最后返回总和.这样做的时间复杂度为: O(n) public class Main …

RoCBert:具有多模态对比预训练的健壮中文BERT

摘要 大规模预训练语言模型在自然语言处理&#xff08;NLP&#xff09;任务上取得了最新的最优结果&#xff08;SOTA&#xff09;。然而&#xff0c;这些模型容易受到对抗攻击的影响&#xff0c;尤其是对于表意文字语言&#xff08;如中文&#xff09;。 在本研究中&#xff0…

【原创工具】文件清单生成器 By怜渠客

【原创工具】文件清单生成器 By怜渠客 刚在论坛看到了一个文件列表生成器 文件列表生成器 - 吾爱破解 - 52pojie.cn &#xff0c;和我去年写的一个软件很像&#xff0c;当时我也是有需求&#xff0c;要把一个文件夹里及其子文件夹里所有的文件列出来&#xff0c;就临时弄了个小…

深度学习-6.用于计算机视觉的深度学习

Deep Learning - Lecture 6 Deep Learning for Computer Vision 简介深度学习在计算机视觉领域的发展时间线 语义分割语义分割系统的类型上采样层语义分割的 SegNet 架构软件中的SegNet 架构数据标注 目标检测与识别目标检测与识别问题两阶段和一阶段目标检测与识别两阶段检测器…

【Linux】初始操作系统和进程(一)

目录 前言&#xff1a; 一、冯诺依曼体系结构&#xff1a; 二、操作系统&#xff1a; 1.操作系统是什么&#xff1f; 2.为什么要有操作系统&#xff1f; 3.操作系统是如何管理下层的&#xff1f; 4.操作系统是如何对上层提供服务的&#xff1f; 三、进程&#xff1a; …

【链 表】

【链表】 一级目录1. 基本概念2. 算法分析2.1 时间复杂度2.2 空间复杂度2.3 时空复杂度互换 线性表的概念线性表的举例顺序表的基本概念顺序表的基本操作1. 初始化2. 插入操作3. 删除操作4. 查找操作5. 遍历操作 顺序表的优缺点总结优点缺点 树形结构图形结构单链表基本概念链表…

python-leetcode-字符串解码

394. 字符串解码 - 力扣&#xff08;LeetCode&#xff09; class Solution:def decodeString(self, s: str) -> str:stack []num 0curr_str ""for char in s:if char.isdigit():num num * 10 int(char)elif char [:stack.append((curr_str, num))curr_str, …

力扣 下一个排列

交换位置&#xff0c;双指针&#xff0c;排序。 题目 下一个排列即在组成的排列中的下一个大的数&#xff0c;然后当这个排列为降序时即这个排列最大&#xff0c;因为大的数在前面&#xff0c;降序排列的下一个数即升序。所以&#xff0c;要是想找到当前排列的下一个排列&…

ProGuard加密混淆SpringBoot应用代码

背景 我们的项目是基于SpringCloud架构的微服务应用&#xff0c;采用Docker离线部署方式交付客户&#xff0c;通过授权证书来控制应用的许可功能模块和使用时间。我们已经在代码层已经实现&#xff1a; 基于多维度硬件指纹的绑定验证&#xff0c;cpu id、mac地址、磁盘序列、…

动态链接器(九):.init和.init_array

ELF文件中的.init和.init_array段是程序初始化阶段的重要组成部分&#xff0c;用于在main函数执行前完成必要的初始化操作。 1 .init段和.init_array 段 1.1 作用 .init段包含编译器生成的初始化代码&#xff0c;通常由运行时环境&#xff08;如C标准库的启动例程&#xff0…

Ollama微调

Ollama是一款开源工具&#xff0c;其目标是简化大语言模型在本地环境的部署和使用。它支持多种流行的开源大语言模型&#xff0c;如 Llama 2、Qwen2.5等。在上一篇文章中我们部署Ollama&#xff0c;并使用简单命令管理Ollama。接下来我们学习Ollama的高级应用。通过Ollama的Mod…

DeepSeek开源周Day1:FlashMLA引爆AI推理性能革命!

项目地址&#xff1a;GitHub - deepseek-ai/FlashMLA 开源日历&#xff1a;2025-02-24起 每日9AM(北京时间)更新&#xff0c;持续五天&#xff01; ​ 一、开源周震撼启幕 继上周预告后&#xff0c;DeepSeek于北京时间今晨9点准时开源「FlashMLA」&#xff0c;打响开源周五连…

(七)懒加载预加载

&#xff08;一&#xff09;懒加载 1. 什么是懒加载 懒加载&#xff0c;即延迟加载。在访问页面时&#xff0c;先将 img 元素或其他元素的背景图片路径替换为占位图&#xff08;通常是 1*1px 的小图片&#xff09;&#xff0c;仅当元素进入浏览器可视区域时&#xff0c;才设置…

Revisiting Reverse Distillation for Anomaly Detection

重新审视反向蒸馏在异常检测中的应用 文章链接&#xff1a;点这里 源码链接&#xff1a;点这里 前言 此篇文章是在 Anomaly detection via reverse distillation from one-class embedding 这篇的基础上改进创新的。重新审视了反向蒸馏&#xff08;KD&#xff09;这一想法&am…

Windows CMD 命令大全(Complete List of Windows CMD Commands)

Windows CMD 命令大全&#xff1a; Windows CMD 是 Windows 系统内置的命令行工具&#xff0c;用于执行各种命令和管理任务。 称为Command Prompt。它提供了一个通过键入命令来与计算机系统进行交互的方式&#xff0c;类似于早期的DOS操作系统。以下是 CMD 的基础知识和常用命…

hot100-二叉树

二叉树 二叉树递归 相当于这个的顺序来回调换 class Solution {private List<Integer> res new ArrayList<>();public List<Integer> inorderTraversal(TreeNode root) {if(root null)return res;inorderTraversal(root.left);res.add(root.val);inorde…

【JavaWeb13】了解ES6的核心特性,对于提高JavaScript编程效率有哪些潜在影响?

文章目录 &#x1f30d;一. ES6 新特性❄️1. ES6 基本介绍❄️2. 基本使用2.1 let 声明变量2.2 const 声明常量/只读变量2.3 解构赋值2.4 模板字符串2.5 对象拓展运算符2.6 箭头函数 &#x1f30d;二. Promise❄️1. 基本使用❄️2. 如何解决回调地狱问题2.1回调地狱问题2.2 使…

ROS的action通信——实现阶乘运算(三)

在ROS中除了常见的话题(topic&#xff09;通信、服务(server)通信等方式&#xff0c;还有action通信这一方式&#xff0c;由于可以实时反馈任务完成情况&#xff0c;该通信方式被广泛运用于机器人导航等任务中。本文将通过三个小节的分享&#xff0c;实现基于action通信的阶乘运…

centos系统MBR格式转换成gpt格式 (华为云)

在华为云上的centos7.9系统MBR格式转换成GPT格式的步骤 华为云上关于转换的步骤 这个链接里面 gdisk -g /dev/vda 是不对的&#xff0c;-g参数是新创建一个分区&#xff0c;慎用 自己步骤如下&#xff1a;&#xff08;已经试验过&#xff09; 1、gdisk /dev/sda (这里是盘 不…