当前位置: 首页 > news >正文

【最后203篇系列】029 基于Machinations构造回测系统

1 背景

我发现我过去做了很多功能,但是大量的工作并没有让我得到更多的便利,主要矛盾有:

  • 1 一些长期工程很难推进。
  • 2 现有的小项目维护成本很高。

以量化系统为例,我在很早很早以前(大约10年前)就有了基础的想法,但在很长时间里,都处于一种比较混沌的状态。由于涉及的面的确很多,包括算法、架构、业务知识,所以一开始我是认为我的准备不足所以无法构建这样的系统的。而实际上,站在现在角度看,我在5年之前实际上就已经基本具备了这样的能力,问题在哪?

最近接触到了一个新概念,或者说我对这个概念有了更深的体会。

系统思考(Systems Thinking) 是一种分析和解决问题的方法,强调从整体视角看待复杂系统,关注系统各组成部分之间的相互作用和动态变化,而非孤立地研究单个元素。它认为系统的行为是由其结构和内在关联驱动的,而非简单的线性因果关系。

系统的整体框架:
在这里插入图片描述
思维的本质差异:
在这里插入图片描述
反思下来,我一开始关于系统性方面的考虑是不合格的,而且过于专注“长链深挖”,而忽视了“平面布局”。简单的比喻就是,我们看书是隔着一段距离的,当我们发现精彩内容时才会聚焦在一小段文字上来回思考,这时候在注意力上放大了一部分内容,进入深度思考。平面布局就是起码的书本,章节,页面,而长链深挖就是书中的精彩部分。

用长链深挖的方式能看完一本书吗? 或许能,但看的速度很慢,搞不好还近视眼。

这大概就是我做了很多,甚至还反复做了很多,但始终没有觉得从这些事从得到太多便利。很多时候只能证明我可以具有某种能力或者潜力。类似于你有了发动机、轮胎、方向盘、刹车,但是没有组装起来,并没有什么用。

然后就是线性思维,没有享受到车的便利,可能是因为做的不够好,然后改进发动机,轮胎… 最后可能到了某个时间点,可能也会自然的想到组装为系统,但是效率太低了( bottom up)。

特地又问了问大模型

自底向上的方式就是会产生类似的问题。(形而下者谓之器)
在这里插入图片描述
自顶向下的方式,最怕的适合现实脱节,所以如果没有很好的底层知识,容易走错方向。(形而上者谓之道)
在这里插入图片描述

用流形的话来说,最好的方案是“双向奔赴”,最终达到某个中间的协同层。这样才是有一个完成的方案,分工明确:

  • 1 顶层先提出一个正确的方案(从任何一个基本的哲学角度出发,都能达到)
  • 2 这个方案通常是一个两层的系统,分为整体和局部,以及流转逻辑。
  • 3 确定局部的组件设计,将其进一步分解为各具体功能。
  • 4 采用底层工具来支撑具体功能。
  • 5 反复调整,迭代,直到实现

2 Machinations

Machinations是一款非常优秀的游戏原型设计工具,我觉得最棒的地方是抽象。Machi只抽象出了若干个简洁的基本元素,然后就能够用于构造几乎任何的复杂模型。
在这里插入图片描述
然后正如这本书提到的
在这里插入图片描述

精确性、最小化和可视化是数据模型的三个基本特征。

所以到这里,我觉得Machi的本质是一种数据模型,与以往相比,是否可视化是检验的一个标准。功能被抽象后表达为十个以内的简单几何符号,真的很容易可视化。

这次会用到几个元素

应用的元素

1 Source

向上的三角形代表资源,这个是注入系统的部分。那个小星号,应该是代表automatic,即在每个周期都会启动一次(或者注入资源,或者没有)
在这里插入图片描述

2 Drain

从系统消耗资源,例如把资金从系统中提取出来
在这里插入图片描述

3 Pool

池,起到寄存器的作用。比如一个子系统,有一个资金池和一个股票池。
在这里插入图片描述

4 Gate

门,是重要的组件。在之后的简单示例里用不到,但是在后续会有非常重要的作用。Gate = Choices, 系统的优化,强化学习,不就是在一个庞大的选择空间中找到一个最合适的组合吗?
在这里插入图片描述

5 Trader

交换器。在回测中,需要使用交换器来完成资金和股票的交换。有另一个类似的概念是转化器,我细想了一下,是不应该放在回测系统里的。转换器意味着资源的永久消耗以及一种全新资源的创造,在交易环节,不会有这样的情况。

在这里插入图片描述

6 Register

注册器。按官方的解释是用于计算用于状态的公式。
在这里插入图片描述
在这里插入图片描述

从图的角度看,以上6种都是节点,没有边。其实在资源发生流动的时候,是有边的逻辑概念的,但是的确没有作为独立的对象出现。我们在程序中对pool a 增加10个资源,对pool b减少5个资源,这一对操作在发生时其实就是边了。

边1: Resource Connection

资源连接器,指明了由A到B的资源流动。
在这里插入图片描述

边2:State Connection

状态连接器,数据来自于某个节点
在这里插入图片描述

可以看到,即使设计一个很小的系统也用到了大部分组件;然而,即使做的更复杂,也不会用到更多的组件了(只剩下转换器和延时器这次不会被用到)

3 实例

现在来实现一个简单的回测系统的。
在这里插入图片描述
假设我们要对一个标的进行回测,基本上是先给到资金,然后随着时间推移,在每个时刻决定是否买/卖,在一段时间之后,来看最后的资源,以及在整个过程中的起伏。

在设计过程中,我发现原有的设计是不够的。因为此时系统并不是孤立的,而是需要追随一个更大的环境。在某个时刻的价格是什么?或者说类似温度之类的系统信息怎么表达?所以我想,必须要增加一些灰色的实心圆,用来表示系统的状态。

假设我们的回测内容如下:

  • 1 在回测开始时给到一笔资金,例如3万元
  • 2 开始执行后,每个回合更新信号和价格数据
  • 3 当信号为买入或者卖出时,交换器才会被激活
  • 4 交换被激活后,会尝试将钱转为股票,或者将股票转为钱
  • 5 每回合统计总资产,低于2.5万则停止(即产生了约15%的亏损)

首先准备好数据

在这里插入图片描述
对应的时间轴如下
在这里插入图片描述
接下来实现各组件:

资源池

资金和股票两个元素可以

from dataclasses import dataclass, asdict
from typing import List, Dict,Optional@dataclass
class ResourcePool:"""Machinations资源池"""name: strvalue: float = 0.0capacity: Optional[float] = None  # None表示无限容量def add(self, amount: float):"""向资源池添加资源"""if self.capacity is not None:self.value = min(self.value + amount, self.capacity)else:self.value += amountdef remove(self, amount: float) -> float:"""从资源池移除资源,返回实际移除的量"""actual = min(amount, self.value)self.value -= actualreturn actual# 输出为字典def dict(self):return asdict(self)# 测试
cash = ResourcePool(name = '资金', value= 30_000)
print(cash)
cash.add(2000)
print(cash)
cash.remove(1000)
print(cash)ResourcePool(name='资金', value=30000, capacity=None)
ResourcePool(name='资金', value=32000, capacity=None)
ResourcePool(name='资金', value=31000, capacity=None)

交换器

@dataclass
class Trader:"""Machinations转换器"""name: strinput_pool: ResourcePool  # 输入资源池output_pool: ResourcePool  # 输出资源池rate: float = 1.0  # 转换率gate: Optional[ThresGate] = None  # 可选的控制门def trade(self, amount : float=20000) -> bool:"""执行转换操作"""# 检查门控条件if self.gate and not self.gate.check(amount):return False# 尝试从输入池获取资源if self.input_pool.value < amount:return Falseprint('trade(buy)')# 执行交换self.input_pool.remove(amount)# 添加资源到输出池self.output_pool.add( amount * self.rate)return Truedef trade_all(self):# 检查门控条件if self.gate and not self.gate.check(amount):return Falseprint('trade(sell)')all_amount = self.input_pool.value# 执行交换self.input_pool.remove(all_amount)# 添加资源到输出池self.output_pool.add(all_amount * self.rate)return True

市场价格与决策信号

from pydantic import BaseModel, computed_field
class MarketPrice(BaseModel):price : floatsignal : int # 加上磨损,形成更贴近真实的保守转换率@computed_field(return_type=float)def buy_rate(self):return 1 / (self.price * 1.003)@computed_field(return_type=float)def sell_rate(self):return self.price * 0.997@computed_field(return_type=str)def decision(self):if self.signal >= 4:return 'buy'elif self.signal <=-4:return 'sell'else:return 'hold'def buy_condition(self):return self.decision == 'buy'def sell_condition(self):return self.decision =='sell'

触发器

@dataclass
class Trigger:"""Machinations触发器"""name: strcondition: callable  # 条件函数action: callable  # 触发动作active: bool = Truedef check(self):"""检查条件并触发动作"""if self.active and self.condition():self.action()

回测过程

for i in range(len(environment_df)):tem_dict = dict(environment_df.iloc[i])# print(tem_dict)market_price = MarketPrice(price= tem_dict['close'] , signal= tem_dict['signal']) buy_trader = Trader('买入交易', input_pool=cash ,output_pool=stock,rate=market_price.buy_rate)sell_trader = Trader('卖出交易', input_pool=stock ,output_pool=cash,rate = market_price.sell_rate)buy_trigger = Trigger('买入触发', condition=market_price.buy_condition, action= buy_trader.trade)sell_trigger = Trigger('买入触发', condition=market_price.sell_condition, action= sell_trader.trade_all)buy_trigger.check()sell_trigger.check()print(f"【{tem_dict['dt']}】 - 现金 :{cash.value} - 股票: {stock.value} - 总资产 - {cash.value + stock.value *  market_price.sell_rate} "  )

过程打印

【2023-03-23 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
【2023-03-24 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
【2023-03-27 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
【2023-03-28 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
【2023-03-29 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
【2023-03-30 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
【2023-03-31 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
【2023-04-03 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
【2023-04-04 00:00:00】 - 现金 :30000 - 股票: 0 - 总资产 - 30000.0 
trade(buy)
【2023-04-06 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 29880.358923230313 
【2023-04-07 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 30021.3888666681 
【2023-04-10 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 29948.44234420028 
【2023-04-11 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 29899.81132922173 
【2023-04-12 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 29880.358923230313 
【2023-04-13 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 29758.78138578394 
【2023-04-14 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 29860.90651723889 
【2023-04-17 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 30157.555708608033 
【2023-04-18 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 30191.597419093017 
【2023-04-19 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 30031.11506966381 
【2023-04-20 00:00:00】 - 现金 :10000 - 股票: 4877.734701960655 - 总资产 - 29953.305445698134 
trade(sell)
【2023-04-21 00:00:00】 - 现金 :29598.29903635474 - 股票: 0.0 - 总资产 - 29598.29903635474 
trade(sell)
【2023-04-24 00:00:00】 - 现金 :29598.29903635474 - 股票: 0.0 - 总资产 - 29598.29903635474 
trade(sell)
【2023-04-25 00:00:00】 - 现金 :29598.29903635474 - 股票: 0.0 - 总资产 - 29598.29903635474 
【2023-04-26 00:00:00】 - 现金 :29598.29903635474 - 股票: 0.0 - 总资产 - 29598.29903635474 
【2023-04-27 00:00:00】 - 现金 :29598.29903635474 - 股票: 0.0 - 总资产 - 29598.29903635474 
...【2025-04-02 00:00:00】 - 现金 :11872.604377619704 - 股票: 4900.511049794829 - 总资产 - 31313.240444351926 
trade(sell)
【2025-04-03 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 
trade(sell)
【2025-04-07 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 
【2025-04-08 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 
【2025-04-09 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 
【2025-04-10 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 
【2025-04-11 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 
【2025-04-14 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 
【2025-04-15 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 
【2025-04-16 00:00:00】 - 现金 :31215.52425401902 - 股票: 0.0 - 总资产 - 31215.52425401902 

在这里插入图片描述

总结

做完了之后我反思了一下,这和我之前写的程序有什么不同。仅仅是一种口味的变化,还是层次的上升?

原来的代码,有一个即时手搓的,是这样的,逻辑是都在里面了,逻辑都在里面了,但是比较难看,也比较难改。这里没有什么组件设计,只是把需要的逻辑堆叠在一起。有点像直接翻过一堵矮墙,姿势比较难看,也没有什么复用性。这次做的多少有点设计感,各功能部件还是高度抽象的,这意味着调试更简单,而且不只可以用在量化,还可以用在普通的程序设计中,复用性很好。

    init_cash = 20_000init_stock = 0 capital = init_cash + init_stockopen_orders = []trades = []fee_rate = 0.005# vol = 2_000for rec in lod:if len(open_orders) :is_ok_to_sell = True is_ok_to_buy = False else:is_ok_to_sell = False is_ok_to_buy = True if is_ok_to_buy:if rec['signals'] >= 4:tem = Naive()tem.order_id = 'order_1_%s_' % some_code + rec['date'].replace(' ','_')   # get_shortuuid()tem.code = some_codevol =  (init_cash // (rec['close']*100) ) * 100tem.buy_vol = voltem.buy_price = rec['close']tem.buy_amt = tem.buy_vol * tem.buy_pricetem.buy_dt =  rec['date']tem.buy_fee = tem.buy_amt  * fee_rateopen_orders.append(tem.dict())if is_ok_to_sell:if rec['signals'] <=-4:buy_order_dict = open_orders[0]tem = Naive()tem.order_id = buy_order_dict['order_id']tem.sell_vol  = buy_order_dict['buy_vol']tem.sell_price  =  rec['close']tem.sell_amt = tem.sell_vol * tem.sell_pricetem.sell_dt =  rec['date']tem.sell_fee = tem.sell_amt  * fee_rate# 计算结果buy_order_dict.update(tem.dict())buy_order_dict['gp'] = buy_order_dict['sell_amt'] -buy_order_dict['buy_amt'] buy_order_dict['gpr'] = buy_order_dict['gp'] / buy_order_dict['buy_amt'] trades.append(buy_order_dict)open_orders = []offline_trade_df_list.append(pd.DataFrame(trades))offline_open_df_list.append(pd.DataFrame(open_orders))

再看一个更早,花更多时间做的一个版本:看着像是一个比较完整的对象,但其实也是缺乏设计,代码非常臃肿,所以实验之后很快放弃了。所以这也并不是流程和对象的问题,或者说,仅仅是形式上按对象的方式整理是没有用的。原来你的逻辑是臃肿的,那么封装成对象,里面的内容仍然是臃肿的(没法阅读和改进),这时候的对象并没有起到抽象作用,而仅仅是封装作用。

import time# 每个策略的外在驱动是行情数据 {'data_slot', 'close'};信号则是由事先准备好的(或者是步骤更早的处理生成的)class Order2:def __init__(self, strategy_data_dict = None, amt_per_order = 5000, open_order_limit = 1,order_win_stop = 0.1, order_loss_stop = -0.02,fix_fee = 0 , flex_fee_rate = 0.005, order_ttl = 600, order_target_stop = 0.01):# 外接对象self.strategy_data_dict = strategy_data_dictself.amt_per_order = amt_per_orderself.open_orders = strategy_data_dict['orders']['open_orders']self.close_orders = strategy_data_dict['orders']['close_orders']self.order_ttl = order_ttlself.order_target_stop = order_target_stopself.open_order_limit = open_order_limitself.order_win_stop = order_win_stopself.order_loss_stop = order_loss_stopself.fix_fee = fix_feeself.flex_fee_rate = flex_fee_rate# 买卖均会更新此属性# storageself.last_order_dict = strategy_data_dict['orders']['last_order_dict']# useself.last_order_slot = self.last_order_dict.get('last_order_slot') or 0 self.last_order_buy_amt = self.last_order_dict.get('last_order_buy_amt') or 0 self.last_order_sell_amt = self.last_order_dict.get('last_order_sell_amt') or 0 # 持有的订单数量self.open_order_num = len(self.open_orders)self.change_tuple_list = strategy_data_dict['orders']['change_tuple_list']self.bak_change_tuple_list = strategy_data_dict['orders']['bak_change_tuple_list']# 占位self.slot_data = {} # 时隙数据self.sell_sel = Noneself.on_hold_amt = 0def setattr(self, data_dict):self.slot_data = data_dictdef cal_buy_vol(self, price):return int((self.amt_per_order/ (price*100))) * 100# buy ~ highdef buy(self):if len(self.open_orders) < self.open_order_limit:order_dict = {}order_dict['buy_slot'] = self.slot_data['data_slot']order_dict['buy_dt'] = self._slot_ord2str(order_dict['buy_slot'])order_dict['buy_vol'] = self.cal_buy_vol(self.slot_data['high'])order_dict['buy_amt'] = round(order_dict['buy_vol'] * self.slot_data['high'])order_dict['buy_price'] = self.slot_data['high']order_dict['trade_slots'] = 0# 添加数据 - 在使用可变对象(未销毁)不必单独的存储self.open_orders.append(order_dict)# 更新属性# self.open_order_num = len(self.open_orders)# 当前对象的刷新self.last_order_slot  = self.slot_data['data_slot']self.last_order_buy_amt = order_dict['buy_amt'] # 外部存储self.last_order_dict['last_order_slot'] = self.slot_data['data_slot']self.last_order_dict['last_order_buy_amt'] = order_dict['buy_amt'] open_order_df = pd.DataFrame(self.open_orders)self.on_hold_amt = open_order_df['buy_amt'].sum()# slot, on_hold, cashthe_tuple = self.slot_data['data_slot'], self.on_hold_amt, -1 * order_dict['buy_amt'] self.change_tuple_list.append(the_tuple)# sell ~ lowdef sell(self, is_force = False):if len(self.open_orders) == 0:return None open_order_df = pd.DataFrame(self.open_orders)if not is_force:if self.sell_sel is None: # 由评估(evaluate)修改sell_sel,符合控制条件的被卖出,如果没有就挑出sell actionreturn Nonereason ='Control Sell'open_order_df1 = open_order_df[self.sell_sel]else: # 通过信号触发的,强制的卖出reason ='Signal Sell'open_order_df1 = open_order_df# 计算属性open_order_df1['sell_slot'] = self.slot_data['data_slot']open_order_df1['sell_dt'] = open_order_df1['sell_slot'].apply(self._slot_ord2str)open_order_df1['sell_price'] = self.slot_data['low']open_order_df1['sell_vol'] = open_order_df1['buy_vol']open_order_df1['sell_amt'] = open_order_df1['sell_price'] * open_order_df1['sell_vol']open_order_df1['fix_fee'] = self.fix_feeopen_order_df1['flex_fee_rate'] = self.flex_fee_rateopen_order_df1['total_fee'] = round(open_order_df1['sell_amt'] *open_order_df1['flex_fee_rate'] + open_order_df1['fix_fee'])open_order_df1['gp'] = (open_order_df1['sell_amt'] - open_order_df1['buy_amt']).apply(lambda x: round(x,2))open_order_df1['np'] = (open_order_df1['gp'] - open_order_df1['total_fee'] ).apply(lambda x: round(x,2))open_order_df1['npr'] = round(open_order_df1['np']/open_order_df1['buy_amt'] , 3) open_order_df1['is_win'] = open_order_df1['npr'].apply(lambda x: 1 if x > 0 else 0)open_order_df1['hold_slots'] = open_order_df1['sell_slot'] - open_order_df1['buy_slot'] open_order_df1['reason'] = reasonopen_order_df1['sell_dt'] = open_order_df1['sell_slot'].apply(self._slot_ord2str)open_order_df1['buy_yymon'] = open_order_df1['buy_dt'].apply(lambda x: x[:7])open_order_df1['buy_yy'] = open_order_df1['buy_dt'].apply(lambda x: x[:4])# 更新属性if  self.sell_sel is None: # 在signal模式(force)下,这些属性会被清空;否则此时必然sell_sel非空(非force sell_sel空在之前已经跳出)self.open_orders = []self.open_order_num = len(self.open_orders)self.on_hold_amt = 0# storage | 销毁了原对象,所以要覆盖self.strategy_data_dict['orders']['open_orders'] = self.open_orderselse:self.open_orders = open_order_df[~self.sell_sel].to_dict(orient='records')self.on_hold_amt = open_order_df[~self.sell_sel]['buy_amt'].sum()# storage  | 销毁了原对象,所以要覆盖self.strategy_data_dict['orders']['open_orders'] = self.open_orders# 当前对象的刷新self.last_order_slot  = self.slot_data['data_slot']self.last_order_sell_amt = open_order_df1['sell_amt'].sum() - open_order_df1['total_fee'].sum()# 外部对象的存储self.last_order_dict['last_order_slot'] = self.slot_data['data_slot']self.last_order_dict['last_order_sell_amt'] = open_order_df1['sell_amt'].sum() - open_order_df1['total_fee'].sum()# slot, on_hold, cashthe_tuple = self.slot_data['data_slot'], self.on_hold_amt ,  self.last_order_dict['last_order_sell_amt']self.change_tuple_list.append(the_tuple)self.close_orders += open_order_df1.to_dict(orient ='records')return True# evaluate ~ closedef evaluate(self):if len(self.open_orders) ==0:self.sell_sel = Noneself.on_hold_amt = 0return None open_order_df = pd.DataFrame(self.open_orders)# gap_pct - 从涨幅来看,达到单个订单上界时卖出,或者达到指定的目标值gap_pct_s = (self.slot_data['close'] - open_order_df['buy_price'])/open_order_df['buy_price']gap_pct_s_sel = gap_pct_s.apply(lambda x: True if x >= self.order_win_stop or x < self.order_loss_stop or x >= self.order_target_stop else False)# print('gap_pct_s_sel:', gap_pct_s_sel.sum())# ttlttl_s_sel = open_order_df['trade_slots'].apply(lambda x: True if x  >= self.order_ttl else False)# print('ttl_s_sel:', ttl_s_sel.sum())order_sel = gap_pct_s_sel | ttl_s_selif order_sel.sum() ==0 : # 如果没有需要控制(卖出)的,sell_sel为空self.sell_sel = None else: # 否则sell_sell 会被复制self.sell_sel = order_selself.on_hold_amt = (open_order_df['buy_vol']*self.slot_data['close']) .sum()# get_change_tuple ~ data_slot, on_hold, capitaldef get_change_tuple(self):if len(self.change_tuple_list):the_tuple = self.change_tuple_list.pop(0)self.bak_change_tuple_list.append(the_tuple)return the_tuple @staticmethoddef _get_time_str1(ts = None,bias_hours=0):ts = ts or time.time()return time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ts + bias_hours*3600)) @staticmethod# 将标准时间戳转为时间轴时序 day/hour/minutedef _ts2ord(ts = None, tx_level='minute', bias_hours=8):ts = ts or time.time()tx_level = tx_level.lower()assert tx_level in ['day','hour','minute'], '只有 day, hour, minute三个级别'if tx_level == 'minute':res = (ts + bias_hours * 3600)//60elif tx_level == 'hour':res = (ts + bias_hours * 3600)//3600else:res = (ts + bias_hours * 3600)//86400return int(res)@staticmethod# 将时序映射回字符def _slot_ord2str(some_slot, tx_level='minute', bias_hours=-8, fs=None):tx_level = tx_level.lower()assert tx_level in ['day','hour','minute'], '只有 day, hour, minute三个级别'if tx_level == 'minute':res = Order2._get_time_str1(some_slot*60, bias_hours=bias_hours)elif tx_level == 'hour':res = Order2._get_time_str1(some_slot*3600, bias_hours=bias_hours)else:res = Order2._get_time_str1(some_slot*86400, bias_hours=bias_hours)return resfrom empyrical import max_drawdown
import ffnimport pandas as pd 
import numpy as np # 每策略每时隙最多一个订单
class Capital2:def __init__(self, cash_slot_list = None,onhold_slot_list = None ,slot_list = None,capital_slot_list=None,drawback_watch_window = 10000):self.cash_slot_list = cash_slot_listself.onhold_slot_list = onhold_slot_listself.slot_list = slot_listself.capital_slot_list = capital_slot_listself.capital_drawback = 0self.capital_loss_r = 0 self.drawback_watch_window = drawback_watch_window# Datadef appendattr(self, data_tuple):the_slot, the_onhold, the_cash = data_tupleself.slot_list.append(the_slot)self.onhold_slot_list.append(the_onhold)updated_cash = self.cash_slot_list[-1] + the_cashself.cash_slot_list.append(updated_cash)updated_captical = self.onhold_slot_list[-1] + self.cash_slot_list[-1]self.capital_slot_list.append(updated_captical)# 计算回撤@staticmethoddef cal_max_drawdown(s):if not isinstance(s,pd.Series):s = pd.Series(s)r = ffn.to_returns(s)return max_drawdown(r)# 计算损失@staticmethoddef cal_capital_loss(base,x):return (x-base)/base# Drawbackdef drawback(self, s = None):if s is None:s = self.capital_slot_lists = s[-self.drawback_watch_window:]self.capital_drawback = self.cal_max_drawdown(s)# lossdef cap_loss(self):init_val = self.capital_slot_list[0]new_val = self.capital_slot_list[-1]self.capital_loss_r = self.cal_capital_loss(init_val, new_val)def evaluate(self):self.drawback()self.cap_loss()# 给到某个时隙的行情与信号,作出决策
class Strategy2:# order -> control -> capitaldef __init__(self,# orderamt_per_order = 5000,open_order_limit =1,order_win_stop = 0.1,order_loss_stop = -0.02,fix_fee = 0, flex_fee_rate = 0.005,order_ttl = 600, order_target_stop = 0.01,#  strategy - controlcapital_win_stop = 10,capital_loss_stop = -0.1,capital_drawback_stop = -0.1,cool_down_slots = 600,# capitalinit_cash = 5000, drawback_watch_window = 10000,# 历史存储将放于外部strategy_data_dict = None):# 变量关系:如果实际的本金少于一定程度,那么进行提升(以每笔订单金额为基准)min_init_cash = amt_per_order * (1 + abs(capital_drawback_stop))self.init_cash = max(init_cash, min_init_cash)# - orderself.amt_per_order = amt_per_orderself.open_order_limit =open_order_limitself.order_win_stop =order_win_stopself.order_loss_stop =order_loss_stopself.fix_fee =fix_feeself.flex_fee_rate =flex_fee_rate# - controlself.capital_win_stop = capital_win_stopself.capital_loss_stop = capital_loss_stopself.capital_drawback_stop = capital_drawback_stopself.cool_down_slots = cool_down_slots# - capitalself.drawback_watch_window = drawback_watch_window# - data 在每个时隙结束完毕,下一个时隙开始之前是一次改变决策的机会。例如改变交易的金额,此时改变参数,然后将历史数据置空就可以了。# 在上层的对象中(BackTesting 来控制堆栈,老策略终止时将会被推入堆栈,否则会keep alive)if strategy_data_dict is None:self.strategy_data_dict = self._construct_new(self.init_cash)else:self.strategy_data_dict = strategy_data_dict# 时隙数据,运算时临时使用self.slot_data = {}self.slot_rule_data = {}# 数据分离cash_slot_list = self.strategy_data_dict['capital']['cash_slot_list']onhold_slot_list = self.strategy_data_dict['capital']['onhold_slot_list']slot_list = self.strategy_data_dict['capital']['slot_list']capital_slot_list=self.strategy_data_dict['capital']['capital_slot_list']# 对象挂接-功能self.Order = Order2(strategy_data_dict= self.strategy_data_dict,amt_per_order = amt_per_order, open_order_limit = open_order_limit, order_win_stop = order_win_stop, order_loss_stop = order_loss_stop,fix_fee = fix_fee , flex_fee_rate = flex_fee_rate,order_ttl = order_ttl, order_target_stop = order_target_stop)self.Capital = Capital2(cash_slot_list=cash_slot_list,onhold_slot_list=onhold_slot_list,slot_list = slot_list, capital_slot_list= capital_slot_list,drawback_watch_window=drawback_watch_window)# 状态self.rule_result = 'Pass','按默认信号规则处理'# 构造空的数据结构 - 如果没有预先的历史,那么新生成一个数据@staticmethoddef _construct_new(init_cash):# 每个策略的数据结构data_struct = {}# orders 采取多买一卖模式,受到的买入限制为订单最大笔数、现金量等;受到的卖出限制为每单回撤# orders 依赖于现金和持仓,同时也会对现金和持仓产生影响# orders 分析的方法可以挪到strategy,在这一层执行分析data_struct['orders'] = {}data_struct['orders']['open_orders'] = []data_struct['orders']['close_orders'] = []data_struct['orders']['last_order_dict'] = {}data_struct['orders']['change_tuple_list'] = []data_struct['orders']['bak_change_tuple_list'] = []# capital 会计算区间回撤以及资本金损失两个指标,这两个指标会进行宏观控制,强制终止策略# capital 按时隙记录了持仓和现金data_struct['capital'] = {}data_struct['capital']['cash_slot_list'] = [init_cash]data_struct['capital']['onhold_slot_list'] = [0]data_struct['capital']['slot_list'] = [0]data_struct['capital']['capital_slot_list'] = [init_cash]# signals由外部产生信号data_struct['signals'] = {}return data_struct# 更新信号 signal_dict keys ~ slots | values ~ 离散的维度信号 ~ 信号结论  def update_merge_signal(self, signal_dict):self.strategy_data_dict['signals'].update(signal_dict)# 执行策略: market_data_dict ~ data_slotdef run(self, market_data_dict, skip_rule = False):# S01 连接-输入# 当前数据(在这个时隙下执行策略)self.slot_data['data_slot'] = market_data_dict['data_slot']self.slot_data['high'] = market_data_dict['high']self.slot_data['low'] = market_data_dict['low']self.slot_data['close'] = market_data_dict['close']# S02 处理-计算规则属性self.cal_rule_attr()# S03 处理-计算规则结果tem = self.exe_rule()# print(tem)# S04 处理-信号执行if not skip_rule:rule_result = self.rule_resultif rule_result[0] == 'Pass':self.pass_process()else:self.pass_process()# S05 连接-输出# run-s02def cal_rule_attr(self):data_dict = self.slot_data# print(data_dict)self.slot_rule_data['data_slot'] = data_dict['data_slot']self.slot_rule_data['last_order_slot'] = self.Order.last_order_slotself.Capital.evaluate()self.slot_rule_data['capital_drawback'] = self.Capital.capital_drawbackself.slot_rule_data['capital_loss_r'] = self.Capital.capital_loss_rself.slot_rule_data['cash'] = self.Capital.cash_slot_list[-1]self.slot_rule_data['hold_orders'] = len(self.Order.open_orders)# 增加订单交易时间的自动计数,随着时间推移必然递增 - 每次循环都会进行,这是不对的if len(self.Order.open_orders):for tem_dict in self.Order.open_orders:# 因为每次都会计次,所以改为0.4tem_dict['trade_slots'] +=0.4return True# run-s03 规则判定def exe_rule(self):if self.slot_rule_data['capital_drawback']  < self.capital_drawback_stop:self.rule_result = 'Stop','回撤过大'return self.rule_resultif self.slot_rule_data['capital_loss_r'] < self.capital_loss_stop:self.rule_result = 'Stop','本金损失过大'return self.rule_resultif self.slot_rule_data['hold_orders'] ==0 and  self.slot_rule_data['cash'] < self.amt_per_order:self.rule_result = 'Stop','本金不足以完成单次交易'return self.rule_result# print(self.slot_data['data_slot'] - self.slot_rule_data['last_order_slot'] )if self.slot_data['data_slot'] - self.slot_rule_data['last_order_slot'] < self.cool_down_slots: self.rule_result = 'Continue','交易冷却时间未到' # 冷却时间可以依据半衰期和交易规则指定,例 半衰期的目标达成率只有0.2不到,所以前期可以冷却return self.rule_resultself.rule_result = 'Pass','按默认信号规则处理'return self.rule_result# run-s04 信号处理 if self.rule_result[0] == 'Pass' | buy - signal sell - control selldef pass_process(self):# ps01 - 连接:输入cur_slot = self.slot_data['data_slot']cur_signal = self.strategy_data_dict['signals'][cur_slot]['final_signal']self.Order.setattr(self.slot_data)# ps02 - 处理:评估及交易# 如果有open订单,先执行订单的评估(而不管是什么信号)if len(self.Order.open_orders):if cur_signal == -1: # 信号触发的必然卖出is_sell = self.Order.sell(is_force=True)else:self.Order.evaluate()is_sell = self.Order.sell()else:is_sell = Noneif (len(self.Order.open_orders) < self.Order.open_order_limit) and (not is_sell) and cur_signal ==1:self.Order.buy()# ps03 - 连接:输出the_capital_change = self.Order.get_change_tuple()if the_capital_change:self.Capital.appendattr(the_capital_change)      # 保存本次的结果def save(self):pass

回到之前的问题 仅仅是一种口味的变化,还是层次的上升?

我觉得这次的改进应该算是层次的上升,设计清晰,简单明了。而且未来的系统基本上不会超过20类组件(目前只有10个,我会之后会做必要的拓展),逐渐的把所有的程序,都组件化,只留下特殊的逻辑,超复杂的逻辑开发需要专门处理,然后封装到组件中;大部分的逻辑应该都是乐高式的。

另外还记得最初在写回测系统的时候感觉很奇怪:逻辑看起来很简单,无非是加加减减,一边是资产,一边是股票,但是这两个部分有交互,然后还有一些规则限制(比如回撤,止盈),然后就变得很复杂,非常反直觉。我在想,大概这个就是系统问题:涌现性。单个问题很简单,交互在一起就很复杂。这时候如果要从第三人称,或者上帝视角去描述这种问题,可能不太适合用语言或者简单的线性逻辑。

最后一个问题是,有没有用?

我觉得是用的,可以满足我的短期需求,也可以不断进化来满足未来需求。

短期需求:构造一个简单高效的回测系统,来执行基准策略。同时,可以进行整体的汇总观察,和时间序列对接。

像这样的图,过去我是没有的。我只记录了关键时刻的数据,还想着是不是要通过数据处理来补(好傻)。这次做完原型,很简单的就画出来了,然后有几个标的执行几个,然后存到时序库里,直接统计就行了。

画图的代码也很简单,把日期设为索引,然后画图就好了。(pd.to_datetime要函数式调用,嗯,其实我有更高效的方法,平时用太少了,以后再说)

import cufflinks as cf
from plotly.offline import iplot, init_notebook_mode
# 1
cf.go_offline()res_df['dt'] = pd.to_datetime(res_df['dt'] )
res_df.set_index('dt', inplace=True)
res_df.iplot( title='Capital Change Over Time', xTitle='Day', yTitle='¥')

在这里插入图片描述

最后总结:基于系统的设计是非常重要的,纲举目张。后面几年会找到那个中间层,让所有的努力都汇聚起来,产生价值。

相关文章:

  • SQL注入 01
  • 机器学习专栏(4):从数据饥荒到模型失控,破解AI训练的七大生死劫
  • 实现对象之间的序列化和反序列化
  • Kubernetes控制平面组件:调度器Scheduler(一)
  • Java 软件测试开发相关资源
  • DSA数据结构与算法 6
  • 快速从S32K358切换到328
  • 在阿里云和树莓派上编写一个守护进程程序
  • NLP 梳理03 — 停用词删除和规范化
  • Python 深度学习实战 第11章 自然语言处理(NLP)实例
  • 嵌入式芯片中的 SRAM 内容细讲
  • 4.20刷题记录(单调栈)
  • 非参数检验题目集
  • 将 JSON 字符串转化为对象的详细笔记 (Java示例)
  • 使用安全继电器的急停电路设计
  • TCP常见知识点整理
  • 关于TCP三次握手和四次挥手过程中的状态机、使用三次握手和四次挥手的原因、拥塞控制
  • Matlab 五相电机仿真
  • Pandas的应用
  • 栈和队列(C语言)
  • 民生访谈|让餐饮店选址合规性可查、社区妙趣横生,上海有实招
  • 高明士︱纪念坚苦卓绝的王寿南先生
  • 18米巨作绘写伏羲女娲,安徽展石虎最后十年重彩画
  • AI换脸侵权案入选最高法典型案例:明晰人工智能使用边界
  • 广东省东莞市委原书记、市人大常委会原主任徐建华被开除党籍
  • 阿塞拜疆总统阿利耶夫将访华