量化投资学习笔记182——实现经典交易策略价格动量策略

参考ZuraKakushadze,JuanAndrésSerur. 151 Trading Strategies.
我只看跟股票相关的,期权、期货、债券等等的就跳过了。
价格动量策略(Price Momentum)
原理:价格的动量效应,未来的收益与过去的收益正相关。
先要解决backtrade回测股票池的问题:
https://www.backtrader.com/blog/posts/2017-04-09-multi-example/multi-example/
https://backtest-rookies.com/2017/08/22/backtrader-multiple-data-feeds-indicators/
修改了我的程序,现在回测需要传入股票列表,哪怕只有一只股票。
写策略也有一些变化,核心是用

1
for i, d in enumerate(self.datas):

来遍历股票池。
具体策略:选出表现(依据累积收益率、平均收益等)最佳的N只股票,定期淘汰最差的,选入最好的。每只股票的权重可以相同(如1/N)或不同。
这样策略分成两个阶段:首先选出股票池中表现最好的N只买入,然后定期(每月)轮换。

接下来开始干活,首先是形成股票池,然后计算回测期间每个交易日每只股票的累积收益率。这个就比较耗时了,计算了10年的数据,在服务器上跑了近两天。原因可能是我把每个交易日所有股票的累积收益率排序,再选出前10名来。应该有更好的方法:排序时间复杂度是O(nlogn),如果直接找出前10来,可能遍历一次就行了,复杂度是O(n)。但这个跑一次就行了,结果直接保存到文件,回测的时候直接调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# 形成股票池
@run.change_dir
def make_pool(refresh = False):
data = pd.DataFrame()
path = "./datas/"
stockfile = path + "stocks.csv"
if os.path.exists(stockfile) and refresh == False:
data = pd.read_csv(stockfile, dtype = {"code":str, "昨日收盘":np.float64})
else:
stock_zh_a_spot_df = ak.stock_zh_a_spot()
stock_zh_a_spot_df.to_csv(stockfile)
data = stock_zh_a_spot_df
codes = select(data)
return codes


# 对股票数据进行筛选
def select(data, highprice = sys.float_info.max, lowprice = 0.0):
# 对股价进行筛选
smalldata = data[(data.最高 < highprice) & (data.最低 > lowprice)]
# 排除ST个股
smalldata = smalldata[~ smalldata.名称.str.contains("ST")]
# 排除要退市个股
smalldata = smalldata[~ smalldata.名称.str.contains("退")]

codes = []
for code in smalldata.代码.values:
codes.append(code[2:])

return codes


# 下载数据并形成累积收益率
def make_data(codes, start_date, end_date, refresh = False):
cumret = pd.Series()
n = len(codes)
i = 0
start = np.datetime64(datetime.datetime.strptime(start_date, "%Y%m%d"))
end = np.datetime64(datetime.datetime.strptime(end_date, "%Y%m%d"))
for code in codes:
print("下载数据进度", i/n)
i += 1
stock_data = ts.get_data(code = code,
start_date = start_date,
end_date = end_date,
adjust = "qfq",
period = "daily",
refresh = refresh)
if len(stock_data) == 0:
continue
date = stock_data.日期.values
start_gap = gap_days(start, date[0])
end_gap = gap_days(end, date[-1])
if start_gap == 0 and end_gap == 0:
# 生成累积收益率数据
stock_data["累积收益率"] = stock_data["收盘"]/stock_data["收盘"][0] - 1.0
cumret[code] = stock_data["累积收益率"]
return cumret


# 计算每日累积收益率
@run.change_dir
def get_top10(cumret, retry = False):
datafile = "./datas/cumreturn.csv"
if os.path.exists(datafile) and retry == False:
results = pd.read_csv(datafile)
results.日期 = pd.to_datetime(results.日期)
results.set_index("日期", drop = True, inplace = True)
return results
cumreturn = pd.DataFrame()
temp = pd.Series()
n = len(cumret)
m = len(cumret[0].index)
j = 0
print(m, n)
# input("按任意键继续")
for date in cumret[0].index:
# print(date)
i = 0
for stock in cumret:
j += 1
print("计算累积收益率进度:", j/(m*n))
temp["日期"] = date
temp["股票代码"] = cumret.index[i]
ret = stock[stock.index == date].values
if len(ret) == 0:
temp["累积收益率"] = np.NaN
else:
temp["累积收益率"] = stock[stock.index == date].values[0]
# print(temp["累积收益率"])
cumreturn = cumreturn.append(temp, ignore_index = True)
i += 1
# print(cumreturn)
results = pd.DataFrame()
top10 = []
for date in cumret[0].index:
temp = cumreturn[cumreturn.日期 == date]
temp = temp.sort_values(by = "累积收益率", ascending = False)
top10.append(temp.loc[:, ["股票代码"]].values[:10].T[0])
# results.append(temp)
# results = results.append({"日期": date, "累积收益率": temp}, ignore_index = True)
results["日期"] = cumret[0].index
results["累积收益率"] = top10
results.set_index("日期", drop = True, inplace = True)
results.to_csv(datafile)
print(results.info(), results.head())
return results


# 两个日期之间相差的天数
def gap_days(date1, date2):
return (date1 - date2)/np.timedelta64(1, 'D')


# 重新计算数据
def init_data(start_date = "20100108", end_date = "20201231", retry = False):
codes = make_pool()
cumret = make_data(codes, start_date, end_date, refresh = retry)
codes = cumret.index.values
if retry == True:
results = get_top10(cumret, retry = False)
return codes

得到累积收益率数据后,就开始写回测策略了。基本上就是每个周期间隔剔除累积收益率最低的股票,买入当天股票池累积收益最高的股票。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# 策略类
class PMStrategy(ts.Strategy):
"""
N,交易股票只数
period, 调仓周期
bprint, 是否输出交易过程
"""
params = (("N", 10),
("period", 20),
("bprint", False),)
def __init__(self, refresh = False):
super(PMStrategy, self).__init__()
datafile = "./datas/cumreturn.csv"
self.cumreturns = pd.read_csv(datafile)
self.cumreturns.日期 = pd.to_datetime(self.cumreturns.日期)
self.cumreturns.set_index("日期", drop = True, inplace = True)
self.bIn = False
self.bstart = True
self.days = 0 # 记录交易天数
self.bookmarker = pd.DataFrame()

# 数据转换
def transform(self, date):
stock_list = self.cumreturns.loc[str(date)]
s = stock_list.values[0][1:-1]
s = s.replace("'", "")
s_list = s.split()
return s_list

# 交易数量取整
def downcast(self, amount, lot):
return abs(amount//lot*lot)

def next(self):
s_list = self.transform(self.datas[0].datetime.date(0))
if self.bIn == False:
cash = self.broker.get_cash()/self.p.N
# 如果是第一次交易,直接使用排序结果
if self.bstart:
self.bookmarker["股票代码"] = s_list
self.bookmarker["买入价"] = 0.0
self.bstart = False
for stock in self.bookmarker["股票代码"].values:
data = self.getdatabyname(stock)
pos = self.getposition(data).size
# 计算交易数量
amount = self.downcast(cash*0.9/data.close[0], 100)
if not pos:
self.buy(data = data, size = amount)
self.p.N -= 1

self.bookmarker.买入价[self.bookmarker.股票代码 == stock] = data.close[0]
if self.is_lastday(data = data):
self.close(data = data)
self.bIn = True
# 到达交易天数
elif self.days == self.p.period:
self.bookmarker["现价"] = 0.0
self.bookmarker["累积收益率"] = 0.0
for code in self.bookmarker.股票代码.values:
data = self.getdatabyname(code)
self.bookmarker.现价[self.bookmarker.股票代码 == code] = data.close[0]
self.bookmarker.累积收益率[self.bookmarker.股票代码 == code] = data.close[0]/self.bookmarker[self.bookmarker.股票代码 == code].买入价 - 1.0

# 找出累积收益率最低的股票,卖出
min_code = self.bookmarker[self.bookmarker.累积收益率 == self.bookmarker.min().累积收益率].股票代码.values[0]
min_data = self.getdatabyname(min_code)
self.close(data = min_data)
self.bookmarker = self.bookmarker[self.bookmarker.股票代码 != min_code]
self.p.N += 1
self.bIn = False
# 放入累积收益最高的股票
self.bookmarker = self.bookmarker.append({"股票代码":s_list[0], "买入价":0.0, "现价":0.0, "累积收益率":0.0}, ignore_index = True)
self.days = 0
else:
self.days += 1

def is_lastday(self,data):
try:
next_next_close = data.close[2]
except IndexError:
return True
except:
print("发生其它错误")
return False

回测结果

年化3.2%,好像不咋样。可以进行参数调优的,但回测一次就要跑半个多小时,还是就算了。

源代码

我发文章的三个地方,欢迎大家在朋友圈等地方分享,欢迎点“在看”。

我的个人博客地址:https://zwdnet.github.io

我的知乎文章地址: https://www.zhihu.com/people/zhao-you-min/posts

我的微信个人订阅号:赵瑜敏的口腔医学学习园地

欢迎打赏!感谢支持!