Python Tips6 基于数据库和钉钉机器人的通知

说明

起因是我第一版quant程序的短信通知失效了。最初认为短信是比较即时且比较醒目的通知方式,现在看来完全不行。

列举三个主要问题:

  • 1 延时。在早先还能收到消息的时候,迟滞就很严重,几分钟都算短的。
  • 2 完全丢失。我手机没有做任何设置,但是没有消息了。反正华为和腾讯,总有一个是锅。
    在这里插入图片描述
  • 3 短信模板。由于短信的模板需要审批,所以内容缩减的不像样子。所以信息阅读起来很累。

结论:不再采用短信方式,而是使用机器、邮件的方式通知。

内容

以下主要的内容有两部分:

  • 1 通过ORM方式操作数据库。
  • 2 使用钉钉机器人

1 数据

数据存在Mongo中,形如

在这里插入图片描述
在每次生成交易订单时,都会发出一条短信。所以这几天miss掉的订单,每个都是5个点,甚至十几个点的利润,太可惜了。100个短信包也比不上这些损失啊。

需要在订单的买和卖时间发生变动时发送通知信息。

对于发送程序来说,可以每30秒执行一次查询,每次执行时基于「当前时间」来判断需要发送的消息:

  • 1 基础筛选。只查询3天的候选数据。
  • 2 筛选买单。当 is_buy 字段不为1时,说明该笔订单的买信号没有发送,进行发送。
  • 3 筛选卖单。当 is_sell字段为不为1时,处理卖单消息。

所以对于业务来说,Mongo真的是更方便的,当需要增加字段时完全不用管表结构。

最近因为要做一些数据库方面的简单培训,在梳理的过程中,我发现其实很多数据库的出现是因为其特定的业务场景塑造的,而且通常能适应A场景的数据库,在B场景就不太行。所以A数据库通常无法取代B数据库

以Mongo和Mysql为例,是否A能取代B的问题应该是研究最多的(更大的考虑是否能用NoSQL取代SQL),现在来看,结论是不能。

目前可以做一些简单结论:

  • 1 当数据维度经常需要变动时,用Mongo。所以与操作和控制相关的场景下,主要用Mongo。业务形态不固定的时候,也用Mongo。例如本次,最初我并没有设计is_buy字段,但是要更改时几乎是瞬时完成的,似乎一开始就这样。
  • 2 当数据需要连表时,用Mysql。特别是结构化数据,用Mysql来存是比较方便的,每个字段也预先做了限定,不会犯错。

以下也简单总结了一些数据库的特点/场景:

  • 1 Redis : 高速kv存取,可用于实时计数等。
  • 2 Mongo: 文档性数据库,可用于集成部分的数据,适合作为主库。
  • 3 Mysql: 表格型数据库,可用于结构化部分的存储。
  • 4 Postgres: 算是Mysql的平替吧,在功能上更完善一些,许可更open。
  • 5 ES: 文档性数据库,主要用于解决模糊搜索效率的问题,用于搜索、推荐。
  • 6 Milvus: 向量数据库,主要解决大量中间向量的存储,以及相似性检索,用于搜图等功能。
  • 7 ClickHouse: 列式数据库。用于高效存储和统计,某种程度上也适合备份。速度太快,压缩比太高了。
  • 8 InfluxDB: 时序数据库。快速存储和分析时间数据。可用于物联网、金融等。
  • 9 Neo4j: 图库。用于存储真正的关联信息,适合用于知识图谱。
  • 10 SQLite:容易被忽略,但是这个还是非常棒的数据库。适合微型应用,嵌入式应用等。

还有很多数据库,对应着一些其他维度的细分。比如FAISS也是向量数据库,但是是在内存里;而Milvus是在硬盘上。然后还有很多是功能近似的,就不展开了。

2 ORM

之前我用pymongo自己搭了微服务,然后基于这个微服务又做了WMongo对象封装,本质上还是函数式的。结论是,函数式也好,agent(微服务)也好,的确是有其存在的必要和应用场景的。不过这种场景更偏向后端纯粹的数据处理与分析,在搭建应用对象(强调集成性和灵活性)时的确不方便。WMongo如果之后继续和pydantic结合,我应该也会做出一个类似工具。

言归正传, 操作Mongo时pymongo和mongoengine的关系就类似与pymysql和sqlalchemy;而Motor就像 aiomysql + sqlalchemy。

先从同步入手:

2.1 连接

from mongoengine import connect, disconnect,Document, StringField, IntField,DictField# 可以直接使用函数式连接
connect(db='your_database_name',       # 数据库名称host='localhost',              # 主机地址,默认为localhostport=27017,                    # 端口号,默认为27017username='your_username',      # 可选,用户名password='your_password',      # 可选,密码authentication_source='admin'  # 可选,认证数据库,默认为admin
)
# 也可以使用url方式连接
# 集群
connect(host='mongodb://USER:PASSWD@IP1:PORT1,IP2:PORT2,IP3:PORT3,IP4:PORT4/%s?authSource=admin&replicaSet=mymeta' % db_name
)

本次仅按单机方式连接

# 单机模式
host_url = 'mongodb://USER:PASSWD@IP1:PORT1/%s?authSource=admin' % db_name
client = connect(host=host_url)
# 获取所有数据库
databases = client.list_database_names()
print("Databases:")
for db in databases:print(db)# 遍历每个数据库,获取集合
for db_name in databases:db = client[db_name]collections = db.list_collection_names()print(f"\nCollections in {db_name}:")for collection in collections:print(collection)# 断开连接
disconnect()

连接之后,做了一些函数式的操作,这个虽然不是ORM的主要方向,但有时候还是有点用的。接下来则是对数据的操作:

设置对象时,如果meta没有设置,对应的表名将会按类名的小写拆开建立并严格校验字段的一致性。这里我并不会映射现有表的全部字段,需要设置非严格模式。

    meta = {'collection': 'trade_orders',  # MongoDB 中的集合名'strict': False,  # 不强制字段验证,允许动态字段}

2.2 查询

我发现关于mongoengine的内容,大模型说的很多是错误的。
这是一个还不错的介绍。一文带你深入浅出 MongoEngine 经典查询【内附详细案例】

因为之前用pymongo做了很多开发,所以知道字符串是可以比较的。程序里的这个报错TypeError: '>=' not supported between instances of 'StringField' and 'str'是指>=只能用于数值型的筛选。而mongo里的gte方法则是通过变量名称的特殊构造来传递的(x__gte), 虽然有点妖,但的确是个好点子。

另外如果返回后会进行批量处理(而不是使用单个实例),那么可以在定义映射模型时定义返回字典。


# 定义与 MongoDB 集合关联的类
class TradeOrders(Document):# 指定 MongoDB 中的集合名称# 启用动态模式,这样可以处理未定义的字段meta = {'collection': 'trade_orders',  # MongoDB 中的集合名'strict': False,  # 不强制字段验证,允许动态字段'indexes': [{'fields': ['is_buy_sms_tag']},{'fields': ['is_sell_sms_tag']}]}# 定义字段model_signal = StringField(required=True, max_length=50)reason = StringField()np = FloatField()is_win = IntField()buy_price = FloatField()sell_price = FloatField()buy_dt = StringField()sell_dt = StringField()buy_amt = FloatField()strategy_name = StringField()is_buy_sms_tag = IntField()is_sell_sms_tag = IntField()def dict(self):res_dict = {}res_dict['model_signal'] = self.model_signalres_dict['reason'] = self.reasonres_dict['np'] = self.npres_dict['is_win'] = self.is_winres_dict['buy_price'] = self.buy_priceres_dict['sell_price'] = self.sell_priceres_dict['buy_dt'] = self.buy_dtres_dict['buy_amt'] = self.buy_amtres_dict['strategy_name'] = self.strategy_nameres_dict['is_buy_sms_tag'] = self.is_buy_sms_tagres_dict['is_sell_sms_tag'] = self.is_sell_sms_tagreturn res_dict# 实例化时会为现有集合创建索引(是ensure index的方式)
from datetime import datetime, timedelta
# 获取当前时间
now = datetime.now()# 计算7天前的时间
seven_days_ago = now - timedelta(days=7)# 将 seven_days_ago 转换为字符串格式,以便与 buy_dt 和 sell_dt 进行比较
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d %H:%M:%S')# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(buy_dt__gt=seven_days_ago_str).all()
recent_orders1 = [x.dict() for x in recent_orders]{'model_signal': '510500_normal_strong_2_near_120_far_1200_top_85_standard_gbdt_running_B','reason': 'Control Sell','np': 392.1,'is_win': 1,'buy_price': 5.533,'sell_price': 5.999,'buy_dt': '2024-09-30 09:54:00','buy_amt': 4980.0,'strategy_name': '510500_normal_strong_2_near_120_far_1200_top_85_standard_gbdt_running_B_15_15','is_buy_sms_tag': None,'is_sell_sms_tag': None},{'model_signal': None,'reason': None,'np': None,'is_win': None,'buy_price': 4.162,'sell_price': None,'buy_dt': '2024-09-30 13:43:00','buy_amt': 4994.0,'strategy_name': '510300_normal_strong_2_near_60_far_1800_top_80_all_notnull_gbdt_running_A_10_15','is_buy_sms_tag': None,'is_sell_sms_tag': None}]

3 钉钉

以下是发送函数。webhook url需要在钉钉群里创建机器人得到。

import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse# 钉钉 Webhook 和加签密钥# mine
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
secret ='xxx'# 生成签名
def generate_sign(secret):timestamp = str(round(time.time() * 1000))secret_enc = secret.encode('utf-8')string_to_sign = '{}\n{}'.format(timestamp, secret)string_to_sign_enc = string_to_sign.encode('utf-8')hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))return timestamp, sign# 发送信息到群里面
def send_dingtalk_message_markdown(webhook, secret, title = None, message = None):try:timestamp, sign = generate_sign(secret)url = f"{webhook}&timestamp={timestamp}&sign={sign}"headers = {'Content-Type': 'application/json'}data = {"msgtype": 'markdown',"markdown": {'title':title,"text": message}} print(f"Sending message to URL: {url}")print(f"Message content: {data}")response = requests.post(url, headers=headers, data=json.dumps(data))print(f"Response: {response.status_code}, {response.text}")return response.status_code, response.textexcept Exception as e:print(f"Error sending message: {e}")return None, str(e)send_dingtalk_message_markdown('https://oapi.dingtalk.com/robot/send?access_token=xxx',
'xxx',title ='It is Title', message= '# It is Content\n## AA' )

在这里插入图片描述
效果比短信好多了。

4 整合逻辑

按照买消息和卖消息两个维度进行数据的整理和通知。对于买消息而言,消息的内容更少,而对于卖消息,需要展示的内容更多。在发布完消息后要把对应的标记打上,避免再次被筛选到。

作为Buy,提取以下字段发布

  • 1 Code
  • 2 Buy Datetime
  • 3 Buy Price
  • 4 Buy Amout
  • 5 Strategy Name

发布消息之后进行ACK(也就是把对应的字段置为1),所以,某种程度上说使用消息队列可能更自然一些。

# 先获取未消息的买单数据
from mongoengine.queryset.visitor import Q
# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)).all()
recent_orders1 = [x.dict() for x in recent_orders]# 对任何一个买单
some_buy_order = recent_orders[0]
code = some_buy_order.strategy_name.split('_')[0]
msg_title = '### Buy %s' % code 
msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)send_dingtalk_message_markdown('https://oapi.dingtalk.com/robot/send?access_token=xxx',
'xxx',title =msg_title, message= msg_text )

在这里插入图片描述
发送消息后,对相应的买单执行ACK

# ack
some_buy_order.is_buy_sms_tag = 1
some_buy_order.save()

循环执行买消息


from mongoengine.queryset.visitor import Q
# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)).all()
# buy / one
import time 
if len(recent_orders):print('共有%s个买消息' % len(recent_orders))for some_buy_order in recent_orders:# some_buy_order = recent_orders[0]code = some_buy_order.strategy_name.split('_')[0]msg_title = '### Buy %s' % code msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)# msg send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text )# acksome_buy_order.is_buy_sms_tag = 1some_buy_order.save()# 防止消息发的太快time.sleep(1)

然后每次执行时都执行一次查询,如果有买单,按顺序循环输出即可。
在这里插入图片描述
同样的,对于卖单

  • 1 查询未发送的卖单
  • 2 与买单相比,卖单会输出更多的字段
if len(recent_sell_orders):print('共有%s个卖消息' % len(recent_sell_orders))for some_sell_order in recent_sell_orders:# some_sell_order = recent_sell_orders[0]code = some_sell_order.strategy_name.split('_')[0]msg_title = '### Sell %s' % code msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_sell_order.buy_dt, some_sell_order.buy_price,some_sell_order.buy_vol,some_sell_order.buy_amt ,some_sell_order.strategy_name)msg_sell_text = '''\n### Sell DT:%s\n### Sell Price:%s\n### Sell Vol:%s\n ### Sell Amt:%s\n### NetProfit: %s\n### Reason:%s ''' % (some_sell_order.sell_dt, some_sell_order.sell_price, some_sell_order.sell_vol, some_sell_order.sell_amt, some_sell_order.np,some_sell_order.reason)send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text+ msg_sell_text)# acksome_sell_order.is_sell_sms_tag = 1some_sell_order.save()# 防止消息发的太快time.sleep(1)

效果类似
在这里插入图片描述

5 APS运行

ubuntu系统自带的cron并不好用,所以我用python的apscheduler。只是在系统重启的时候需要自己管理,有两个办法可以解决这个问题:

  • 1 注册systemd服务,这样开机时服务可以跟随启动
  • 2 使用docker运行。

不过我给机器配了ups,一般情况下也不会关机,这个问题等之后再考虑吧,暂时就用nohup后台运行。

  • 1 在/home/workers/ 下放py文件
  • 2 在/home/local_aps/ 下放脚本文件
  • 3 在/home/local_aps/ 下放主启动文件

aps.py 主程序启动,aps默认是采用多线程方式控制多个worker(所以应该也也不是完全阻塞的)

from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef exe_sh(fpathname = None):os.system('sh %s ' % fpathname)# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &if __name__ == '__main__':sche1 = BlockingScheduler()sche1.add_job(exe_sh,'interval', seconds=30, kwargs ={'fpathname':'/home/local_aps/aps01.sh'})# sche1.add_job(exe_sh,'interval', seconds=86400, kwargs ={'fpathname':'/home/shs/clean_log.sh'})print('[S] starting inteverl')sche1.start()

aps01.sh用于灵活存放多个需要执行的python脚本

#!/bin/bash
python3 /home/workers/qtv102_dingtalk.py

qtv102_dingtalk.py 加上了logger部分

import logging
from logging.handlers import RotatingFileHandlerdef get_logger(name, lpath='/var/log/'):logger = logging.getLogger(name)fpath = lpath + name + '.log'handler = RotatingFileHandler(fpath, maxBytes=100 * 1024 * 1024, backupCount=10)# 设置日志格式为 [时间] - [日志级别] - 消息formatter = logging.Formatter('[%(asctime)s] - [%(levelname)s] - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')handler.setFormatter(formatter)logger.addHandler(handler)logger.setLevel(logging.INFO)return logger# 按日志格式写入数据
log_info_tmp = '[%s] - %s'
logger = get_logger('qtv102_dingtalk')from mongoengine import connect, disconnect,Document, StringField, IntField,DictField,FloatField# connect(
#     db='your_database_name',       # 数据库名称
#     host='localhost',              # 主机地址,默认为localhost
#     port=27017,                    # 端口号,默认为27017
#     username='your_username',      # 可选,用户名
#     password='your_password',      # 可选,密码
#     authentication_source='admin'  # 可选,认证数据库,默认为admin
# )db_name = 'QTV102_Strategy'# 单机模式
host_url = 'mongodb://xxx:xxx@192.168.0.159:24086/'
connect(db_name, host=host_url)# 定义与 MongoDB 集合关联的类
class TradeOrders(Document):# 指定 MongoDB 中的集合名称# 启用动态模式,这样可以处理未定义的字段meta = {'collection': 'trade_orders',  # MongoDB 中的集合名'strict': False,  # 不强制字段验证,允许动态字段'indexes': [{'fields': ['is_buy_sms_tag']},{'fields': ['is_sell_sms_tag']}]}# 定义字段# model_signal = StringField(required=True, max_length=50)reason = StringField()np = FloatField()is_win = IntField()buy_price = FloatField()sell_price = FloatField()buy_dt = StringField()sell_dt = StringField()buy_vol = IntField()sell_vol = IntField()buy_amt = FloatField()sell_amt = FloatField()strategy_name = StringField()is_buy_sms_tag = IntField()is_sell_sms_tag = IntField()def dict(self):res_dict = {}res_dict['reason'] = self.reasonres_dict['np'] = self.npres_dict['is_win'] = self.is_winres_dict['buy_price'] = self.buy_priceres_dict['sell_price'] = self.sell_priceres_dict['buy_dt'] = self.buy_dtres_dict['buy_amt'] = self.buy_amtres_dict['sell_dt'] = self.sell_dtres_dict['sell_amt'] = self.sell_amtres_dict['buy_vol'] = self.buy_volres_dict['sell_vol'] = self.sell_volres_dict['strategy_name'] = self.strategy_nameres_dict['is_buy_sms_tag'] = self.is_buy_sms_tagres_dict['is_sell_sms_tag'] = self.is_sell_sms_tagreturn res_dict# 实例化时会为现有集合创建索引(是ensure index的方式)
from datetime import datetime, timedelta
# 获取当前时间
now = datetime.now()# 计算7天前的时间
seven_days_ago = now - timedelta(days=7)# 将 seven_days_ago 转换为字符串格式,以便与 buy_dt 和 sell_dt 进行比较
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d %H:%M:%S')# recent_orders1 = [x.dict() for x in recent_orders]# 发送消息
# test_md = '''# 关于订单 \n## 1 订单 allls'''
# test_html = '''<h1>关于订单</h1> <p>allls</p>'''# --- 发送消息
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse# 钉钉 Webhook 和加签密钥# mine
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
secret ='xxx'# 生成签名
def generate_sign(secret):timestamp = str(round(time.time() * 1000))secret_enc = secret.encode('utf-8')string_to_sign = '{}\n{}'.format(timestamp, secret)string_to_sign_enc = string_to_sign.encode('utf-8')hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))return timestamp, sign# 发送信息到群里面
def send_dingtalk_message_markdown(webhook, secret, title = None, message = None):try:timestamp, sign = generate_sign(secret)url = f"{webhook}&timestamp={timestamp}&sign={sign}"headers = {'Content-Type': 'application/json'}data = {"msgtype": 'markdown',"markdown": {'title':title,"text": message}}print(f"Sending message to URL: {url}")print(f"Message content: {data}")response = requests.post(url, headers=headers, data=json.dumps(data))print(f"Response: {response.status_code}, {response.text}")return response.status_code, response.textexcept Exception as e:print(f"Error sending message: {e}")return None, str(e)from mongoengine.queryset.visitor import Q# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)).all()
logger.info(log_info_tmp % ('get_open_orders', 'recs %s' % len(recent_orders)))
# buy / one
import time
if len(recent_orders):print('共有%s个买消息' % len(recent_orders))for some_buy_order in recent_orders:# some_buy_order = recent_orders[0]code = some_buy_order.strategy_name.split('_')[0]msg_title = '### Buy %s' % codemsg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)# msgsend_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text )# acksome_buy_order.is_buy_sms_tag = 1some_buy_order.save()# 防止消息发的太快time.sleep(1)logger.info(log_info_tmp % ('sent_open_orders', 'recs %s' % len(recent_orders)))# 卖单
recent_sell_orders = TradeOrders.objects(Q(sell_dt__gt=seven_days_ago_str) & Q(is_sell_sms_tag__ne = 1)).all()logger.info(log_info_tmp % ('get_close_orders', 'recs %s' % len(recent_sell_orders)))
if len(recent_sell_orders):print('共有%s个卖消息' % len(recent_sell_orders))for some_sell_order in recent_sell_orders:# some_sell_order = recent_sell_orders[0]code = some_sell_order.strategy_name.split('_')[0]msg_title = '### Sell %s' % codemsg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_sell_order.buy_dt, some_sell_order.buy_price,some_sell_order.buy_vol,some_sell_order.buy_amt ,some_sell_order.strategy_name)msg_sell_text = '''\n### Sell DT:%s\n### Sell Price:%s\n### Sell Vol:%s\n ### Sell Amt:%s\n### NetProfit: %s\n### Reason:%s ''' % (some_sell_order.sell_dt, some_sell_order.sell_price, some_sell_order.sell_vol, some_sell_order.sell_amt, some_sell_order.np,some_sell_order.reason)send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text+ msg_sell_text)# acksome_sell_order.is_sell_sms_tag = 1some_sell_order.save()# 防止消息发的太快time.sleep(1)logger.info(log_info_tmp % ('sent_close_orders', 'recs %s' % len(recent_sell_orders)))

观察日志

....
[2024-10-02 17:28:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:28:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:28:47] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:29:17] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:29:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:29:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:29:47] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:30:17] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:30:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:30:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:30:47] - [INFO] - [get_close_orders] - recs 0
...

修改数据的消息发送字段,然后就看到重发消息了
在这里插入图片描述

到这里,基本ok了。之前的sms可以扔掉了。

总结一下:

  • 1 消息存放在数据库里。
  • 2 程序通过ORM定期查询需要发送的消息。
  • 3 发送钉钉消息后将属性更新,回存数据库

以后的策略消息仍然会先存放在数据库中,不过是通过ORM访问数据库还是走消息队列,以及发送到钉钉还是邮件还是其他可以再探讨。

ps: 属性的更新应该只是部分更新(update),而不是替换(replace),这样开销比较小。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/439903.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI 时代:产品经理不“AI”就出局?

即便你没想去做“AI 产品经理”&#xff0c;那你也不能成为一个不会用 AI 的产品经理。 产品经理肯定是所有互联网从业者中&#xff0c;最先捕捉到 AI 趋势的岗位。 但只知道 AI、关注 AI 还不够&#xff0c;仔细审视一下&#xff1a;你自己的工作&#xff0c;被 AI 提效了么…

《Windows PE》4.1导入表

导入表顾名思义&#xff0c;就是记录外部导入函数信息的表。这些信息包括外部导入函数的序号、名称、地址和所属的DLL动态链接库的名称。Windows程序中使用的所有API接口函数都是从系统DLL中调用的。当然也可能是自定义的DLL动态链接库。对于调用方&#xff0c;我们称之为导入函…

安防监控/视频系统EasyCVR视频汇聚平台如何过滤134段的告警通道?

视频汇聚/集中存储EasyCVR安防监控视频系统采用先进的网络传输技术&#xff0c;支持高清视频的接入和传输&#xff0c;能够满足大规模、高并发的远程监控需求。平台支持国标GB/T 28181协议、部标JT808、GA/T 1400协议、RTMP、RTSP/Onvif协议、海康Ehome、海康SDK、大华SDK、华为…

STM32定时器(TIM)

目录 一、概述 二、定时器的类型 三、时序 四、定时器中断基本结构 五、定时器定时中断代码 六、定时器外部时钟代码 一、概述 TIM(Timer)定时器 定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中断16位计数器、预分频器、自动重装寄存器的时基…

力扣刷题 | 两数之和

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; 给定一个整数数组 nums 和…

网站建设完成后,切勿让公司官网成为摆设

在当今这个数字化时代&#xff0c;公司官网已经成为企业展示形象、传递信息、吸引客户的重要平台。然而&#xff0c;许多企业在网站建设完成后&#xff0c;往往忽视了对官网的持续运营和维护&#xff0c;导致官网逐渐沦为摆设&#xff0c;无法发挥其应有的作用。为了确保公司官…

15分钟学 Python 第40天:Python 爬虫入门(六)第一篇

Day40 &#xff1a;Python 爬取豆瓣网前一百的电影信息 1. 项目背景 在这个项目中&#xff0c;我们将学习如何利用 Python 爬虫技术从豆瓣网抓取前一百部电影的信息。通过这一练习&#xff0c;您将掌握网页抓取的基本流程&#xff0c;包括发送请求、解析HTML、存储数据等核心…

jvisualvm学习

系列文章目录 JavaSE基础知识、数据类型学习万年历项目代码逻辑训练习题代码逻辑训练习题方法、数组学习图书管理系统项目面向对象编程&#xff1a;封装、继承、多态学习封装继承多态习题常用类、包装类、异常处理机制学习集合学习IO流、多线程学习仓库管理系统JavaSE项目员工…

ssm服装店销售管理系统

系统包含&#xff1a;源码论文 所用技术&#xff1a;SpringBootVueSSMMybatisMysql 免费提供给大家参考或者学习&#xff0c;获取源码请私聊我 需要定制请私聊 目 录 摘 要 I Abstract II 第1章 绪论 1 1.1研究背景 1 1.2研究意义 1 1.3国内外研究现状 2 1.3.1国外研…

提高顾客满意度,餐饮业如何开展客户调研?

餐饮行业需明确调研目的&#xff0c;选择合适工具&#xff0c;设计问卷&#xff0c;收集并分析数据&#xff0c;持续追踪优化。通过客户调研&#xff0c;提升服务质量、顾客满意度和竞争力&#xff0c;利用ZohoSurvey等工具实现高效调研。 一、明确调研目的 进行客户调研前&am…

【hot100-java】【将有序数组转换为二叉搜索树】

二叉树篇 中序遍历实现 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNode right…

yolo自动化项目实例解析(七)自建UI--工具栏选项

在上一章我们基本实现了关于预览窗口的显示&#xff0c;现在我们主要完善一下工具栏菜单按键 一、添加任务ui 先加个ui页面&#xff0c;不想看ui的复制完这个文件到ui目录下转下py直接从第二步开始看 vi ui/formpy.ui <?xml version"1.0" encoding"UTF-8&q…

Mysql数据库--删除和备份、约束类型

目录 1.删除操作 1.1表的删除操作 1.2数据库备份 2.约束 2.1基本概况 2.2not null约束演示 2.3unique约束演示 2.4default约束演示 2.5primary key约束演示 2.6foreign key约束演示 2.7check约束演示 1.删除操作 1.1表的删除操作 delete from 表名 where 条件…

kubeadm部署k8s

1.1 安装Docker [rootk8s-all ~]# wget -O /etc/yum.repos.d/docker-ce.repo https://mirrors.huaweicloud.com/docker-ce/linux/centos/docker-ce.repo [rootk8s-all ~]# sed -i sdownload.docker.commirrors.huaweicloud.com/docker-ce /etc/yum.repos.d/docker-ce.repo [ro…

数据结构双向链表和循环链表

目录 一、循环链表二、双向链表三、循环双向链表 一、循环链表 循环链表就是首尾相接的的链表&#xff0c;就是尾节点的指针域指向头节点使整个链表形成一个循环&#xff0c;这就弥补了以前单链表无法在后面某个节点找到前面的节点&#xff0c;可以从任意一个节点找到目标节点…

用HTML5+CSS+JavaScript庆祝国庆

用HTML5CSSJavaScript庆祝国庆 中华人民共和国的国庆日是每年的10月1日。 1949年10月1日&#xff0c;中华人民共和国中央人民政府成立&#xff0c;在首都北京天安门广场举行了开国大典&#xff0c;中央人民政府主席毛泽东庄严宣告中华人民共和国成立&#xff0c;并亲手升起了…

[单master节点k8s部署]31.ceph分布式存储(二)

Ceph配置 Ceph集群通常是一个独立的存储集群&#xff0c;可以部署在 Kubernetes 集群之外。Ceph 提供分布式存储服务&#xff0c;能够通过 RADOS、CephFS、RBD&#xff08;块存储&#xff09;、和 RGW&#xff08;对象存储&#xff09;等方式与 Kubernetes 集成。即使 Ceph 部…

王者农药更新版

一、启动文件配置 二、GPIO使用 2.1基本步骤 1.配置GPIO&#xff0c;所以RCC开启APB2时钟 2.GPIO初始化&#xff08;结构体&#xff09; 3.给GPIO引脚设置高/低电平&#xff08;WriteBit&#xff09; 2.2Led循环点亮&#xff08;GPIO输出&#xff09; 1.RCC开启APB2时钟。…

CSS | 响应式布局之媒体查询(media-query)详解

media type(媒体类型)是CSS 2中的一个非常有用的属性&#xff0c;通过media type我们可以对不同的设备指定特定的样式&#xff0c;从而实现更丰富的界面。media query(媒体查询)是对media type的一种增强&#xff0c;是CSS 3的重要内容之一。随着移动互联网的发展&#xff0c;m…

如何在算家云搭建CosyVoice(文生音频)

一、CosyVoice简介 CosyVoice 是一个开源的超强 TTS&#xff08;‌文本转语音&#xff09;‌模型&#xff0c;‌它支持多种生成模式&#xff0c;‌具有极强的语音自然可控性。‌ 具有以下特点&#xff1a; 语音合成 &#xff1a;能够将文本转换为自然流畅的语音输出。多语种…