聚宽交易系统4.0---年华70%大小外择时小市值3.0策略设置
文章声明:本内容为个人的业余研究,和任何单位,机构没有关系,文章出现的股票代码,全部只是测试例子,不做投资参考,投资有风险,代码学习使用,不做商业用途最近把大家反馈的问题修改完成了西蒙斯聚宽交易成交系统高速服务器4,大qmt的没有什么问题可以放心使用,修改了实盘半年下来遇到的全部问题使用的教程在修改miniqmt版本这几天上线这里我利用策略为例子给大家测试怎么样交易策略的原理。
文章声明:本内容为个人的业余研究,和任何单位,机构没有关系,文章出现的股票代码,全部只是测试例子,不做投资参考,投资有风险,代码学习使用,不做商业用途
最近把大家反馈的问题修改完成了西蒙斯聚宽交易成交系统高速服务器4,大qmt的没有什么问题可以放心使用,修改了实盘半年下来遇到的全部问题
使用的教程小果qmt聚宽交易系统实盘--小果黄金美股红利三剑客轮动策略为例子
在修改miniqmt版本这几天上线
这里我利用策略为例子给大家测试怎么样交易
策略的原理
核心逻辑
-
市场温度判断:
-
冷市:选择低PB(<1)、现金流好、盈利稳定的价值股
-
暖市:选择适度PB(<1)、成长性较好的股票
-
热市:选择高PB(>3)、高增长的成长股
-
通过沪深300指数过去220天的位置判断市场状态(冷/暖/热)
-
不同市场温度下采用不同的选股标准:
-
-
大小盘轮动:
-
大市值占优时选择沪深300成分股(白马股策略)
-
小市值占优时选择中小板指成分股(小市值策略)
-
都不佳时配置海外ETF
-
每月初通过比较大小盘股近期表现决定配置方向:
-
-
特殊股票处理:
-
对涨停股特殊处理:次日观察,若不再涨停则卖出
-
止损机制:个股亏损8%时止损
-
补跌机制:对表现最差的持仓股进行补仓
-
选股方法
-
大市值策略:
-
从沪深300中筛选
-
根据市场温度选择不同财务特征的股票
-
按ROA/PB等指标排序
-
-
小市值策略:
-
从中小板指中筛选
-
选择市值5-30亿的小盘股
-
要求净利润、营收为正且满足一定规模
-
按市值从小到大排序
-
风控措施
-
过滤ST股、次新股、涨跌停股
-
个股止损(8%)
-
涨停股次日观察机制
-
补跌机制控制回撤
-
设置交易成本(万3)和零滑点
交易执行
-
每月初调仓
-
每日尾盘处理涨停股和止损
-
每周清理补跌仓位
该策略试图通过市场状态判断、大小盘轮动和多因子选股相结合的方式获取超额收益,同时通过严格的风控
点击运行的回测效果
点击服务器可以看全部的数据http://101.34.65.108:8888/
实盘设置
输入账户
选择下单默认,测试的时候是否参数,后面改成是
实盘的时候改成否
输入策略的授权码比如 大小外择时小市值3.0
和服务器一一对应,一个策略一个授权码找我要就可以
设置跟单比如比如0.5是50%的意思
比如聚宽下单1000,跟单比例0.5,策略就下单500
点击运行测试一下数据
没有问题挂模型交易就可以
点击实盘点击运行
交易的结果
下单结果
委托
测试没有问题改成实盘,只需要该一个参数是否测试参数改成否
重新点击一下策略启动就可以
没有跟单的数据
聚宽源代码我全部上传了知识星球可以下载
交易系统的部分源代码,代码有1000多行太长了,完整真正实盘交易的模型,很复制
#encoding:gbk
'''
西蒙斯聚宽交易成交系统高速服务器4
完善了撤单在委托下单函数
作者:西蒙斯量化
微信:xg_quants
时间:20250614
优化内容
1:交易检查功能,自动补单
2:5分钟不成交撤单了重新下单
3:优化了交易算法,交易细节
测试的参数设置*******************,全部在下面的text={}里面该对应的参数就可以
"是否开启测试":"是",
时间设置":"时间设置********",
"交易时间段":8,
"交易开始时间":0,
"交易结束时间":24,
"是否参加集合竞价":"否",
"开始交易分钟":0,
**************************
实盘参数,只需要把是否测试改成就可以否*****************全部在下面的text={}里面该对应的参数
"是否开启测试":"否",
时间设置":"时间设置********",
"交易时间段":4,
"交易开始时间":9,
"交易结束时间":14,
"是否参加集合竞价":"否",
"开始交易分钟":0,
教程 https://gitee.com/li-xingguo11111/joinquant_trader_miniqmt
大qmt https://gitee.com/li-xingguo11111/joinquant_trader_bigqmt
高频使用循环模式,低频使用定时
数值 描述
-1 无效(只对于algo_passorder起作用)
0 卖5价
1 卖4价
2 卖3价
3 卖2价
4 卖1价
5 最新价
6 买1价
7 买2价(组合不支持)
8 买3价(组合不支持)
9 买4价(组合不支持)
10 买5价(组合不支持)
'''
import pandas as pd
import numpy as np
import talib
import requests
import json
from datetime import datetime
import math
import time
text={
"账户":"55001948",
"账户支持融资融券":"账户支持融资融券,账户类型STOCK/CREDIT",
"账户类型":"STOCK",
"买入价格编码":5,
"卖出价格编码":5,
"黑名单说明":"黑名单里面的标的,不会买入也不会卖出",
"黑名单":['600031.SH'],
"聚宽跟单":"跟单原理",
"服务器设置":"服务器跟单设置",
"服务器":"http://101.34.65.108",
"端口":"8888",
"服务器编码":"63d85b6189e42cba63feea36381da615c31ad8e36ae420ed67f60f3598efc9ad",
"测试说明":"开启测试就是选择历史交易不开启就是选择今天的数据",
"是否开启测试":"否",
"测试数量":200,
"测试长度":10,
"跟单设置":"跟单设置***********",
"账户跟单比例":1,
"多策略用逗号隔开":"多策略用逗号隔开********",
"组合名称":["大小外择时小市值3.0"],
"组合授权码":["大小外择时小市值3.0"],
"组合跟单比例":[0.5],
"不同策略间隔更新时间":0,
"时间设置":"时间设置********",
"交易时间段":8,
"交易开始时间":0,
"交易结束时间":24,
"是否参加集合竞价":"否",
"开始交易分钟":0,
}
#记录临时id,避免循环下没有用的单子
class a:
pass
a=a()
a.log_id=[]
def init(c):
#账户
c.account=text['账户']
#账户类型
c.account_type=text['账户类型']
if c.account_type=='stock' or c.account_type=='STOCK':
c.buy_code=23
c.sell_code=24
else:
#融资融券
c.buy_code=33
c.sell_code=34
c.buy_price_code=text['买入价格编码']
c.sell_price_code=text['卖出价格编码']
c.del_trader_list=text['黑名单']
c.url=text['服务器']
c.port=text['端口']
print('小果服务器提供数据支持************服务器{} 端口{}'.format(c.url,c.port))
#定时模式
#c.run_time("update_all_data","1nDay","2024-07-25 09:45:00")
#c.run_time("update_all_data","1nDay","2024-07-25 14:45:00")
print(get_account(c,c.account,c.account_type))
print(get_position(c,c.account,c.account_type))
#循环模式3秒
c.run_time("update_all_data","3nSecond","2024-07-25 13:20:00")
#交易检查函数10秒一次
c.run_time("run_check_trader_func","10nSecond","2024-07-25 13:20:00")
#撤单了重新下单5分钟一次
c.run_time("run_order_trader_func","300nSecond","2024-07-25 13:20:00")
run_check_trader_func(c)
def handlebar(c):
pass
def tarder_test(c):
print('交易测试***************')
stock='513100.SH'
amount=100
maker='交易测试'
passorder(23, 1101, c.account, stock, 5, 0, amount, maker,1,maker,c)
def get_del_buy_sell_data(c,name='测试1',password='大小外择时小市值3.0'):
'''
处理交易数据获取原始数据
'''
test=text['是否开启测试']
url=text['服务器']
port=text['端口']
url_code=text['服务器编码']
now_date=str(datetime.now())[:10]
xg_data=xg_jq_data(url=url,port=port,password=password,url_code=url_code)
info=xg_data.get_user_data(data_type='用户信息')
df=xg_data.get_user_data(data_type='实时数据')
print('用户信息')
print(info)
if df.shape[0]>0:
stats=df['数据状态'].tolist()[-1]
if stats==True:
df['证券代码']=df['股票代码'].apply(lambda x:str(x).split('.')[0]+'.SH' if str(x).split('.')[-1]=='XSHG' else str(x).split('.')[0]+'.SZ')
df['数据长度']=df['证券代码'].apply(lambda x: len(str(x)))
df['订单添加时间']=df['订单添加时间'].apply(lambda x :str(x)[:10])
df=df[df['数据长度']>=6]
df['交易类型']=df['买卖'].apply(lambda x: 'buy' if x==True else 'sell')
df=df.drop_duplicates(subset=['股票代码','下单数量','买卖','多空'],keep='last')
df['组合名称']=str(name)
df['组合授权码']=str(password)
df['策略名称']=str('聚宽跟单')
df['订单ID']=df['订单ID'].astype(str)
df['证券代码']=df['证券代码'].apply(lambda x: str(x))
df['投资备注']=df['组合授权码']+','+df['证券代码']+','+df['订单ID']
if test=='是':
print('开启测试模式,实盘记得关闭')
df=df
else:
df=df[df['订单添加时间']==now_date]
else:
df=pd.DataFrame()
else:
df=pd.DataFrame()
test=text['是否开启测试']
n=text['测试长度']
if test=='是':
print('开启数据测试模型实盘记得关闭***********测试长度{}'.format(n))
df=df[-n:]
else:
df=df
if df.shape[0]>0:
print('组合 {} 策略授权码 {} {}今天跟单数据*********************'.format(name,password,now_date))
print(df)
else:
print('组合 {} 策略授权码 {} {}今天没有跟单数据*********************'.format(name,password,now_date))
return df
def check_is_sell(c,accountid,datatype,stock='513100.SH',amount=100):
'''
检查是否可以卖出
'''
position=get_position(c,accountid,datatype)
if position.shape[0]>0:
position=position[position['证券代码']==stock]
if position.shape[0]>0:
position=position[position['持仓量']>=10]
print(position)
if position.shape[0]>0:
hold_amount=position['持仓量'].tolist()[-1]
av_amount=position['可用数量'].tolist()[-1]
if av_amount>=amount and amount>=10:
return True
elif av_amount< amount and av_amount>=10:
return True
else:
return False
else:
return False
else:
return False
def check_is_buy(c,accountid,datatype,stock='513100.SH',amount=100,price=1.3):
'''
检查是否可以买入
'''
account=get_account(c,accountid,datatype)
#可以使用的现金
av_cash=account['可用金额']
value=amount*price
if av_cash>=value:
return True
else:
return False
def get_trader_data(c,name='测试',password='大小外择时小市值3.0',zh_ratio=1):
'''
获取交易数据
组合的跟单比例
'''
test=text['是否开启测试']
adjust_ratio=text['账户跟单比例']
#读取跟单数据
df=get_del_buy_sell_data(c,name=name,password=password)
try:
df['证券代码']=df['证券代码'].apply(lambda x: '0'*(6-len(str(x)))+str(x))
except:
pass
trader_log=get_order(c,c.account,c.account_type)
now_date=str(datetime.now())[:10]
#剔除撤单废单
not_list=[54,57]
if trader_log.shape[0]>0:
trader_log['撤单']=trader_log['委托状态'].apply(lambda x: '是' if x in not_list else '不是')
trader_log=trader_log[trader_log['撤单']=='不是']
else:
trader_log=trader_log
if trader_log.shape[0]>0:
trader_log['证券代码']=trader_log['证券代码'].apply(lambda x: '0'*(6-len(str(x)))+str(x))
trader_log['组合授权码']=trader_log['投资备注'].apply(lambda x: str(x).split(',')[0])
trader_log['订单ID']=trader_log['投资备注'].apply(lambda x: str(x).split(',')[-1])
trader_log['订单ID']=trader_log['订单ID'].astype(str)
if test=='是':
trader_log=trader_log
else:
trader_log=trader_log
trader_log['组合授权码']=trader_log['组合授权码'].astype(str)
trader_log_1=trader_log[trader_log['组合授权码']==password]
if trader_log_1.shape[0]>0:
trader_id_list=trader_log_1['订单ID'].tolist()
else:
trader_id_list=[]
else:
trader_id_list=[]
trader_id_list=list(set(trader_id_list))
if df.shape[0]>0:
df['组合授权码']=df['组合授权码'].astype(str)
df['证券代码']=df['证券代码'].astype(str)
df['下单数量']=df['下单数量'].astype(int)
#df['订单ID ']=df['订单ID'].astype(str)
df=df[df['组合授权码']==password]
if df.shape[0]>0:
df['账户跟单比例']=adjust_ratio
df['组合跟单比例']=zh_ratio
df['交易检查']=df['订单ID'].apply(lambda x: '已经交易' if x in trader_id_list else '没有交易')
df=df[df['交易检查']=='没有交易']
amount_list=[]
if df.shape[0]>0:
for stock,amount,trader_type in zip(df['证券代码'].tolist(),df['下单数量'].tolist(),df['交易类型'].tolist()):
try:
price=get_price(c,stock=stock)
test=text['是否开启测试']
test_amount=text['测试数量']
if test=='是':
amount=test_amount
amount=adjust_amount(c,amount=amount,stock=stock)
else:
amount=amount*adjust_ratio*zh_ratio
amount=adjust_amount(c,amount=amount,stock=stock)
if trader_type=='buy':
try:
if trader_type=='buy' and amount>=10:
if check_is_buy(c,c.account,c.account_type,stock=stock,amount=amount,price=price):
amount_list.append(amount)
else:
print('{}账户资金无法买入{} 数量{} 价格{}'.format(name,stock,amount,price))
amount_list.append(0)
else:
print('{}账户资金无法买入{} 数量{} 低于最低数量10'.format(name,stock,amount,price))
amount_list.append(0)
except Exception as e:
print('组合{} 组合授权码{} {}买入有问题可能没有资金'.format(name,password,stock))
amount_list.append(0)
elif trader_type=='sell':
更多推荐
所有评论(0)