因为证监会的监管,之前聚宽使用的券商通道被禁止,我们这些技术韭菜不得不寻找其他出路,解决这个重大难题。

在和蒋老师深入交流后,终于成功搭建好了聚宽对接qmt的全流程服务。

先上图看效果:

        目前测试效果还算满意,策略延误时间在3-10s以内。扣掉滑点、手续费等影响,整体实盘与模拟盘误差在1%左右。

        不废话,上代码:

import redis
import redis as r
import json
import time
from functools import wraps
from kuanke.user_space_api import *

"""
通过Redis的Stream模式传递交易信号

注意:
    1)使用前将RedisTrade类中的host,port, password改成自己的
    2)选择Redis模式:pattern=0,pubsub模式;pattern=1,stream模式
    3)选择策略mode:mode = 0, 测试:策略历史回测(back_test)时,发送Redis信号;
                  mode = 1, 正式:策略模拟交易(sim_trade)时,发送Redis信号。
"""

__version__ = '20250114'

class RedisTrade:


    host = '' # 此处填写你自己服务器的域名或ip
    port = 
    password = ''   # 此处填写你自己服务器的密码
    pattern = 1  # 0:PUBSUB模式,1:STREAM模式  01
    mode =1 # 0: 测试,1:正式

    @staticmethod
    def trade_signal(func):

        @wraps(func)
        def wrapper(*args, **kwargs):
            context = kwargs.get('context') or args[0]
            security = args[1].security if len(args) == 2 else kwargs.get('security') or args[1]
            # 下单前的现金、股票数量
            pre_cash = context.portfolio.available_cash
            pre_amt = 0
            if security in context.portfolio.positions:
                pre_amt = context.portfolio.positions[security].total_amount
            my_order = func(*args, **kwargs)
            if my_order is not None:
                order_amt = my_order.amount
                limit_stings = str(my_order.style)
                p1 = limit_stings.find('=')
                limit_price = float(limit_stings[p1 + 1:])
                order_price = max(my_order.price, limit_price)
                cedan_status = str(my_order.status)
                cedan = 1 if cedan_status == 'canceled' else 0
                if my_order.is_buy:  # 买入,看现金
                    new_cash = context.portfolio.available_cash  # 10W
                    pct = round(1.0 - new_cash / pre_cash, 8)  # 1 - 10/15 = 1/3, 即买入现有现金的1/3
                else:  # 卖出,看持仓
                    pct = round(order_amt / pre_amt, 8)  # 1-500/2000 = 3/4, 即卖出现持仓的股票的3/4
                    
#                     if security not in context.portfolio.positions:
#                         new_amt = 0
#                     else:
#                         new_amt = context.portfolio.positions[security].total_amount  # 卖出后,持有的数量500股
#                     pct = round(1.0 - new_amt / pre_amt, 8)  # 1-500/2000 = 3/4, 即卖出现持仓的股票的3/4
                    
                    
                #
                data = {
                    'time': my_order.add_time.strftime('%Y-%m-%d %H:%M:%S'),
                    'action': 'BUY' if my_order.is_buy else 'SELL',
                    'code': security,
                    'pct': pct,
                    'strategy': g.strategy,
                    'price': order_price,
                    'cancel_order': cedan
                }

                log.info(data)
          
                if context.run_params.type == 'sim_trade' or RedisTrade.mode == 0:  # mode = 0: 测试; mode == 1:正式
                    try:
                        rds = RedisTrade._open()
                        if RedisTrade.pattern == 0:  # 0:PUBSUB模式,1:STREAM模式
                            rds.publish(g.strategy, json.dumps(data))
                        else:
                            rds.xadd(g.strategy, data, maxlen=200)
                    except Exception as e:
                        log.error(repr(e))

            return my_order

        return wrapper

    @staticmethod
    def _open():
        if hasattr(g, 'rds_connected') and g.rds_connected:
            rds = g.__dict__.get('__redis')
            if rds:
                return rds

        pool = redis.ConnectionPool(
            host=RedisTrade.host,
            port=RedisTrade.port,
            password=RedisTrade.password)
        rds = redis.Redis(connection_pool=pool)
        rds.auto_close_connection_pool = True

        g.__dict__.update({'__redis': rds})
        g.rds_connected = True

        return rds

    @staticmethod
    def close():
        if hasattr(g, 'rds_connected') and (not g.rds_connected):
            return
        try:
            rds = g.__dict__.get('__redis')  # type: redis.Redis
            g.__dict__.update({'__redis': None})
            if rds:
                rds.connection_pool.disconnect()
        except Exception as e:
            log.error(repr(e))
        finally:
            g.rds_connected = False


@RedisTrade.trade_signal
def order_(context, *args, **kwargs):
    _order = order(*args, **kwargs)
    return _order


@RedisTrade.trade_signal
def order_target_(context, *args, **kwargs):
    _order = order_target(*args, **kwargs)
    return _order


@RedisTrade.trade_signal
def order_value_(context, *args, **kwargs):
    _order = order_value(*args, **kwargs)
    return _order


@RedisTrade.trade_signal
def order_target_value_(context, *args, **kwargs):
    _order = order_target_value(*args, **kwargs)
    return _order


@RedisTrade.trade_signal
def cancel_order_(context, orderid):
    _order = cancel_order(orderid)
    # time.sleep(3)
    return _order

上传至聚宽环境中。

下一步,修改聚宽的策略:

主要为四块:

1) from redistrade import *
引入"研究"中的redistrade.py中包括的类和函数。
2)策略initialize中,增加一个全局变量:g.strategy,例如:

g.strategy = 'smallgo'  # 策略名

3)增加一个定时运行:after_market_close,用于休市时关闭掉Redis。

# 收盘后运行
    run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')

def after_market_close(context):
    RedisTrade.close()

4)修改策略的下单函数order系列。例如:

order_target(stock, 0) 修改为: order_target_(context, stock, 0)
order_target_value(stock, value) 修改为: order_target_value_(context, stock, value)

以上,聚宽端的数据就算搭建完毕。

之后本地接收数据,并进行交易的代码:

# coding:utf-8
import time, datetime, traceback, sys
import logging
import redis
from multiprocessing import Manager

from xtquant import xtdata, xttype
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant

from db_manager import DatabaseManager


"""
通过Redis的Stream模式获取交易信号,并驱动miniQMT下单

主要特性:
1.  接收聚宽发送的百分比下单信号,并根据本地数据库中的资金量进行下单,策略实际资金可与聚宽不同,本地数据库中的数据也方便自行调整。
2.  可同时监听多个策略的信号并下单,策略之间资金互相独立。
3.  Stream模式获取交易信号可避免网络等原因导致的信号丢失。
4.  加入miniQMT的交易接口断开时自动重连逻辑。
5.  下单时加入了时间差检查和滑点检查。

注意:
    1) 使用前将redis.StrictRedis方法中的host,port,password以及miniQMT的账号信息改成自己的
    2) 下单依赖本地数据库,请使用db_manager.py文件中的create_strategy_table方法创建本地数据库和策略表,仅监听和操作本地数据库中存在的策略
    3) 为保证成交,委托均为对手方最优价的市价委托(实际委托以涨停或跌停价格发布),可根据实际情况自行修改。
"""

DB_NAME = 'strategy_data.db'
acc = StockAccount('', 'STOCK')  # 填写你自己的账号id信息
path = r'' # 填写你自己的userdata_mini路径

time_check = True   # 是否检查下单时间差
SlippagePct = 0.02 # 买入时允许下单的滑点阈值


console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt="%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s",
                              datefmt="%Y-%m-%d %H:%M:%S")
console_handler.setFormatter(formatter)
nowtime = time.strftime("%Y%m%d_%H%M%S")
logpath = fr".\log\qmt_{nowtime}.log"
file_handler = logging.FileHandler(logpath, mode="a", encoding="utf-8")
file_handler.setFormatter(formatter)
logger = logging.getLogger("joinquant_to_qmt")
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# 定义一个类 创建类的实例 作为状态的容器
class _a():
    pass


A = _a()
A.bought_list = []
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')


def interact():
    """执行后进入repl模式"""
    import code
    code.InteractiveConsole(locals=globals()).interact()


xtdata.download_sector_data()


class freeze_hanadler:

    @staticmethod
    def pre_add_cash(freeze_dict, strategy_name, order_seq, vol, price):
        ca = vol * price
        logger.debug(f"预存入策略{strategy_name}资金{vol}*{price}={ca},order_seq:{order_seq}")
        strategy_dict = freeze_dict.setdefault(strategy_name, p_manager.dict())
        strategy_dict[f"seq{order_seq}"] = [-vol, price]

    @staticmethod
    def freeze_cash(freeze_dict, strategy_name, order_seq, vol, price):
        ca = vol * price
        logger.debug(f"冻结策略{strategy_name}资金{vol}*{price}={ca},order_seq:{order_seq}")
        strategy_dict = freeze_dict.setdefault(strategy_name, p_manager.dict())
        strategy_dict[f"seq{order_seq}"] = [vol, price]

    @staticmethod
    def unfreeze_cash(freeze_dict, strategy_name, order_id, vol_change):
        strategy_dict = freeze_dict.get(strategy_name)
        logger.debug(f"当前冻结字典:{freeze_dict},策略{strategy_name}冻结字典:{strategy_dict},成交order_id:{order_id}")
        if strategy_dict:
            vol, price = strategy_dict.get(f"ord{order_id}")
            # vol_change买入为正,卖出为负
            ca_change = vol_change * price
            if ca_change > 0:
                logger.debug(f"解冻策略{strategy_name}资金{vol_change}*{price}={ca_change},order_id:{order_id}")
            else:
                logger.debug(f"删除策略{strategy_name}预存入资金{-vol_change}*{price}={-ca_change},order_id:{order_id}")
            new_vol = vol - vol_change
            if new_vol == 0:
                strategy_dict.pop(f"ord{order_id}")
            else:
                strategy_dict[f"ord{order_id}"] = [new_vol, price]

    @staticmethod
    def change_seq_to_id(freeze_dict, strategy_name, order_seq, order_id):
        strategy_dict = freeze_dict.get(strategy_name)
        logger.debug(f"策略{strategy_name}冻结字典:{strategy_dict},order_seq:{order_seq}, order_id:{order_id}")
        if strategy_dict:
            strategy_dict[f"ord{order_id}"] = strategy_dict.pop(f"seq{order_seq}")

    @staticmethod
    def get_frozen_cash(freeze_dict, strategy_name):
        frozen_cash = 0
        strategy_dict = freeze_dict.get(strategy_name)
        if strategy_dict:
            for vol, price in strategy_dict.values():
                frozen_cash += vol * price
        logger.debug(f"策略总计冻结资金{frozen_cash}")
        return frozen_cash



class MyXtQuantTraderCallback(XtQuantTraderCallback):

    def __init__(self, manager, db_name, xt_trader, freeze_dict):
        self.logger = logging.getLogger("MyXtQuantTraderCallback")
        self.logger.setLevel(logging.DEBUG)
        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)
        self.db_manager = manager
        self.db_name = db_name
        self.xt_trader = xt_trader
        self.freeze_dict = freeze_dict
        # self.call_back_num = 0

    def on_disconnected(self):
        """
        连接断开
        :return:
        """
        self.logger.warning("连接断开回调: connection lost, 交易接口断开")
        self.xt_trader.connection_lost()

    def on_stock_order(self, order):
        """
        委托回报推送
        :param order: XtOrder对象
        :return:
        """
        self.logger.info(f'委托回调:{order.order_remark}')

    def on_stock_trade(self, trade: xttype.XtTrade):
        """
        成交变动推送
        :param trade: XtTrade对象
        :return:
        """
        self.logger.info(f'成交回调: {trade.order_remark}')
        with self.db_manager(self.db_name) as db_m:
            strategy = trade.strategy_name
            code = trade.stock_code
            filled_price = trade.traded_price
            filled_volume = trade.traded_volume
            traded_amount = trade.traded_amount
            action = trade.order_type  # 交易方向: 买入或卖出
            order_id = trade.order_id
            commission = trade.commission
            self.logger.info(f"成交详情:strategy:{strategy},code:{code},filled_price:{filled_price},"
                             f"filled_volume:{filled_volume},traded_amount:{traded_amount},action:{action},"
                             f"order_id:{order_id},commission:{commission}")
            self.logger.debug(f"当前冻结字典:{self.freeze_dict[strategy]}")
            if action == xtconstant.STOCK_BUY:
                db_m.update_position_and_funds(strategy, code, filled_volume, -traded_amount)
                freeze_hanadler.unfreeze_cash(self.freeze_dict, strategy, order_id, filled_volume)
            elif action == xtconstant.STOCK_SELL:
                db_m.update_position_and_funds(strategy, code, -filled_volume, traded_amount)
                freeze_hanadler.unfreeze_cash(self.freeze_dict, strategy, order_id, -filled_volume)


    def on_order_error(self, order_error):
        """
        委托失败推送
        :param order_error:XtOrderError 对象
        :return:
        """

        self.logger.warning(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """
        撤单失败推送
        :param cancel_error: XtCancelError 对象
        :return:
        """
        self.logger.warning(f"{sys._getframe().f_code.co_name}:cancel_error{cancel_error}" )

    def on_order_stock_async_response(self, response):
        """
        异步下单回报推送
        :param response: XtOrderResponse 对象
        :return:
        """
        self.logger.info(f"异步委托回调 order_remark{response.order_remark}, order_id{response.order_id}, seq{response.seq}")
        self.logger.debug(f"当前冻结字典:{self.freeze_dict[response.strategy_name]}")
        freeze_hanadler.change_seq_to_id(self.freeze_dict, response.strategy_name,response.seq, response.order_id)

    def on_cancel_order_stock_async_response(self, response):
        """
        :param response: XtCancelOrderResponse 对象
        :return:
        """
        self.logger.info(f"{sys._getframe().f_code.co_name}:response{response}" )

    def on_account_status(self, status):
        """
        :param response: XtAccountStatus 对象
        :return:
        """
        self.logger.info(f"{sys._getframe().f_code.co_name}:status{status.status}")


class MyXtTrader:

    def __init__(self, xt_acc, path, freeze_dict):
        self._logger = logging.getLogger("MyXtTrader")
        self._logger.setLevel(logging.DEBUG)
        self._logger.addHandler(console_handler)
        self._logger.addHandler(file_handler)
        self._freeze_dict = freeze_dict
        self._connected = False
        self._xt_trader = None
        self._xt_acc = xt_acc
        self._path = path
        self._try_connect()

    def connection_lost(self):
        self._connected = False
        self._xt_trader = None

    def _create_trader(self, session_id):
        # 创建交易回调类对象,并声明接收回调
        trader = XtQuantTrader(self._path, session_id, callback=MyXtQuantTraderCallback(DatabaseManager, DB_NAME, self, self._freeze_dict))
        # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定
        # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程
        trader.set_relaxed_response_order_enabled(True)
        # 启动交易线程
        trader.start()
        connect_result = trader.connect()
        self._logger.info(f'建立交易连接,返回0表示连接成功:{connect_result}', )
        # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
        subscribe_result = trader.subscribe(self._xt_acc)
        self._logger.info(f'对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功:{subscribe_result}', )
        return trader if connect_result == 0 else None

    def _try_connect(self):
        session_id_range = [i for i in range(100, 120)]

        import random
        random.shuffle(session_id_range)

        # 遍历尝试session_id列表尝试连接
        for session_id in session_id_range:
            trader = self._create_trader(session_id)
            if trader:
                self._logger.info(f'连接成功,session_id:{session_id}')
                self._xt_trader = trader
                self._connected = True
                return
            else:
                self._logger.info(f'连接失败,session_id:{session_id},继续尝试下一个id')
                continue

        self._logger.info('所有id都尝试后仍失败,放弃连接')
        # return None

    def __getattr__(self, item):
        if not self._connected:
            self._logger.info("connection lost, 交易接口断开,即将重连")
            self._try_connect()
        return getattr(self._xt_trader, item)



def order_handle(xt_trader, freeze_dict, msg, db_m):
    # 请在此处自己coding, 根据msg给交易端下单
    # {'time': '2019-01-02 09:38:00', 'action': 'BUY', 'code': '511010.XSHG',
    # 'pct': 0.0116, 'strategy': 'my_test', 'price': 116.409, 'cancel_order': 0}
    # logger.info(f"下单信息:{msg}")
    #取账号信息
    account_info = xt_trader.query_stock_asset(acc)
    # logger.info(account_info)
    stock = ret_code(msg['code'])
    full_tick = xtdata.get_full_tick([stock])
    if not full_tick:
        logger.warning(f"无法获取当前{stock}行情, 下单取消!")
        return
    logger.info(f"{stock} 全推行情: {full_tick}")
    current_price = full_tick[stock]['lastPrice']
    msg_price = float(msg['price'])
    logger.info(f"当前每股金额{current_price},策略信号每股金额{msg_price}")
    sp = 0
    if msg_price > 0:
        sp = current_price / msg_price - 1
        logger.info(f"预计滑点{sp * 100:.3f}%")
    info_time = msg['time']
    info_time = time.mktime(time.strptime(info_time, "%Y-%m-%d %H:%M:%S"))
    time_delay = time.time() - info_time
    logger.debug(f"下单时间与当前时间偏差:{time_delay}s")

    if time_check and time_delay > 1800000:  # 允许最多30分钟下单时间偏差,防止延迟交易信号异常发送
        logger.error(f"时间偏差过大,可能为回测或延迟交易信号,下单取消!")
        return

    if msg['action'] == 'BUY':
        if sp > SlippagePct:  # 仅在买入时检查滑点
            logger.error(f'当前滑点大于设置滑点,下单取消!')
            return
        # 取可用资金
        available_cash = account_info.m_dCash
        logger.info(f'账号{acc.account_id}总可用资金{available_cash}')
        strategy_cash = db_m.get_available_funds(msg['strategy'])
        strategy_alv_cash = strategy_cash - freeze_hanadler.get_frozen_cash(freeze_dict,msg['strategy'])
        logger.info(f"策略{msg['strategy']}总现金{strategy_cash},可用资金{strategy_alv_cash}")
        # 买入金额
        buy_amount = min(available_cash, strategy_alv_cash) * float(msg['pct'])
        # 买入数量 取整为100的整数倍
        buy_vol = int(buy_amount / current_price / 100) * 100
        logger.info(f"策略可用资金{strategy_alv_cash};账号可用资金 {available_cash};目标买入金额 {buy_amount};买入股数 {buy_vol}股")
        if buy_vol > 0:
            async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, buy_vol,
                                                    xtconstant.MARKET_PEER_PRICE_FIRST,
                                                    0,
                                                    msg['strategy'], stock)
            if async_seq > 0:
                logger.debug(f"下单序号{async_seq}")
                freeze_hanadler.freeze_cash(freeze_dict, msg['strategy'], async_seq, buy_vol, current_price)
                logger.info(f"买入下单完成 等待回调")
            else:
                logger.error("买入下单失败")
        else:
            logger.warning(f"可用资金不足,下单取消!")

    elif msg['action'] == 'SELL':
        # 查账号持仓
        positions = xt_trader.query_stock_positions(acc)
        # 取各品种 总持仓 可用持仓
        position_total_dict = {i.stock_code: i.m_nVolume for i in positions}
        position_available_dict = {i.stock_code: i.m_nCanUseVolume for i in positions}
        logger.info(f'{acc.account_id}持仓字典{position_total_dict}')
        logger.info(f'{acc.account_id}可用持仓字典{position_available_dict}')

        # 可用数量
        available_vol = position_available_dict[stock] if stock in position_available_dict else 0
        # 策略现有数量
        strategy_vol = db_m.get_position(msg['strategy'], stock)
        # 目标卖出数量
        target_vol = int(strategy_vol * float(msg['pct']) / 100) * 100
        # 卖出量取目标量与可用量中较小的
        sell_vol = min(target_vol, available_vol)
        logger.info(f"策略现有股数{strategy_vol};{stock} 目标卖出量 {target_vol} 可用数量 {available_vol} 卖出 {sell_vol}股")
        if sell_vol > 0:
            async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_SELL, sell_vol,
                                                    xtconstant.MARKET_PEER_PRICE_FIRST,
                                                    0,
                                                    msg['strategy'], stock)
            if async_seq > 0:
                logger.debug(f"下单序号{async_seq}")
                freeze_hanadler.pre_add_cash(freeze_dict, msg['strategy'], async_seq, sell_vol, current_price)
                logger.info(f"卖出下单完成 等待回调")
            else:
                logger.error("卖出下单失败")

    else:
        logger.info(f"错误的action:{msg['action']}")


def ret_code(stock_code):  # 将收到的股票代码进行转换   含基金
    if stock_code[:2] in ["60", "68", "90", "50", "51"]:
        return stock_code[:6] + ".SH"
    elif stock_code[:2] in ["00", "30", "15"] or "XSHG" in stock_code:  #
        return stock_code[:6] + ".SZ"
    elif stock_code[:2] in ["43", "83", "87"]:
        return stock_code[:6] + ".BJ"
    elif "XSHG" in stock_code:
        return stock_code[:6] + ".SH"
    elif "XSHE" in stock_code:
        return stock_code[:6] + ".SZ"
    else:
        return stock_code


if __name__ == '__main__':

    logger.info("start")
    # 初始化数据库管理器
    with DatabaseManager(DB_NAME) as db_manager:
        strategy_names = db_manager.get_all_strategy_names()
    logger.info(f"监听中的策略:{strategy_names}")
    stream_dict = {}
    for name in strategy_names:
        stream_dict[name] = '0' # 初始ID


    with Manager() as p_manager:
        freeze_dict = p_manager.dict()

        my_trader = MyXtTrader(acc, path, freeze_dict)

        # 连接到 Redis 服务器, 改为你自己的域名和密码
        redis_client = redis.StrictRedis(host='122.51.215.56',
                                         port=6379,
                                         password='166858kdp',
                                         decode_responses=True)

        logger.info("从Redis流中读取最新的订单信号")
        last_time = time.time()
        #freeze_info.freeze_cash("my_t", 10, 12345)

        while True:
            # 从Redis流中读取最新的订单信号
            response = redis_client.xread(stream_dict, block=100)
            # response = redis_client.get(stream_dict)
            #print(response)


            now_time = time.time()
            timedelta = now_time-last_time
            last_time = now_time
            #print('.', end="")
            # print(round(timedelta*1000), end=" ")
            if timedelta > 100000:
                logger.warning(f"long timedelta:{timedelta}")
            if response:
                logger.info(f"timedelta:{timedelta}")
                logger.info(f"len_response:{len(response)}")
                for stream, messages in response:
                    del_ids = []
                    logger.info(f"len_messages:{len(messages)}")
                    for msg_id, order_data in messages:
                        # order_data = json.loads(order_data)
                        logger.info(f"stream:{stream}, msg_id:{msg_id}, 下单信息:{order_data}")
                        with DatabaseManager(DB_NAME) as db_manager:
                            order_handle(my_trader, freeze_dict, order_data, db_manager)
                        # 更新 last_id 并删除处理过的消息
                        stream_dict[stream] = msg_id
                        # redis_client.xdel(stream, msg_id)
                        del_ids.append(msg_id)
                        # logger.info(f"Outer call_back_num: {callback.call_back_num}, ID:{id(callback.call_back_num)}")
                    if del_ids:
                        redis_client.xdel(stream, *del_ids)


通过这段代码就可以在本地接收命令,并进行下单交易。

如果还有不懂的朋友,欢迎点击我微信咨询沟通。

可以提供QMT,Ptrade等专业辅导,交流技术,发送学习礼包。

Logo

专业量化交易与投资者大本营

更多推荐