最近在研究Websocket功能,本来想接入抖音和快手的弹幕功能,以及短消息功能。
在了解的过程中,也开发了一些测试项目。
这不是,就把CSDN的短消息项目给弄出来了。
直接上代码:
# !/usr/bin python3
# -*- encoding=utf-8 -*-
# Description :
# Author :
# @Email :
# @File : csdn.py
# @Time : 2023-09-05 15:18:58
# @Project : live_coreimport requests
import websocket
import threading
import time
import json
import logging
import reclass CSDN(object):user_name = ''device_id = ''cookie = ''def init_wws(self, cookie):try:self.cookie = cookieself.user_name = re.search('UserName=(.*?);', cookie).group(1)self.device_id = re.search('uuid_tt_dd=(.*?);', cookie).group(1)except Exception as e:logging.error('错误'.format(e.args))def _connect_data(self):_url = 'https://bizapi.csdn.net/im-manage/v1.0/dispatch/do'_h = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36','Cookie': self.cookie}_d = {'appId': "CSDN-PC",'channelType': "privateMsg",'deviceId': self.device_id,'groupId': "CSDN-private-MSG",'linkType': 1,'token': "",'userId': self.user_name}_req = requests.post(_url, headers=_h, json=_d)return _req.json()def _on_open(self, ws):_body = self._connect_data()# print(_body.get('data'))_body_data = _body.get('data')_data = {'body': {'userId': self.user_name,'appId': 'CSDN-PC','imToken': _body_data.get('imToken'),'groupId': 'CSDN-private-MSG'},'cmdId': 2,'isZip': 0,'ver': '1.0'}ws.send(json.dumps(_data))threading.Thread(target=self._keep_heart_beat, args=(ws,), daemon=True).start()def _keep_heart_beat(self, ws: websocket.WebSocketApp):while True:time.sleep(5)_payload = {'cmdId': 1}ws.send(json.dumps(_payload))def _on_message(self, ws: websocket.WebSocketApp, message):logging.info(message)def _on_error(self, ws: websocket.WebSocketApp, error):logging.error(error)def _on_close(self, ws):logging.info('close')def wss_start(self):websocket.enableTrace(False)ws = websocket.WebSocketApp('wss://im-linkserver-66.csdn.net/', on_message=self._on_message, on_error=self._on_error, on_close=self._on_close,on_open=self._on_open)ws.run_forever()if __name__ == '__main__':LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)# cookies 这里填CSDN的Cookies_c = ''_csdn = CSDN()_csdn.init_wws(_c)_csdn.wss_start()
运行结果如下: