目录
开通方式
参考代码
使用示范
其他的
企业微信
TG
Plusplus
Server
Server酱
开通方式
免费,有群就能开,任何用户都可开
官方文档:自定义机器人接入 - 钉钉开放平台
注意事项:用电脑版钉钉来开通,手机上的不行。
参考代码
import time
import hmac
import hashlib
import base64
import urllib.parse
import io
from pyzbar import pyzbar
from PIL import Image
import requests, json # 导入依赖库def decode_img(data):img_b64decode = base64.b64decode(data[data.index(';base64,')+8:])decoded = pyzbar.decode(Image.open(io.BytesIO(img_b64decode)))return decoded[0].data.decode("utf-8")class DingDingHandler:def __init__(self, token, secret):self.token = tokenself.secret = secretdef get_url(self):timestamp = round(time.time() * 1000)secret_enc = self.secret.encode("utf-8")string_to_sign = "{}\n{}".format(timestamp, self.secret)string_to_sign_enc = string_to_sign.encode("utf-8")hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))# 完整的urlapi_url = "https://oapi.dingtalk.com/robot/send?access_token={}×tamp={}&sign={}".format(self.token, timestamp, sign)print("钉钉机器人url: ", api_url)return api_urldef ddimgsend(self, img_data, retry=0):# ...self.ddlinksend(img_data, title=f"学习吧{'-重试:'+str(retry) if retry>0 else ''}")self.ddtextsend(decode_img(img_data))def ddlinksend(self, link, text='学习', title='学习吧'):headers = {"Content-Type": "application/json"} # 定义数据类型data = {"msgtype": "link","link": {"text": text,"title": title, #f"学习吧{'-重试:' + str(retry) if retry > 0 else ''}","messageUrl": link,},}res = requests.post(self.get_url(), data=json.dumps(data), headers=headers) # 发送post请求print(res.text)def ddtextsend(self, text):data={}headers = {"Content-Type": "application/json"} # 定义数据类型if text.startswith('dtxuexi://appclient/'):data = {"msgtype": "link","link": {"text": "请点击重新登录","title": "登录失效","messageUrl": text,},}else:data = {"msgtype": "text","text": {"content": text,},}res = requests.post(self.get_url(), data=json.dumps(data), headers=headers) # 发送post请求print(res.text)
使用示范
accesstoken = 'xxx'
secret = 'xxx'
msg = 'xxxx'push = DingDingHandler(accesstoken, secret)
push.ddtextsend(msg)
其他的
企业微信
# 企业微信推送
def wxpush(msg, usr, corpid, corpsecret, agentid=1000002):base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?'req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='corpid = corpidcorpsecret = corpsecretagentid = agentidif agentid == 0:agentid = 1000002#获取access_token,每次的access_token都不一样,所以需要运行一次请求一次def get_access_token(base_url, corpid, corpsecret):urls = base_url + 'corpid=' + corpid + '&corpsecret=' + corpsecretresp = requests.get(urls).json()access_token = resp['access_token']return access_tokendef send_message(msg, usr):data = get_message(msg, usr)req_urls = req_url + get_access_token(base_url, corpid, corpsecret)res = requests.post(url=req_urls, data=data)ret = res.json()if ret["errcode"] == 0:print(f"[{now}] 企业微信推送成功")else:print(f"[{now}] 推送失败:{ret['errcode']} 错误信息:{ret['errmsg']}")def get_message(msg, usr):data = {"touser": usr,"toparty": "@all","totag": "@all","msgtype": "text","agentid": agentid,"text": {"content": msg},"safe": 0,"enable_id_trans": 0,"enable_duplicate_check": 0,"duplicate_check_interval": 1800}data = json.dumps(data)return datamsg = msgusr = usrif corpid == '':print("[注意] 未提供corpid,不进行企业微信推送!")elif corpsecret == '':print("[注意] 未提供corpsecret,不进行企业微信推送!")else:send_message(msg, usr)
TG
# 推送tg
def push_tg(token, chat_id, desp=""):"""推送消息到TG"""if token == '':print("[注意] 未提供token,不进行tg推送!")elif chat_id == '':print("[注意] 未提供chat_id,不进行tg推送!")else:server_url = f"https://api.telegram.org/bot{token}/sendmessage"params = {"text": '消息推送\n\n' + desp,"chat_id": chat_id}response = requests.get(server_url, params=params)json_data = response.json()if json_data['ok'] == True:print(f"[{now}] 推送成功。")else:print(f"[{now}] 推送失败:{json_data['error_code']}({json_data['description']})")
Plusplus
# 推送pushplus
def push_pushplus(token, content=""):"""推送消息到pushplus"""if token == '':print("[注意] 未提供token,不进行pushplus推送!")else:server_url = f"http://www.pushplus.plus/send"params = {"token": token,"title": '消息推送',"content": content}response = requests.get(server_url, params=params)json_data = response.json()if json_data['code'] == 200:print(f"[{now}] 推送成功。")else:print(f"[{now}] 推送失败:{json_data['code']}({json_data['message']})")
Server
# 推送server
def push_server(sckey, desp=""):"""推送消息到微信"""if sckey == '':print("[注意] 未提供sckey,不进行微信推送!")else:server_url = f"https://sctapi.ftqq.com/{sckey}.send"params = {"title": '消息推送',"desp": desp}response = requests.get(server_url, params=params)json_data = response.json()if json_data['code'] == 0:print(f"[{now}] 推送成功。")else:print(f"[{now}] 推送失败:{json_data['code']}({json_data['message']})")
Server酱
# 推送server酱
def push_wx(sckey, desp=""):"""推送消息到微信"""if sckey == '':print("[注意] 未提供sckey,不进行推送!")else:server_url = f"https://sc.ftqq.com/{sckey}.send"params = {"text": '消息推送',"desp": desp}response = requests.get(server_url, params=params)json_data = response.json()if json_data['errno'] == 0:print(f"[{now}] 推送成功。")else:print(f"[{now}] 推送失败:{json_data['errno']}({json_data['errmsg']})")