说明
起因是我第一版quant程序的短信通知失效了。最初认为短信是比较即时且比较醒目的通知方式,现在看来完全不行。
列举三个主要问题:
- 1 延时。在早先还能收到消息的时候,迟滞就很严重,几分钟都算短的。
- 2 完全丢失。我手机没有做任何设置,但是没有消息了。反正华为和腾讯,总有一个是锅。
- 3 短信模板。由于短信的模板需要审批,所以内容缩减的不像样子。所以信息阅读起来很累。
结论:不再采用短信方式,而是使用机器、邮件的方式通知。
内容
以下主要的内容有两部分:
- 1 通过ORM方式操作数据库。
- 2 使用钉钉机器人
1 数据
数据存在Mongo中,形如
在每次生成交易订单时,都会发出一条短信。所以这几天miss掉的订单,每个都是5个点,甚至十几个点的利润,太可惜了。100个短信包也比不上这些损失啊。
需要在订单的买和卖时间发生变动时发送通知信息。
对于发送程序来说,可以每30秒执行一次查询,每次执行时基于「当前时间」来判断需要发送的消息:
- 1 基础筛选。只查询3天的候选数据。
- 2 筛选买单。当 is_buy 字段不为1时,说明该笔订单的买信号没有发送,进行发送。
- 3 筛选卖单。当 is_sell字段为不为1时,处理卖单消息。
所以对于业务来说,Mongo真的是更方便的,当需要增加字段时完全不用管表结构。
最近因为要做一些数据库方面的简单培训,在梳理的过程中,我发现其实很多数据库的出现是因为其特定的业务场景塑造的,而且通常能适应A场景的数据库,在B场景就不太行。所以A数据库通常无法取代B数据库。
以Mongo和Mysql为例,是否A能取代B的问题应该是研究最多的(更大的考虑是否能用NoSQL取代SQL),现在来看,结论是不能。
目前可以做一些简单结论:
- 1 当数据维度经常需要变动时,用Mongo。所以与操作和控制相关的场景下,主要用Mongo。业务形态不固定的时候,也用Mongo。例如本次,最初我并没有设计is_buy字段,但是要更改时几乎是瞬时完成的,似乎一开始就这样。
- 2 当数据需要连表时,用Mysql。特别是结构化数据,用Mysql来存是比较方便的,每个字段也预先做了限定,不会犯错。
以下也简单总结了一些数据库的特点/场景:
- 1 Redis : 高速kv存取,可用于实时计数等。
- 2 Mongo: 文档性数据库,可用于集成部分的数据,适合作为主库。
- 3 Mysql: 表格型数据库,可用于结构化部分的存储。
- 4 Postgres: 算是Mysql的平替吧,在功能上更完善一些,许可更open。
- 5 ES: 文档性数据库,主要用于解决模糊搜索效率的问题,用于搜索、推荐。
- 6 Milvus: 向量数据库,主要解决大量中间向量的存储,以及相似性检索,用于搜图等功能。
- 7 ClickHouse: 列式数据库。用于高效存储和统计,某种程度上也适合备份。速度太快,压缩比太高了。
- 8 InfluxDB: 时序数据库。快速存储和分析时间数据。可用于物联网、金融等。
- 9 Neo4j: 图库。用于存储真正的关联信息,适合用于知识图谱。
- 10 SQLite:容易被忽略,但是这个还是非常棒的数据库。适合微型应用,嵌入式应用等。
还有很多数据库,对应着一些其他维度的细分。比如FAISS也是向量数据库,但是是在内存里;而Milvus是在硬盘上。然后还有很多是功能近似的,就不展开了。
2 ORM
之前我用pymongo自己搭了微服务,然后基于这个微服务又做了WMongo对象封装,本质上还是函数式的。结论是,函数式也好,agent(微服务)也好,的确是有其存在的必要和应用场景的。不过这种场景更偏向后端纯粹的数据处理与分析,在搭建应用对象(强调集成性和灵活性)时的确不方便。WMongo如果之后继续和pydantic结合,我应该也会做出一个类似工具。
言归正传, 操作Mongo时pymongo和mongoengine的关系就类似与pymysql和sqlalchemy;而Motor就像 aiomysql + sqlalchemy。
先从同步入手:
2.1 连接
from mongoengine import connect, disconnect,Document, StringField, IntField,DictField# 可以直接使用函数式连接
connect(db='your_database_name', # 数据库名称host='localhost', # 主机地址,默认为localhostport=27017, # 端口号,默认为27017username='your_username', # 可选,用户名password='your_password', # 可选,密码authentication_source='admin' # 可选,认证数据库,默认为admin
)
# 也可以使用url方式连接
# 集群
connect(host='mongodb://USER:PASSWD@IP1:PORT1,IP2:PORT2,IP3:PORT3,IP4:PORT4/%s?authSource=admin&replicaSet=mymeta' % db_name
)
本次仅按单机方式连接
# 单机模式
host_url = 'mongodb://USER:PASSWD@IP1:PORT1/%s?authSource=admin' % db_name
client = connect(host=host_url)
# 获取所有数据库
databases = client.list_database_names()
print("Databases:")
for db in databases:print(db)# 遍历每个数据库,获取集合
for db_name in databases:db = client[db_name]collections = db.list_collection_names()print(f"\nCollections in {db_name}:")for collection in collections:print(collection)# 断开连接
disconnect()
连接之后,做了一些函数式的操作,这个虽然不是ORM的主要方向,但有时候还是有点用的。接下来则是对数据的操作:
设置对象时,如果meta没有设置,对应的表名将会按类名的小写拆开建立并严格校验字段的一致性。这里我并不会映射现有表的全部字段,需要设置非严格模式。
meta = {'collection': 'trade_orders', # MongoDB 中的集合名'strict': False, # 不强制字段验证,允许动态字段}
2.2 查询
我发现关于mongoengine的内容,大模型说的很多是错误的。
这是一个还不错的介绍。一文带你深入浅出 MongoEngine 经典查询【内附详细案例】
因为之前用pymongo做了很多开发,所以知道字符串是可以比较的。程序里的这个报错TypeError: '>=' not supported between instances of 'StringField' and 'str'
是指>=
只能用于数值型的筛选。而mongo里的gte
方法则是通过变量名称的特殊构造来传递的(x__gte
), 虽然有点妖,但的确是个好点子。
另外如果返回后会进行批量处理(而不是使用单个实例),那么可以在定义映射模型时定义返回字典。
# 定义与 MongoDB 集合关联的类
class TradeOrders(Document):# 指定 MongoDB 中的集合名称# 启用动态模式,这样可以处理未定义的字段meta = {'collection': 'trade_orders', # MongoDB 中的集合名'strict': False, # 不强制字段验证,允许动态字段'indexes': [{'fields': ['is_buy_sms_tag']},{'fields': ['is_sell_sms_tag']}]}# 定义字段model_signal = StringField(required=True, max_length=50)reason = StringField()np = FloatField()is_win = IntField()buy_price = FloatField()sell_price = FloatField()buy_dt = StringField()sell_dt = StringField()buy_amt = FloatField()strategy_name = StringField()is_buy_sms_tag = IntField()is_sell_sms_tag = IntField()def dict(self):res_dict = {}res_dict['model_signal'] = self.model_signalres_dict['reason'] = self.reasonres_dict['np'] = self.npres_dict['is_win'] = self.is_winres_dict['buy_price'] = self.buy_priceres_dict['sell_price'] = self.sell_priceres_dict['buy_dt'] = self.buy_dtres_dict['buy_amt'] = self.buy_amtres_dict['strategy_name'] = self.strategy_nameres_dict['is_buy_sms_tag'] = self.is_buy_sms_tagres_dict['is_sell_sms_tag'] = self.is_sell_sms_tagreturn res_dict# 实例化时会为现有集合创建索引(是ensure index的方式)
from datetime import datetime, timedelta
# 获取当前时间
now = datetime.now()# 计算7天前的时间
seven_days_ago = now - timedelta(days=7)# 将 seven_days_ago 转换为字符串格式,以便与 buy_dt 和 sell_dt 进行比较
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d %H:%M:%S')# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(buy_dt__gt=seven_days_ago_str).all()
recent_orders1 = [x.dict() for x in recent_orders]{'model_signal': '510500_normal_strong_2_near_120_far_1200_top_85_standard_gbdt_running_B','reason': 'Control Sell','np': 392.1,'is_win': 1,'buy_price': 5.533,'sell_price': 5.999,'buy_dt': '2024-09-30 09:54:00','buy_amt': 4980.0,'strategy_name': '510500_normal_strong_2_near_120_far_1200_top_85_standard_gbdt_running_B_15_15','is_buy_sms_tag': None,'is_sell_sms_tag': None},{'model_signal': None,'reason': None,'np': None,'is_win': None,'buy_price': 4.162,'sell_price': None,'buy_dt': '2024-09-30 13:43:00','buy_amt': 4994.0,'strategy_name': '510300_normal_strong_2_near_60_far_1800_top_80_all_notnull_gbdt_running_A_10_15','is_buy_sms_tag': None,'is_sell_sms_tag': None}]
3 钉钉
以下是发送函数。webhook url需要在钉钉群里创建机器人得到。
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse# 钉钉 Webhook 和加签密钥# mine
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
secret ='xxx'# 生成签名
def generate_sign(secret):timestamp = str(round(time.time() * 1000))secret_enc = secret.encode('utf-8')string_to_sign = '{}\n{}'.format(timestamp, secret)string_to_sign_enc = string_to_sign.encode('utf-8')hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))return timestamp, sign# 发送信息到群里面
def send_dingtalk_message_markdown(webhook, secret, title = None, message = None):try:timestamp, sign = generate_sign(secret)url = f"{webhook}×tamp={timestamp}&sign={sign}"headers = {'Content-Type': 'application/json'}data = {"msgtype": 'markdown',"markdown": {'title':title,"text": message}} print(f"Sending message to URL: {url}")print(f"Message content: {data}")response = requests.post(url, headers=headers, data=json.dumps(data))print(f"Response: {response.status_code}, {response.text}")return response.status_code, response.textexcept Exception as e:print(f"Error sending message: {e}")return None, str(e)send_dingtalk_message_markdown('https://oapi.dingtalk.com/robot/send?access_token=xxx',
'xxx',title ='It is Title', message= '# It is Content\n## AA' )
效果比短信好多了。
4 整合逻辑
按照买消息和卖消息两个维度进行数据的整理和通知。对于买消息而言,消息的内容更少,而对于卖消息,需要展示的内容更多。在发布完消息后要把对应的标记打上,避免再次被筛选到。
作为Buy,提取以下字段发布
- 1 Code
- 2 Buy Datetime
- 3 Buy Price
- 4 Buy Amout
- 5 Strategy Name
发布消息之后进行ACK(也就是把对应的字段置为1),所以,某种程度上说使用消息队列可能更自然一些。
# 先获取未消息的买单数据
from mongoengine.queryset.visitor import Q
# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)).all()
recent_orders1 = [x.dict() for x in recent_orders]# 对任何一个买单
some_buy_order = recent_orders[0]
code = some_buy_order.strategy_name.split('_')[0]
msg_title = '### Buy %s' % code
msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)send_dingtalk_message_markdown('https://oapi.dingtalk.com/robot/send?access_token=xxx',
'xxx',title =msg_title, message= msg_text )
发送消息后,对相应的买单执行ACK
# ack
some_buy_order.is_buy_sms_tag = 1
some_buy_order.save()
循环执行买消息
from mongoengine.queryset.visitor import Q
# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)).all()
# buy / one
import time
if len(recent_orders):print('共有%s个买消息' % len(recent_orders))for some_buy_order in recent_orders:# some_buy_order = recent_orders[0]code = some_buy_order.strategy_name.split('_')[0]msg_title = '### Buy %s' % code msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)# msg send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text )# acksome_buy_order.is_buy_sms_tag = 1some_buy_order.save()# 防止消息发的太快time.sleep(1)
然后每次执行时都执行一次查询,如果有买单,按顺序循环输出即可。
同样的,对于卖单
- 1 查询未发送的卖单
- 2 与买单相比,卖单会输出更多的字段
if len(recent_sell_orders):print('共有%s个卖消息' % len(recent_sell_orders))for some_sell_order in recent_sell_orders:# some_sell_order = recent_sell_orders[0]code = some_sell_order.strategy_name.split('_')[0]msg_title = '### Sell %s' % code msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_sell_order.buy_dt, some_sell_order.buy_price,some_sell_order.buy_vol,some_sell_order.buy_amt ,some_sell_order.strategy_name)msg_sell_text = '''\n### Sell DT:%s\n### Sell Price:%s\n### Sell Vol:%s\n ### Sell Amt:%s\n### NetProfit: %s\n### Reason:%s ''' % (some_sell_order.sell_dt, some_sell_order.sell_price, some_sell_order.sell_vol, some_sell_order.sell_amt, some_sell_order.np,some_sell_order.reason)send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text+ msg_sell_text)# acksome_sell_order.is_sell_sms_tag = 1some_sell_order.save()# 防止消息发的太快time.sleep(1)
效果类似
5 APS运行
ubuntu系统自带的cron并不好用,所以我用python的apscheduler。只是在系统重启的时候需要自己管理,有两个办法可以解决这个问题:
- 1 注册systemd服务,这样开机时服务可以跟随启动
- 2 使用docker运行。
不过我给机器配了ups,一般情况下也不会关机,这个问题等之后再考虑吧,暂时就用nohup后台运行。
- 1 在/home/workers/ 下放py文件
- 2 在/home/local_aps/ 下放脚本文件
- 3 在/home/local_aps/ 下放主启动文件
aps.py
主程序启动,aps默认是采用多线程方式控制多个worker(所以应该也也不是完全阻塞的)
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef exe_sh(fpathname = None):os.system('sh %s ' % fpathname)# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &if __name__ == '__main__':sche1 = BlockingScheduler()sche1.add_job(exe_sh,'interval', seconds=30, kwargs ={'fpathname':'/home/local_aps/aps01.sh'})# sche1.add_job(exe_sh,'interval', seconds=86400, kwargs ={'fpathname':'/home/shs/clean_log.sh'})print('[S] starting inteverl')sche1.start()
aps01.sh
用于灵活存放多个需要执行的python脚本
#!/bin/bash
python3 /home/workers/qtv102_dingtalk.py
qtv102_dingtalk.py
加上了logger部分
import logging
from logging.handlers import RotatingFileHandlerdef get_logger(name, lpath='/var/log/'):logger = logging.getLogger(name)fpath = lpath + name + '.log'handler = RotatingFileHandler(fpath, maxBytes=100 * 1024 * 1024, backupCount=10)# 设置日志格式为 [时间] - [日志级别] - 消息formatter = logging.Formatter('[%(asctime)s] - [%(levelname)s] - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')handler.setFormatter(formatter)logger.addHandler(handler)logger.setLevel(logging.INFO)return logger# 按日志格式写入数据
log_info_tmp = '[%s] - %s'
logger = get_logger('qtv102_dingtalk')from mongoengine import connect, disconnect,Document, StringField, IntField,DictField,FloatField# connect(
# db='your_database_name', # 数据库名称
# host='localhost', # 主机地址,默认为localhost
# port=27017, # 端口号,默认为27017
# username='your_username', # 可选,用户名
# password='your_password', # 可选,密码
# authentication_source='admin' # 可选,认证数据库,默认为admin
# )db_name = 'QTV102_Strategy'# 单机模式
host_url = 'mongodb://xxx:xxx@192.168.0.159:24086/'
connect(db_name, host=host_url)# 定义与 MongoDB 集合关联的类
class TradeOrders(Document):# 指定 MongoDB 中的集合名称# 启用动态模式,这样可以处理未定义的字段meta = {'collection': 'trade_orders', # MongoDB 中的集合名'strict': False, # 不强制字段验证,允许动态字段'indexes': [{'fields': ['is_buy_sms_tag']},{'fields': ['is_sell_sms_tag']}]}# 定义字段# model_signal = StringField(required=True, max_length=50)reason = StringField()np = FloatField()is_win = IntField()buy_price = FloatField()sell_price = FloatField()buy_dt = StringField()sell_dt = StringField()buy_vol = IntField()sell_vol = IntField()buy_amt = FloatField()sell_amt = FloatField()strategy_name = StringField()is_buy_sms_tag = IntField()is_sell_sms_tag = IntField()def dict(self):res_dict = {}res_dict['reason'] = self.reasonres_dict['np'] = self.npres_dict['is_win'] = self.is_winres_dict['buy_price'] = self.buy_priceres_dict['sell_price'] = self.sell_priceres_dict['buy_dt'] = self.buy_dtres_dict['buy_amt'] = self.buy_amtres_dict['sell_dt'] = self.sell_dtres_dict['sell_amt'] = self.sell_amtres_dict['buy_vol'] = self.buy_volres_dict['sell_vol'] = self.sell_volres_dict['strategy_name'] = self.strategy_nameres_dict['is_buy_sms_tag'] = self.is_buy_sms_tagres_dict['is_sell_sms_tag'] = self.is_sell_sms_tagreturn res_dict# 实例化时会为现有集合创建索引(是ensure index的方式)
from datetime import datetime, timedelta
# 获取当前时间
now = datetime.now()# 计算7天前的时间
seven_days_ago = now - timedelta(days=7)# 将 seven_days_ago 转换为字符串格式,以便与 buy_dt 和 sell_dt 进行比较
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d %H:%M:%S')# recent_orders1 = [x.dict() for x in recent_orders]# 发送消息
# test_md = '''# 关于订单 \n## 1 订单 allls'''
# test_html = '''<h1>关于订单</h1> <p>allls</p>'''# --- 发送消息
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse# 钉钉 Webhook 和加签密钥# mine
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
secret ='xxx'# 生成签名
def generate_sign(secret):timestamp = str(round(time.time() * 1000))secret_enc = secret.encode('utf-8')string_to_sign = '{}\n{}'.format(timestamp, secret)string_to_sign_enc = string_to_sign.encode('utf-8')hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))return timestamp, sign# 发送信息到群里面
def send_dingtalk_message_markdown(webhook, secret, title = None, message = None):try:timestamp, sign = generate_sign(secret)url = f"{webhook}×tamp={timestamp}&sign={sign}"headers = {'Content-Type': 'application/json'}data = {"msgtype": 'markdown',"markdown": {'title':title,"text": message}}print(f"Sending message to URL: {url}")print(f"Message content: {data}")response = requests.post(url, headers=headers, data=json.dumps(data))print(f"Response: {response.status_code}, {response.text}")return response.status_code, response.textexcept Exception as e:print(f"Error sending message: {e}")return None, str(e)from mongoengine.queryset.visitor import Q# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)).all()
logger.info(log_info_tmp % ('get_open_orders', 'recs %s' % len(recent_orders)))
# buy / one
import time
if len(recent_orders):print('共有%s个买消息' % len(recent_orders))for some_buy_order in recent_orders:# some_buy_order = recent_orders[0]code = some_buy_order.strategy_name.split('_')[0]msg_title = '### Buy %s' % codemsg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)# msgsend_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text )# acksome_buy_order.is_buy_sms_tag = 1some_buy_order.save()# 防止消息发的太快time.sleep(1)logger.info(log_info_tmp % ('sent_open_orders', 'recs %s' % len(recent_orders)))# 卖单
recent_sell_orders = TradeOrders.objects(Q(sell_dt__gt=seven_days_ago_str) & Q(is_sell_sms_tag__ne = 1)).all()logger.info(log_info_tmp % ('get_close_orders', 'recs %s' % len(recent_sell_orders)))
if len(recent_sell_orders):print('共有%s个卖消息' % len(recent_sell_orders))for some_sell_order in recent_sell_orders:# some_sell_order = recent_sell_orders[0]code = some_sell_order.strategy_name.split('_')[0]msg_title = '### Sell %s' % codemsg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_sell_order.buy_dt, some_sell_order.buy_price,some_sell_order.buy_vol,some_sell_order.buy_amt ,some_sell_order.strategy_name)msg_sell_text = '''\n### Sell DT:%s\n### Sell Price:%s\n### Sell Vol:%s\n ### Sell Amt:%s\n### NetProfit: %s\n### Reason:%s ''' % (some_sell_order.sell_dt, some_sell_order.sell_price, some_sell_order.sell_vol, some_sell_order.sell_amt, some_sell_order.np,some_sell_order.reason)send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text+ msg_sell_text)# acksome_sell_order.is_sell_sms_tag = 1some_sell_order.save()# 防止消息发的太快time.sleep(1)logger.info(log_info_tmp % ('sent_close_orders', 'recs %s' % len(recent_sell_orders)))
观察日志
....
[2024-10-02 17:28:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:28:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:28:47] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:29:17] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:29:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:29:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:29:47] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:30:17] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:30:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:30:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:30:47] - [INFO] - [get_close_orders] - recs 0
...
修改数据的消息发送字段,然后就看到重发消息了
到这里,基本ok了。之前的sms可以扔掉了。
总结一下:
- 1 消息存放在数据库里。
- 2 程序通过ORM定期查询需要发送的消息。
- 3 发送钉钉消息后将属性更新,回存数据库
以后的策略消息仍然会先存放在数据库中,不过是通过ORM访问数据库还是走消息队列,以及发送到钉钉还是邮件还是其他可以再探讨。
ps: 属性的更新应该只是部分更新(update),而不是替换(replace),这样开销比较小。