引言
https://github.com/cnstark/gputasker
随着 AI 模型的广泛应用,GPU 成为团队中最重要的资源之一。然而,如何实时监控 GPU 的使用情况并及时通知团队是一个值得关注的问题。为了更好地管理显卡资源,本文基于 GPUTasker,实现了一个定期向钉钉群推送显卡使用情况的机器人。
我们通过钉钉自定义机器人 API 和 GPU 监控工具,结合 Python 脚本实现了以下功能:
- 根据设定的 工作时间 和 节假日规则,控制消息推送;
- 按指定时间间隔发送 GPU 的利用率、显存使用量以及正在使用显卡的用户信息;
- 自动跳过节假日和非工作时间,减少不必要的推送。
实现步骤
1. 获取钉钉机器人 Token 和 Secret
在钉钉群中创建一个自定义机器人,获取 Token 和 Secret。具体步骤如下:
- 登录钉钉 Web 端:
打开 钉钉开放平台 或在钉钉桌面端打开需要管理的工作群。 - 添加机器人:
- 点击群设置 -> 智能群助手 -> 添加机器人;
- 选择 自定义机器人,并设置一个名称(如:GPU 使用监控机器人);
- 配置机器人安全设置,选择 自定义关键词 或 签名校验。
- 记录 Token 和 Secret:
- 添加完成后,系统会生成一个 Token;
- 如果选择了签名校验,还会生成一个 Secret;
- 这两个字段将在脚本中用于身份验证。
2. Messenger 类的实现
Messenger 类是整个系统的核心,负责构建和发送消息到钉钉群。以下是该类的详细实现及功能介绍。
2.1 文件路径
在项目中,新建以下文件路径:
dingding/dingding.py
将 Messenger 类的代码放入 dingding.py 文件中,供其他模块调用。
2.2 核心功能
以下是 Messenger 类的关键功能:
- 节假日跳过:
使用 chinese_calendar 库判断当前日期是否为中国法定节假日。如果是节假日,机器人将自动跳过消息推送。 - 工作时间设置:
支持自定义工作时间段(如上午 8:20 到 11:50,下午 13:10 到 17:30),并在非工作时间内停止推送消息。 - 固定时间间隔推送:
支持设置推送间隔时间(如每 30 分钟推送一次),避免频繁发送消息。 - 显卡使用信息格式化:
将显卡使用情况转化为 Markdown 格式,方便在钉钉群中以表格形式展示。
以下是 Messenger 类的完整代码:
import os
import time
import hmac
import json
import base64
import hashlib
import requests
import chinese_calendar as calendar
from urllib.parse import quote_plus
from datetime import datetimeclass Messenger:def __init__(self, token=os.getenv("DD_ACCESS_TOKEN"), secret=os.getenv("DD_SECRET")):"""初始化方法@param token: str, 钉钉机器人访问令牌@param secret: str, 钉钉机器人密钥"""self.token = tokenself.secret = secretself.URL = "https://oapi.dingtalk.com/robot/send"self.headers = {'Content-Type': 'application/json'}self.params = {'access_token': self.token}self.update_timestamp_and_sign()# GPU 参数self.total_memory_GB = 24self.utilization_thred = 0.6self.memory_used_thred = 0.5# 时间控制参数self.time_range = [('08:20', '11:50'), ('13:10', '17:30')]self.last_true_time = {}self.time_interval = 30 # 间隔30分钟推送一次def send_md(self, message_json, server_ip):"""发送 Markdown 格式的消息到钉钉。"""self.update_timestamp_and_sign()if self.should_call_function_during_chinese_workdays(server_ip):if not message_json:text = f"**服务器IP**: `{server_ip}`\n**状态**: **连接失败**"self.send_markdown_to_dingtalk("服务器连接失败", text)else:content, is_free = self.format_gpu_usage_to_markdown(message_json, server_ip)if is_free:self.send_markdown_to_dingtalk("显卡使用情况", content)def update_timestamp_and_sign(self):"""更新时间戳和签名。"""self.timestamp = str(round(time.time() * 1000))secret_enc = self.secret.encode('utf-8')string_to_sign = '{}\n{}'.format(self.timestamp, self.secret)string_to_sign_enc = string_to_sign.encode('utf-8')hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()self.sign = quote_plus(base64.b64encode(hmac_code))self.params['timestamp'] = self.timestampself.params['sign'] = self.signdef send_markdown_to_dingtalk(self, title, text):"""构建并通过钉钉发送 Markdown 消息。"""data = {"msgtype": "markdown","markdown": {"title": title,"text": text}}try:requests.post(url=self.URL, data=json.dumps(data), params=self.params, headers=self.headers)except Exception as e:print(f"发生错误: {e}")def format_gpu_usage_to_markdown(self, message_json, server_ip):"""格式化 GPU 使用信息为 Markdown 文本。"""rows = []rows.append(f"**{server_ip}**")rows.append("")rows.append("| ID | GPU利用率 | 显存使用量 | 用户 |")rows.append("|:-------:|:------------:|:----------------:|:------:|")is_any_free = Falsefor gpu in message_json:index = gpu['index']utilization = gpu['utilization.gpu']memory_used_MB = gpu['memory.used']memory_used_GB = memory_used_MB / 1024memory_percentage = (memory_used_MB / (self.total_memory_GB * 1024)) * 100users = [process['username'] for process in gpu['processes']]users_str = ', '.join(set(users)) if users else '-'is_free = utilization < 100 * self.utilization_thred and memory_used_MB < (self.total_memory_GB * 1024 * self.memory_used_thred)if is_free:is_any_free = Truerow = f"| <font color='green'>**{index}**</font> | <font color='green'>**{utilization}%**</font> | <font color='green'>**{memory_used_GB:.1f}GB ({memory_percentage:.0f}%)**</font> | <font color='green'>**{users_str}**</font> |"else:row = f"| {index} | {utilization}% | {memory_used_GB:.1f}GB ({memory_percentage:.0f}%) | {users_str} |"rows.append(row)return '\n'.join(rows), is_any_freedef should_call_function_during_chinese_workdays(self, server_ip):"""检查是否为中国工作日以及指定时间段。"""now = datetime.now()current_time = now.time()if not calendar.is_workday(now):return Falsein_any_time_range = Falsefor time_range in self.time_range:start_time = datetime.strptime(time_range[0], '%H:%M').time()end_time = datetime.strptime(time_range[1], '%H:%M').time()if start_time <= end_time:in_time_range = start_time <= current_time <= end_timeelse:in_time_range = start_time <= current_time or current_time <= end_timeif in_time_range:in_any_time_range = Truebreakif in_any_time_range:last_time = self.last_true_time.get(server_ip)if last_time is None or (now - last_time).total_seconds() >= self.time_interval * 60:self.last_true_time[server_ip] = nowreturn Truereturn False# 实例化类
messager = Messenger(token="xxxxxx",secret="xxxxxx")
2.3 调用 Messenger 类
将以下代码加入 gputasker/gpu_info/utils.py 中,通过 try 捕获异常并调用钉钉推送功能:
from dingding.dingding import messagerclass GPUInfoUpdater:def update_gpu_info(self):server_list = GPUServer.objects.all()for server in server_list:try:gpu_info_json = get_gpu_status(server.ip, self.user, server.port, self.private_key_path)except:gpu_info_json = Nonefinally:messager.send_md(gpu_info_json, server.ip)
3. 效果展示
以下是钉钉群中接收到的 GPU 使用情况推送示例:
**172.20.3.27**
| ID | GPU利用率 | 显存使用量 | 用户 |
|:-------:|:------------:|:----------------:|:------:|
| 0 | 0% | 12.7GB (53%) | root|
| 1 | 87% | 16.7GB (70%) | root|
| 2 | 92% | 14.2GB (59%) | root|
| 3 | 87% | 14.2GB (59%) | root|
| 4 | 86% | 14.2GB (59%) | root|
| 5 | 83% | 14.2GB (59%) | root|
| 6 | 86% | 17.0GB (71%) | root|
| 7 | 0% | 2.1GB (9%) | root|
总结
通过本文的实现,可以将 GPU 使用情况实时推送到钉钉群,方便团队成员及时了解资源状态,提高显卡的利用效率。