量化投资学习笔记77——重构回测程序

上次利用Backtrader对实盘交易记录进行了回测。现在打算对程序进行重构,参考《重构——改善既有代码的设计》。
先用git branch refactoring开一个分支,checkout到该分支开始干活。
先是一些基本理论。
如果发现由于代码结构无法方便的为程序添加特性,就先重构程序,使特性添加比较容易,再添加特性。
重构的第一步:建立可靠的测试环境。
重构步骤的本质:由于每次修改的幅度都很小,所以任何错误都很容易发现。
任何一个傻瓜都能写出计算机可以理解的代码。惟有写出人类容易理解的代码,才是优秀的程序员。
重构是在不改变软件可见行为的前提下,提高其可读性,降低修改成本。
增添功能与重构分开。
重构改进软件设计,使软件更容易被理解,帮助调试,提高编程速度。
何时重构?事不过三,三则重构。添加功能,修补错误,代码复审时重构。
下面就来看看我上次写的代码有什么”坏味道”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# coding:utf-8
# 用backtrader对定投实盘记录进行回测


import backtrader as bt
import backtrader.analyzers as btay
import tushare as ts
import os
import pandas as pd
import datetime
import matplotlib.pyplot as plt


# 获取数据
def getData(code, start, end):
filename = code+".csv"
print("./" + filename)
# 已有数据文件,直接读取数据
if os.path.exists("./" + filename):
df = pd.read_csv(filename)
else: # 没有数据文件,用tushare下载
df = ts.get_k_data(code, autype = "qfq", start = start, end = end)
df.to_csv(filename)
df.index = pd.to_datetime(df.date)
df['openinterest']=0
df=df[['open','high','low','close','volume','openinterest']]
return df


# 交易策略
class TradeStrategy(bt.Strategy):
params = (
("recordFilename", "etfdata.csv"),
("printlog", False)
)

def __init__(self):
self.df_record = pd.read_csv(self.params.recordFilename)
self.df_record.成交日期 = pd.to_datetime(self.df_record.成交日期, format = "%Y%m%d")
self.df_record.index = self.df_record.成交日期
self.df_record.drop(labels = "成交日期", axis = 1, inplace = True)
# print(self.df_record.head(), self.df_record.info())
self.order = None
ad = bt.indicators.AroonDown(plotname = "AD")
ad.plotinfo.subplot = True


def log(self, txt, dt=None, doprint=False):
'''log记录'''
if self.params.printlog or doprint:
dt = dt or self.datas[0].datetime.date(0)
print('%s, %s' % (dt.isoformat(), txt))

def notify_order(self, order):
# 有交易提交/被接受,啥也不做
if order.status in [order.Submitted, order.Accepted]:
return

# 检查一个交易是否完成。
# 如果钱不够,交易会被拒绝。
if order.status in [order.Completed]:
if order.isbuy():
self.log(
'执行买入, 价格: %.2f, 成本: %.2f, 手续费 %.2f' %
(order.executed.price,
order.executed.value,
order.executed.comm))
# self.buyprice = order.executed.price
# self.buycomm = order.executed.comm
elif order.issell():
self.log(
'执行卖出, 价格: %.2f, 成本: %.2f, 手续费 %.2f' %
(order.executed.price,
order.executed.value,
order.executed.comm))

self.bar_executed = len(self)

elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('交易取消/被拒绝。')

self.order = None

def next(self):
if self.order:
return
tradeData = pd.DataFrame()
orderType = bt.Order.Market
for data in self.datas:
date = data.datetime.date(0)
tradeBar = self.df_record.loc[date.strftime("%Y-%m-%d"),:]
# print("bar数据", date, data._name)
if len(tradeBar) != 0:
for i in range(len(tradeBar)):
name = tradeBar.iloc[i].证券名称
price = tradeBar.iloc[i].成交均价
stock = tradeBar.iloc[i].成交量
commit = tradeBar.iloc[i].手续费
# 进行交易
if stock > 0 and name == data._name:
# print("测试a", date, name, price, stock, commit)
self.broker.add_cash(price*stock + commit)
# print(self.broker.get_cash())
self.order = self.buy(data = data, size = stock, price = price, exectype = orderType)
elif stock < 0 and name == data._name:
# print("测试b", date, name, price, stock, commit)
self.order = self.sell(data = data, size = -1*stock, price = price, exectype = orderType)
def stop(self):
self.log("最大回撤:-%.2f%%" % self.stats.drawdown.maxdrawdown[-1], doprint=True)



if __name__ == "__main__":
start = "2018-01-01"
end = "2020-07-05"
df_300 = getData("510300", start, end)
df_nas = getData("513100", start, end)
# print(df_300.info(), df_nas.info())
# 建立数据源
start_date = list(map(int, start.split("-")))
end_date = list(map(int, end.split("-")))
data300 = bt.feeds.PandasData(dataname = df_300, name = "300ETF", fromdate = datetime.datetime(start_date[0], start_date[1], start_date[2]), todate = datetime.datetime(end_date[0], end_date[1], end_date[2]))
dataNas = bt.feeds.PandasData(dataname = df_nas, name = "nasETF", fromdate = datetime.datetime(start_date[0], start_date[1], start_date[2]), todate = datetime.datetime(end_date[0], end_date[1], end_date[2]))
# 建立回测实例,加载数据,策略。
cerebro = bt.Cerebro()
cerebro.addstrategy(TradeStrategy)
cerebro.adddata(data300, name = "300ETF")
cerebro.adddata(dataNas, name = "nasETF")
# 添加回撤观察器
cerebro.addobserver(bt.observers.DrawDown)
# 设置手续费
cerebro.broker.setcommission(commission=0.0003)
# 设置初始资金为0.01
cerebro.broker.setcash(0.01)
print("初始资金:%.2f" % cerebro.broker.getvalue())
# 添加分析对象
cerebro.addanalyzer(btay.SharpeRatio, _name = "sharpe", riskfreerate = 0.02)
cerebro.addanalyzer(btay.AnnualReturn, _name = "AR")
cerebro.addanalyzer(btay.DrawDown, _name = "DD")
cerebro.addanalyzer(btay.Returns, _name = "RE")
cerebro.addanalyzer(btay.TradeAnalyzer, _name = "TA")
# 运行回测
results = cerebro.run()
# cerebro.broker.add_cash(-10000.0)
print("期末资金:%.2f" % cerebro.broker.getvalue())
cerebro.plot(numfigs = 2)
plt.savefig("result.png")
print("夏普比例:", results[0].analyzers.sharpe.get_analysis()["sharperatio"])
print("年化收益率:", results[0].analyzers.AR.get_analysis())
print("最大回撤:%.2f,最大回撤周期%d" % (results[0].analyzers.DD.get_analysis().max.drawdown, results[0].analyzers.DD.get_analysis().max.len))
print("总收益率:%.2f" % (results[0].analyzers.RE.get_analysis()["rtot"]))
results[0].analyzers.TA.print()

1.重复的代码
notify_order成员函数里,以及数据初始化里都有重复的代码。
2.过长函数
main函数,next成员函数都太长了。
每当感觉需要以注释来说明点什么的时候,我们就把需要说明的东西写进一个独立函数中,并以其用途(而非实现手法)命名。我们
可以对一组或甚至短短一行代码做这件事。哪怕替换后的函数调用动作比函数自身还长,只要函数名称能够解释其用途,我们也该毫不犹豫地那么做。
3.过大的类
单一类不要做太多事,python这个问题貌似不严重。
4.过长参数列表
面向对象,函数需要的某些参数可以设为类成员变量,而不必作为函数参数。问题是很多python库都有长参数列的问题。
5.发散式变化
针对某一变化需要修改多个类的情况,最好将类拆分为数个,使每个变化只需修改一个类。
策略类貌似还可以改。
6.散弹式修改
一个变化要修改多个类,可将这些类合并,使得每个变化只修改一个类。
7.依恋情结
成员函数对某个对象的兴趣高于对自己的类的兴趣。
8.数据泥团
几个类中有相同的数据项。
9.基本类型偏执
用基本类型组成一些类型。
10.switch问题
少用,考虑用多态替代。python貌似没有。
11.平行继承体系
为某个类增加之类时需要为另一个类也增加子类。
12.冗余类
没啥用的类,删!
13.为未来设计
函数和类的唯一用户是测试程序。
14.临时变量
仅为某种特定情况设置的临时变量。
15.过度耦合的消息链
一个对象要求另一个对象,另一个对象在要求别的对象……
16.中间人
一个类需要太多调用另一个类完成其功能。直接调用实际工作的类。
17.亲密关系
两个类关系密切。分开或合并。
18.异曲同工的类
做类似的工作却有不同名称的类。合并。
19.不完美的程序库
自己改吧。
20.纯数据类
将使用该类的地方移入类中。
21.被拒绝的遗赠
子类不想继承父类某些内容。设计错误,重新设计。
22.过多的注释
多余的注释,说明代码需要重构。
进行重构,首要前提是有一个可靠的测试环境。就是要先有测试再重构啦,搜了一下,python标准库自带了unittest,但不太好用。还有个pytest,试试这个。
花了几天看了一下pytest,会基本操作了。主要就是以test_开头或_test结尾命名测试函数或类,可以保存到以test_xxx为文件名的单独文件里,然后在命令行里用pytest执行测试。测试里主要用assert语句进行测试。其它用法参见文档吧。继续重构。本来想先为原来的程序写单元测试,但是原来的程序都搅和到一起了,很难测试。这也是我想要重构的原因。
提炼函数(extract method)
将长的函数提炼成几个短的函数,或者类。main函数部分太长了,分成几个部分吧。

1
2
3
4
5
6
7
8
9
if __name__ == "__main__":
# 加载数据,建立数据源
data300, dataNas = createDataFeeds()
cerebro = createBacktesting(data300, dataNas)
# 运行回测
print("初始资金:%.2f" % cerebro.broker.getvalue())
results = cerebro.run()
print("期末资金:%.2f" % cerebro.broker.getvalue())
outputResult(cerebro)

分成这么几个函数,主函数短多了。
TradeStrategy类里的next函数也太长了,把交易功能提成函数吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def __doTrade(self, data, name, price, stock, commit, orderType):
if stock > 0 and name == data._name:
self.broker.add_cash(price*stock + commit)
self.order = self.buy(data = data, size = stock, price = price, exectype = orderType)
elif stock < 0 and name == data._name:
self.order = self.sell(data = data, size = -1*stock, price = price, exectype = orderType)

def next(self):
if self.order:
return
tradeData = pd.DataFrame()
orderType = bt.Order.Market
for data in self.datas:
date = data.datetime.date(0)
tradeBar = self.df_record.loc[date.strftime("%Y-%m-%d"),:]
# print("bar数据", date, data._name)
if len(tradeBar) != 0:
for i in range(len(tradeBar)):
name = tradeBar.iloc[i].证券名称
price = tradeBar.iloc[i].成交均价
stock = tradeBar.iloc[i].成交量
commit = tradeBar.iloc[i].手续费
# 进行交易
self.__doTrade(data, name, price, stock, commit, orderType)

有几种情况:没有使用变量的,直接提出去即可。使用了变量但是没有改变的,只在提炼区里使用的,在提炼函数里声明使用;在提炼区外也使用的,作为参数传入。提炼区改变了并且提炼区外要使用的变量,提炼函数返回值返回。
再把交易过程从next()函数中完全提取出来吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 具体交易逻辑,可以改的。
def doTrade(self):
tradeData = pd.DataFrame()
orderType = bt.Order.Market
for data in self.datas:
date = data.datetime.date(0)
tradeBar = self.df_record.loc[date.strftime("%Y-%m-%d"),:]
# print("bar数据", date, data._name)
if len(tradeBar) != 0:
for i in range(len(tradeBar)):
name = tradeBar.iloc[i].证券名称
price = tradeBar.iloc[i].成交均价
stock = tradeBar.iloc[i].成交量
commit = tradeBar.iloc[i].手续费
# 进行交易
self.__doTrade(data, name, price, stock, commit, orderType)

def next(self):
if self.order:
return
self.doTrade()

再改策略的时候直接改doTrade就行了。
再改一下notify_order成员函数,把重复的地方提炼成函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 输出交易过程
def __displayOrder(self, buy, order):
if buy:
self.log(
'执行买入, 价格: %.2f, 成本: %.2f, 手续费 %.2f' %
(order.executed.price,
order.executed.value,
order.executed.comm))
else:
self.log(
'执行卖出, 价格: %.2f, 成本: %.2f, 手续费 %.2f' %
(order.executed.price,
order.executed.value,
order.executed.comm))

def notify_order(self, order):
# 有交易提交/被接受,啥也不做
if order.status in [order.Submitted, order.Accepted]:
return

# 检查一个交易是否完成。
# 如果钱不够,交易会被拒绝。
if order.status in [order.Completed]:
if order.isbuy():
self.__displayOrder(True, order)
elif order.issell():
self.__displayOrder(False, order)

self.bar_executed = len(self)

elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('交易取消/被拒绝。')

self.order = None

再把整个数据准备,建立回测对象的过程封装到类里吧。
放到一个新的文件backtest.py里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# coding:utf-8
# 量化交易回测类


import backtrader as bt
import backtrader.analyzers as btay
import tushare as ts
import os
import pandas as pd
import datetime
import matplotlib.pyplot as plt


# 回测类
class BackTest:
def __init__(self, strategy, start, end, code, name):
self.__cerebro = None
self.__strategy = strategy
self.__start = start
self.__end = end
self.__code = code
self.__name = name
self.__result = None
self.__commission = 0.0003
self.__initcash = 0.01
self.init()

# 真正进行初始化的地方
def init(self):
self.__cerebro = bt.Cerebro()
self.__cerebro.addstrategy(self.__strategy)
self.settingCerebro()
self.createDataFeeds()

# 设置cerebro
def settingCerebro(self):
# 添加回撤观察器
self.__cerebro.addobserver(bt.observers.DrawDown)
# 设置手续费
self.__cerebro.broker.setcommission(commission=self.__commission)
# 设置初始资金为0.01
self.__cerebro.broker.setcash(self.__initcash)
# 添加分析对象
self.__cerebro.addanalyzer(btay.SharpeRatio, _name = "sharpe", riskfreerate = 0.02)
self.__cerebro.addanalyzer(btay.AnnualReturn, _name = "AR")
self.__cerebro.addanalyzer(btay.DrawDown, _name = "DD")
self.__cerebro.addanalyzer(btay.Returns, _name = "RE")
self.__cerebro.addanalyzer(btay.TradeAnalyzer, _name = "TA")

# 建立数据源
def createDataFeeds(self):
for i in range(len(self.__code)):
df_data = self._getData(self.__code[i])
start_date = list(map(int, self.__start.split("-")))
end_date = list(map(int, self.__end.split("-")))
dataFeed = bt.feeds.PandasData(dataname = df_data, name = self.__name[i], fromdate = datetime.datetime(start_date[0], start_date[1], start_date[2]), todate = datetime.datetime(end_date[0], end_date[1], end_date[2]))
self.__cerebro.adddata(dataFeed, name = self.__name[i])

# 获取账户总价值
def getValue(self):
return self.__cerebro.broker.getvalue()

# 执行回测
def run(self):
print("初始资金:%.2f" % self.getValue())
self.__results = self.__cerebro.run()
print("期末资金:%.2f" % self.getValue())

# 输出回测结果
def output(self):
self.__cerebro.plot(numfigs = 2)
plt.savefig("result.png")
print("夏普比例:", self.__results[0].analyzers.sharpe.get_analysis()["sharperatio"])
print("年化收益率:", self.__results[0].analyzers.AR.get_analysis())
print("最大回撤:%.2f,最大回撤周期%d" % (self.__results[0].analyzers.DD.get_analysis().max.drawdown, self.__results[0].analyzers.DD.get_analysis().max.len))
print("总收益率:%.2f" % (self.__results[0].analyzers.RE.get_analysis()["rtot"]))
self.__results[0].analyzers.TA.print()

# 获取数据
def _getData(self, code):
filename = code+".csv"
print("./" + filename)
# 已有数据文件,直接读取数据
if os.path.exists("./" + filename):
df = pd.read_csv(filename)
else: # 没有数据文件,用tushare下载
df = ts.get_k_data(code, autype = "qfq", start = start, end = end)
df.to_csv(filename)
df.index = pd.to_datetime(df.date)
df['openinterest']=0
df=df[['open','high','low','close','volume','openinterest']]
return df

再调用

1
2
3
4
5
6
7
8
9
if __name__ == "__main__":
# 加载数据,建立数据源
start = "2018-01-01"
end = "2020-07-05"
name = ["300ETF", "nasETF"]
code = ["510300", "513100"]
backtest = backtest.BackTest(TradeStrategy, start, end, code, name)
backtest.run()
backtest.output()

跟重构以前的运行结果一致。merge到主分支上,删除refactoring分支,提交。
本文代码: https://github.com/zwdnet/MyQuant/tree/master/46 trade.py和backtest.py两个文件。
总结一下,重构主要目的是在不改变程序功能的前提下消除代码的“坏味道”,让代码更加可读,bug更少。我也尝试了一下用pytest进行测试驱动开发,感觉完全先写测试再写代码还是比较困难。尤其是一些读取数据,文件操作等地方,测试很难写。还是先写出个能干活的程序,再用重构的原则,一点一点改,改一点就运行吧。接下来打算用这些代码实现一些经典的交易策略吧。

我发文章的三个地方,欢迎大家在朋友圈等地方分享,欢迎点“在看”。
我的个人博客地址:https://zwdnet.github.io
我的知乎文章地址: https://www.zhihu.com/people/zhao-you-min/posts
我的微信个人订阅号:赵瑜敏的口腔医学学习园地

欢迎打赏!感谢支持!