本程序使用传统的[TuShare接口],而不是需要捐赠的[pro接口],因此获取数据没有限制;
另,由于TuShare的增量更新接口有bug(最近一个交易日的数据获取不到),所以每次计算前都是删除所有数据,全部重新获取。
本程序实现了若干种选股策略,大家可以自行选择其中的一到多种策略组合使用,参见work_flow.py
各策略中的end_date参数主要用于回测。
安装依赖:
根据不同的平台安装TA-Lib程序
Mac OS X
$ brew install ta-lib
Windows
下载 ta-lib-0.4.0-msvc.zip,解压到 C:\ta-lib
安装方法详见百度
Linux
下载 ta-lib-0.4.0-src.tar.gz :
$ tar -xzf ta-lib-0.4.0-src.tar.gz && cd ta-lib
$ ./configure --prefix=/usr
$ make
$ sudo make install
推荐使用Python3.8以上以及pip3
Python 依赖:
pip install -r requirements.txt
运行
本地运行
$ python main.py
运行结果查看日志文件sequoia.log
服务器端运行
用户也可以将本程序作为定时任务运行在服务端,需要做以下工作:
将config.yaml中的cron配置改为true,push.enable改为true
参考README_PUSH.md文档搭建 ejabberd 推送服务
客户端Android推荐使用 Conversations ,iOS没有开发者证书的话推送不了,有证书推荐使用 ChatSecure-iOS ,我采用的推送方案是ejabberd搭配Conversations。 效果如图
statistics strategy
如何回测
修改 work_flow.py#L61 中的 end 为指定日期,格式为 'YYYY-MM-DD',如:
end = '2019-06-17'
运行截图:
main.py
# -*- encoding: UTF-8 -*-

import utils
import logging
import work_flow
import settings
import schedule
import time


def job():
    """Scheduled task: run the screening workflow when utils.is_weekday() is true."""
    if not utils.is_weekday():
        return
    work_flow.process()


# Log to sequoia.log; the root logger level is raised to INFO separately so
# that it applies even if logging was already configured elsewhere.
logging.basicConfig(format='%(asctime)s %(message)s', filename='sequoia.log')
logging.getLogger().setLevel(logging.INFO)
settings.init()

if not settings.config['cron']:
    # One-shot mode: run the workflow once and exit.
    work_flow.process()
else:
    # Cron mode: register the daily job, then poll the scheduler forever.
    EXEC_TIME = "15:15"
    schedule.every().day.at(EXEC_TIME).do(job)
    while True:
        schedule.run_pending()
        time.sleep(1)
data_fetcher.py
# -*- encoding: UTF-8 -*-import tushare as ts
import pandas as pd
import datetime
import logging
import settings
import talib as tlimport utilsimport concurrent.futuresfrom pandas.tseries.offsets import *# def update_data(code_name):
# stock = code_name[0]
# old_data = utils.read_data(code_name)
# if not old_data.empty:
# start_time = utils.next_weekday(old_data.iloc[-1].date)
# current_time = datetime.datetime.now()
# if start_time > current_time:
# return
#
# df = ts.get_k_data(stock, autype='qfq')
# mask = (df['date'] >= start_time.strftime('%Y-%m-%d'))
# appender = df.loc[mask]
# if appender.empty:
# return
# else:
# return appender


def init_data(code_name):
    """Download the full price history for one stock.

    Args:
        code_name: Sequence whose first element is the stock code.

    Returns:
        The DataFrame returned by ``ts.get_k_data(..., autype='qfq')`` with an
        added ``p_change`` column (TA-Lib ``ROC`` of ``close`` over 1 period),
        or ``None`` when no data is available for the stock.
    """
    stock = code_name[0]
    data = ts.get_k_data(stock, autype='qfq')

    if data is None or data.empty:
        logging.debug("股票:"+stock+" 没有数据,略过...")
        return None

    data['p_change'] = tl.ROC(data['close'], 1)
    return data


def run(stocks):
    """Fetch data for every stock concurrently and store each result as HDF5.

    Each stock is written to ``<data_dir>/<code>-<name>.h5`` under the key
    ``data``. Stocks without data are skipped; a failure for one stock is
    logged and does not stop the others.

    Args:
        stocks: Iterable of sequences whose first two elements are the stock
            code and the stock name.
    """
    append_mode = False
    update_fun = init_data

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_stock = {executor.submit(update_fun, stock): stock for stock in stocks}
        for future in concurrent.futures.as_completed(future_to_stock):
            stock = future_to_stock[future]
            try:
                data = future.result()
                # Check for "no data" BEFORE touching the frame; the original
                # dereferenced data['code'] first and raised TypeError on None.
                if data is None:
                    continue
                data['code'] = data['code'].apply(str)
                file_name = stock[0] + '-' + stock[1] + '.h5'
                data.to_hdf(settings.config['data_dir'] + "/" + file_name,
                            key='data', append=append_mode, format='table')
            except Exception as exc:
                # Best effort: report the failing stock and keep going.
                logging.error('%s(%r) generated an exception: %s', stock[1], stock[0], exc)
work_flow.py
# -*- encoding: UTF-8 -*-

import datetime
import logging
import time
import urllib
import urllib.error

import tushare as ts

import data_fetcher
import utils
import strategy.enter as enter
from strategy import turtle_trade
from strategy import backtrace_ma250
from strategy import breakthrough_platform
from strategy import parking_apron
from strategy import low_backtrace_increase
from strategy import keep_increasing
import push
import db
import settings
import pandas as pd


def process():
    """Run one complete screening pass.

    Steps:
      1. Load today's stock list from ``ts.get_today_all()``, cache it to
         ``settings.config['stocks_file']`` and push market statistics. If the
         request fails with ``URLError``, fall back to the cached CSV.
      2. If ``utils.need_update_data()`` is true, re-download all price data
         and evaluate exit conditions for tracked positions.
      3. Run every configured strategy over the stock list and push results.
    """
    logging.info("************************ process start ***************************************")
    try:
        all_data = ts.get_today_all()
        subset = all_data[['code', 'name', 'nmc']]
        subset.to_csv(settings.config['stocks_file'], index=False, header=True)
        stocks = [tuple(x) for x in subset.values]
        statistics(all_data, stocks)
    except urllib.error.URLError as e:
        logging.warning("get_today_all failed (%s); using cached stock list", e)
        # Read codes as text: letting pandas infer integers would strip the
        # leading zeros from codes such as 000001.
        subset = pd.read_csv(settings.config['stocks_file'], dtype={'code': str})
        subset['code'] = subset['code'].astype(str)
        stocks = [tuple(x) for x in subset.values]

    # NOTE(review): the original indentation was lost; check_exit() is assumed
    # to belong inside this branch (run only after data was refreshed) - confirm.
    if utils.need_update_data():
        utils.prepare()
        data_fetcher.run(stocks)
        check_exit()

    strategies = {
        '海龟交易法则': turtle_trade.check_enter,
        '放量上涨': enter.check_volume,
        '突破平台': breakthrough_platform.check,
        '均线多头': keep_increasing.check,
        '无大幅回撤': low_backtrace_increase.check,
        '停机坪': parking_apron.check,
        '回踩年线': backtrace_ma250.check,
    }

    # NOTE(review): this assignment repeats an entry already present above, so
    # it currently has no effect; kept in case a Monday-only strategy was intended.
    if datetime.datetime.now().weekday() == 0:
        strategies['均线多头'] = keep_increasing.check

    for strategy, strategy_func in strategies.items():
        check(stocks, strategy, strategy_func)
        time.sleep(2)

    logging.info("************************ process end ***************************************")


def check(stocks, strategy, strategy_func):
    """Apply one strategy to all stocks and push the list of matches.

    Args:
        stocks: List of stock tuples as built in ``process``.
        strategy: Display name of the strategy, used in the pushed message.
        strategy_func: Callable invoked as
            ``strategy_func(code_name, data, end_date=...)``.
    """
    # Set to a 'YYYY-MM-DD' string to back-test against a past date.
    end = None
    m_filter = check_enter(end_date=end, strategy_fun=strategy_func)
    results = list(filter(m_filter, stocks))
    push.strategy('**************"{0}"**************\n{1}\n**************"{0}"**************\n'.format(strategy, results))


def check_enter(end_date=None, strategy_fun=enter.check_volume):
    """Build a predicate that tests a single stock against ``strategy_fun``.

    Args:
        end_date: Passed through to the strategy as ``end_date``.
        strategy_fun: Strategy callable; defaults to ``enter.check_volume``.

    Returns:
        A function taking ``code_name`` that returns ``False`` when
        ``utils.read_data`` yields ``None``, otherwise the strategy's result.
    """
    def end_date_filter(code_name):
        data = utils.read_data(code_name)
        if data is None:
            return False
        else:
            return strategy_fun(code_name, data, end_date=end_date)
            # if result:
            #     message = turtle_trade.calculate(code_name, data)
            #     push.strategy("{0} {1}".format(code_name, message))

    return end_date_filter


# Statistics
def statistics(all_data, stocks):
    """Push a short market-breadth summary.

    Counts rows of ``all_data`` whose ``changepercent`` is >= 9.5, <= -9.5,
    >= 5 and <= -5, plus the number of stocks for which ``enter.check_ma``
    is true, and sends the formatted message through ``push.statistics``.

    Args:
        all_data: DataFrame with a ``changepercent`` column.
        stocks: Iterable of stock tuples accepted by ``utils.read_data``.
    """
    change = all_data['changepercent']
    limitup = int((change >= 9.5).sum())
    limitdown = int((change <= -9.5).sum())
    up5 = int((change >= 5).sum())
    down5 = int((change <= -5).sum())

    def ma250(stock):
        stock_data = utils.read_data(stock)
        # read_data can return None (check_enter guards against it too);
        # a stock without data cannot satisfy the moving-average check.
        if stock_data is None:
            return False
        return enter.check_ma(stock, stock_data)

    ma250_count = len(list(filter(ma250, stocks)))

    msg = "涨停数:{} 跌停数:{}\n涨幅大于5%数:{} 跌幅大于5%数:{}\n年线以上个股数量: {}"\
        .format(limitup, limitdown, up5, down5, ma250_count)
    push.statistics(msg)


def check_exit():
    """Evaluate exit and stop conditions for every stored position.

    Each entry of the shelf opened via ``db.ShelvePersistence`` is checked
    with ``turtle_trade.check_exit`` and then ``turtle_trade.check_stop``;
    when either is true a message is pushed and the entry is removed.
    """
    t_shelve = db.ShelvePersistence()
    positions = t_shelve.open()
    try:
        # Iterate over a snapshot of the keys: entries are deleted inside the
        # loop, and mutating a mapping while iterating it is unsafe.
        for key in list(positions):
            code_name = positions[key]['code_name']
            data = utils.read_data(code_name)
            if data is None:
                logging.warning("no data for %s; exit check skipped", code_name)
                continue
            if turtle_trade.check_exit(code_name, data):
                push.strategy("{0} 达到退出条件".format(code_name))
                del positions[key]
            elif turtle_trade.check_stop(code_name, data, positions[key]):
                push.strategy("{0} 达到止损条件".format(code_name))
                del positions[key]
    finally:
        # Always release the shelf, even if a strategy call raises.
        positions.close()
完整程序代码下载:A股选股软件源代码
Python代码大全,海量代码任你下载