量化投资初探:搭建比特币智能交易机器人

Python实战:从零构建加密货币量化交易系统

一、加密货币量化交易革命

​比特币交易数据​​:

  • 全球日交易量:$300亿+
  • 量化交易占比:70%
  • 高频策略年化收益:200%+
  • 波动率:5-10%(日平均)
  • 交易机器人数量:100,000+

二、技术架构:智能交易系统设计

1. 系统架构图

2. 技术栈选择

模块 技术
数据获取 CCXT, Websockets
策略开发 Backtrader, PyAlgoTrade
回测引擎 Backtesting.py, Zipline
实时交易 CCXT, Binance API
风险管理 Pandas, NumPy
可视化 Plotly, Dash
部署 Docker, Kubernetes

三、环境准备:搭建量化交易平台

1. 安装核心库

# 基础库
pip install pandas numpy matplotlib

# 交易库
pip install ccxt backtrader backtesting.py ta

# 可视化
pip install plotly dash

# 异步处理
pip install asyncio websockets

2. 配置文件

# config.py
BINANCE_API_KEY = 'your_api_key'
BINANCE_SECRET_KEY = 'your_secret_key'

# 交易参数
SYMBOL = 'BTC/USDT'
TIMEFRAME = '1h'
INITIAL_BALANCE = 10000  # USDT
RISK_PER_TRADE = 0.01  # 1% per trade

四、数据获取:实时行情与历史数据

1. 实时行情获取

import ccxt
import pandas as pd

def get_real_time_data(symbol, timeframe):
    """获取实时行情数据"""
    exchange = ccxt.binance({
        'apiKey': BINANCE_API_KEY,
        'secret': BINANCE_SECRET_KEY,
        'enableRateLimit': True
    })
    
    # 获取最新K线
    ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=100)
    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    return df

# 示例
btc_data = get_real_time_data('BTC/USDT', '1h')
print(btc_data.tail())

2. 历史数据下载

def download_historical_data(symbol, timeframe, since, limit=1000):
    """下载历史数据"""
    exchange = ccxt.binance()
    all_data = []
    
    while True:
        data = exchange.fetch_ohlcv(symbol, timeframe, since, limit)
        if not data:
            break
        since = data[-1][0] + 1
        all_data += data
        print(f"已获取 {len(all_data)} 条数据")
        if len(data) < limit:
            break
    
    df = pd.DataFrame(all_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    return df

# 示例:获取2023年比特币数据
btc_2023 = download_historical_data('BTC/USDT', '1d', exchange.parse8601('2023-01-01T00:00:00Z'))
btc_2023.to_csv('btc_2023.csv')

3. WebSocket实时数据

import asyncio
import websockets
import json

async def binance_websocket(symbol):
    """Binance WebSocket实时数据"""
    uri = f"wss://stream.binance.com:9443/ws/{symbol.lower()}@kline_1m"
    
    async with websockets.connect(uri) as websocket:
        while True:
            message = await websocket.recv()
            data = json.loads(message)
            kline = data['k']
            print(f"时间: {pd.to_datetime(kline['t'], unit='ms')} | "
                  f"开盘: {kline['o']} | 收盘: {kline['c']} | "
                  f"最高: {kline['h']} | 最低: {kline['l']} | "
                  f"成交量: {kline['v']}")

# 运行WebSocket
# asyncio.get_event_loop().run_until_complete(binance_websocket('btcusdt'))

五、策略开发:双均线交易策略

1. 策略原理

2. Backtrader实现

import backtrader as bt

class DualMovingAverage(bt.Strategy):
    """双均线交易策略"""
    params = (
        ('fast_period', 20),
        ('slow_period', 50),
    )
    
    def __init__(self):
        # 创建指标
        self.fast_ma = bt.indicators.SimpleMovingAverage(
            self.data.close, period=self.params.fast_period
        )
        self.slow_ma = bt.indicators.SimpleMovingAverage(
            self.data.close, period=self.params.slow_period
        )
        self.crossover = bt.indicators.CrossOver(self.fast_ma, self.slow_ma)
        
    def next(self):
        if not self.position:
            if self.crossover > 0:  # 金叉
                self.buy(size=self.broker.getvalue() * 0.99 / self.data.close[0])
        elif self.crossover < 0:  # 死叉
            self.sell(size=self.position.size)

# 回测函数
def backtest_strategy(data, strategy, cash=10000, commission=0.001):
    """策略回测"""
    cerebro = bt.Cerebro()
    cerebro.addstrategy(strategy)
    
    # 添加数据
    data_feed = bt.feeds.PandasData(dataname=data)
    cerebro.adddata(data_feed)
    
    # 设置初始资金和手续费
    cerebro.broker.setcash(cash)
    cerebro.broker.setcommission(commission=commission)
    
    # 添加分析器
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
    
    # 运行回测
    results = cerebro.run()
    strat = results[0]
    
    # 打印结果
    print(f"最终资产: {cerebro.broker.getvalue():.2f}")
    print(f"夏普比率: {strat.analyzers.sharpe.get_analysis()['sharperatio']:.2f}")
    print(f"最大回撤: {strat.analyzers.drawdown.get_analysis()['max']['drawdown']:.2f}%")
    
    # 可视化
    cerebro.plot(style='candlestick')
    
    return strat

# 加载数据
data = pd.read_csv('btc_2023.csv', index_col='timestamp', parse_dates=True)
backtest_strategy(data, DualMovingAverage)

3. 策略优化

def optimize_strategy(data):
    """策略参数优化"""
    cerebro = bt.Cerebro()
    cerebro.adddata(bt.feeds.PandasData(dataname=data))
    
    # 添加策略并定义参数范围
    cerebro.optstrategy(
        DualMovingAverage,
        fast_period=range(10, 30, 5),
        slow_period=range(40, 70, 5)
    )
    
    # 设置初始资金和手续费
    cerebro.broker.setcash(10000)
    cerebro.broker.setcommission(commission=0.001)
    
    # 添加分析器
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    
    # 运行优化
    opt_results = cerebro.run(maxcpus=1)
    
    # 分析结果
    results = []
    for run in opt_results:
        for strat in run:
            sharpe = strat.analyzers.sharpe.get_analysis()['sharperatio']
            returns = strat.analyzers.returns.get_analysis()['rtot']
            results.append({
                'params': strat.params,
                'sharpe': sharpe,
                'returns': returns
            })
    
    # 找到最佳参数
    best = max(results, key=lambda x: x['sharpe'])
    print(f"最佳参数: {best['params']}")
    print(f"夏普比率: {best['sharpe']:.2f}")
    print(f"总收益: {best['returns']*100:.2f}%")
    
    return best

六、实盘交易:连接交易所API

1. 交易引擎实现

class TradingEngine:
    """交易引擎"""
    def __init__(self, api_key, secret_key):
        self.exchange = ccxt.binance({
            'apiKey': api_key,
            'secret': secret_key,
            'enableRateLimit': True
        })
        self.positions = {}
        self.balance = self.get_balance()
    
    def get_balance(self):
        """获取账户余额"""
        balance = self.exchange.fetch_balance()
        return {
            'total': balance['total'],
            'free': balance['free'],
            'used': balance['used']
        }
    
    def create_order(self, symbol, side, amount, order_type='market'):
        """创建订单"""
        try:
            order = self.exchange.create_order(
                symbol=symbol,
                type=order_type,
                side=side,
                amount=amount
            )
            return order
        except Exception as e:
            print(f"下单失败: {str(e)}")
            return None
    
    def get_open_orders(self, symbol):
        """获取未成交订单"""
        return self.exchange.fetch_open_orders(symbol)
    
    def cancel_order(self, order_id):
        """取消订单"""
        return self.exchange.cancel_order(order_id)
    
    def run_strategy(self, strategy, symbol, timeframe):
        """运行交易策略"""
        while True:
            try:
                # 获取最新数据
                data = get_real_time_data(symbol, timeframe)
                
                # 生成信号
                signal = strategy.generate_signal(data)
                
                # 执行交易
                if signal == 'BUY':
                    self.execute_buy(symbol)
                elif signal == 'SELL':
                    self.execute_sell(symbol)
                
                # 等待下一个周期
                time.sleep(timeframe_to_seconds(timeframe))
            except Exception as e:
                print(f"策略执行错误: {str(e)}")
                time.sleep(60)

def timeframe_to_seconds(tf):
    """时间帧转换为秒"""
    units = {
        '1m': 60,
        '5m': 300,
        '15m': 900,
        '30m': 1800,
        '1h': 3600,
        '4h': 14400,
        '1d': 86400
    }
    return units.get(tf, 60)

2. 双均线策略实现

class DualMAStrategy:
    """双均线策略"""
    def __init__(self, fast_period=20, slow_period=50):
        self.fast_period = fast_period
        self.slow_period = slow_period
    
    def generate_signal(self, data):
        """生成交易信号"""
        # 计算均线
        data['fast_ma'] = data['close'].rolling(self.fast_period).mean()
        data['slow_ma'] = data['close'].rolling(self.slow_period).mean()
        
        # 检查交叉
        if data['fast_ma'].iloc[-1] > data['slow_ma'].iloc[-1] and data['fast_ma'].iloc[-2] <= data['slow_ma'].iloc[-2]:
            return 'BUY'
        elif data['fast_ma'].iloc[-1] < data['slow_ma'].iloc[-1] and data['fast_ma'].iloc[-2] >= data['slow_ma'].iloc[-2]:
            return 'SELL'
        return 'HOLD'

3. 交易执行逻辑

class TradingBot:
    """交易机器人"""
    def __init__(self, engine, strategy, symbol, initial_balance, risk_per_trade):
        self.engine = engine
        self.strategy = strategy
        self.symbol = symbol
        self.initial_balance = initial_balance
        self.risk_per_trade = risk_per_trade
        self.position = None
    
    def execute_buy(self):
        """执行买入"""
        if self.position:
            return  # 已有仓位
            
        # 计算买入数量
        price = self.engine.get_current_price(self.symbol)
        risk_amount = self.initial_balance * self.risk_per_trade
        amount = risk_amount / price
        
        # 下单
        order = self.engine.create_order(self.symbol, 'buy', amount)
        if order:
            self.position = {
                'entry_price': price,
                'amount': amount,
                'stop_loss': price * 0.95  # 5%止损
            }
            print(f"买入 {amount} {self.symbol} @ {price}")
    
    def execute_sell(self):
        """执行卖出"""
        if not self.position:
            return  # 没有仓位
            
        # 卖出全部仓位
        amount = self.position['amount']
        order = self.engine.create_order(self.symbol, 'sell', amount)
        if order:
            print(f"卖出 {amount} {self.symbol}")
            self.position = None
    
    def check_stop_loss(self):
        """检查止损"""
        if self.position:
            current_price = self.engine.get_current_price(self.symbol)
            if current_price <= self.position['stop_loss']:
                print(f"触发止损 @ {current_price}")
                self.execute_sell()
    
    def run(self):
        """运行交易机器人"""
        while True:
            try:
                # 获取信号
                data = self.engine.get_recent_data(self.symbol, '1h', 100)
                signal = self.strategy.generate_signal(data)
                
                # 执行信号
                if signal == 'BUY':
                    self.execute_buy()
                elif signal == 'SELL':
                    self.execute_sell()
                
                # 检查止损
                self.check_stop_loss()
                
                # 等待下一周期
                time.sleep(3600)  # 每小时检查一次
            except Exception as e:
                print(f"交易错误: {str(e)}")
                time.sleep(60)

七、风险管理:保护你的资金

1. 风险控制策略

2. 动态止损策略

class DynamicRiskManager:
    """动态风险管理器"""
    def __init__(self, max_drawdown=0.1, volatility_factor=2.0):
        self.max_drawdown = max_drawdown
        self.volatility_factor = volatility_factor
        self.portfolio_value = []
    
    def calculate_position_size(self, price, atr):
        """计算仓位大小"""
        # 基于波动率调整仓位
        risk_unit = self.portfolio_value[-1] * 0.01  # 1%风险
        position_size = risk_unit / (atr * self.volatility_factor)
        return position_size
    
    def calculate_stop_loss(self, entry_price, atr):
        """计算止损价"""
        return entry_price - atr * self.volatility_factor
    
    def calculate_take_profit(self, entry_price, atr):
        """计算止盈价"""
        return entry_price + atr * self.volatility_factor * 2
    
    def update_portfolio_value(self, value):
        """更新投资组合价值"""
        self.portfolio_value.append(value)
        
        # 检查最大回撤
        peak = max(self.portfolio_value)
        current = self.portfolio_value[-1]
        drawdown = (peak - current) / peak
        
        if drawdown > self.max_drawdown:
            return 'REDUCE_RISK'
        return 'NORMAL'

3. 多策略风控系统

class RiskManagementSystem:
    """综合风控系统"""
    def __init__(self, engine):
        self.engine = engine
        self.position_limits = {
            'BTC': 0.3,  # 最大仓位30%
            'ETH': 0.2,
            'OTHER': 0.1
        }
        self.max_leverage = 3
        self.max_daily_loss = 0.05  # 5%
    
    def check_position_limit(self, symbol, amount):
        """检查仓位限制"""
        current_positions = self.engine.get_positions()
        symbol_pos = current_positions.get(symbol, 0)
        portfolio_value = self.engine.get_portfolio_value()
        
        # 计算新仓位占比
        price = self.engine.get_current_price(symbol)
        new_position_value = (symbol_pos + amount) * price
        new_percentage = new_position_value / portfolio_value
        
        # 检查是否超限
        limit = self.position_limits.get(symbol.split('/')[0], self.position_limits['OTHER'])
        return new_percentage <= limit
    
    def check_leverage(self):
        """检查杠杆水平"""
        portfolio_value = self.engine.get_portfolio_value()
        total_position_value = sum(
            pos['amount'] * self.engine.get_current_price(symbol)
            for symbol, pos in self.engine.positions.items()
        )
        leverage = total_position_value / portfolio_value
        return leverage <= self.max_leverage
    
    def check_daily_loss(self):
        """检查当日损失"""
        daily_pnl = self.engine.get_daily_pnl()
        portfolio_value = self.engine.get_portfolio_value()
        daily_loss = -daily_pnl / portfolio_value
        return daily_loss <= self.max_daily_loss
    
    def evaluate_trade(self, symbol, amount):
        """评估交易风险"""
        if not self.check_position_limit(symbol, amount):
            return False, "超出仓位限制"
        if not self.check_leverage():
            return False, "超出杠杆限制"
        if not self.check_daily_loss():
            return False, "超出当日损失限制"
        return True, "风险检查通过"

八、绩效分析:优化你的策略

1. 关键绩效指标

def calculate_performance_metrics(trades, initial_balance):
    """计算策略绩效指标"""
    # 计算累计收益
    final_balance = trades['balance'].iloc[-1]
    total_return = (final_balance - initial_balance) / initial_balance
    
    # 计算年化收益
    duration_days = (trades.index[-1] - trades.index[0]).days
    annualized_return = (1 + total_return) ** (365 / duration_days) - 1
    
    # 计算最大回撤
    peak = trades['balance'].cummax()
    drawdown = (trades['balance'] - peak) / peak
    max_drawdown = drawdown.min()
    
    # 计算夏普比率
    daily_returns = trades['balance'].pct_change().dropna()
    sharpe_ratio = daily_returns.mean() / daily_returns.std() * np.sqrt(365)
    
    # 计算胜率
    winning_trades = trades[trades['pnl'] > 0]
    win_rate = len(winning_trades) / len(trades) if len(trades) > 0 else 0
    
    return {
        'total_return': total_return,
        'annualized_return': annualized_return,
        'max_drawdown': max_drawdown,
        'sharpe_ratio': sharpe_ratio,
        'win_rate': win_rate
    }

2. 可视化分析

import plotly.graph_objects as go

def visualize_performance(trades):
    """可视化交易绩效"""
    fig = go.Figure()
    
    # 资产曲线
    fig.add_trace(go.Scatter(
        x=trades.index,
        y=trades['balance'],
        name='资产曲线',
        line=dict(color='blue')
    ))
    
    # 买入点
    buy_signals = trades[trades['signal'] == 'BUY']
    fig.add_trace(go.Scatter(
        x=buy_signals.index,
        y=buy_signals['balance'],
        mode='markers',
        marker=dict(color='green', size=10),
        name='买入点'
    ))
    
    # 卖出点
    sell_signals = trades[trades['signal'] == 'SELL']
    fig.add_trace(go.Scatter(
        x=sell_signals.index,
        y=sell_signals['balance'],
        mode='markers',
        marker=dict(color='red', size=10),
        name='卖出点'
    ))
    
    # 布局设置
    fig.update_layout(
        title='交易绩效分析',
        xaxis_title='日期',
        yaxis_title='资产价值',
        hovermode='x unified',
        template='plotly_dark'
    )
    
    fig.show()
    
    # 绘制回撤曲线
    peak = trades['balance'].cummax()
    drawdown = (trades['balance'] - peak) / peak
    
    fig2 = go.Figure()
    fig2.add_trace(go.Scatter(
        x=trades.index,
        y=drawdown,
        fill='tozeroy',
        fillcolor='rgba(255,0,0,0.2)',
        line=dict(color='red'),
        name='回撤曲线'
    ))
    
    fig2.update_layout(
        title='最大回撤分析',
        xaxis_title='日期',
        yaxis_title='回撤比例',
        template='plotly_dark'
    )
    
    fig2.show()

九、真实案例:成功与失败分析

1. 成功案例:高频套利策略

​策略特点​​:

  • 年化收益:245%
  • 最大回撤:8.2%
  • 夏普比率:3.8
  • 胜率:68%

​核心算法​​:

class ArbitrageStrategy:
    """交易所套利策略"""
    def __init__(self, exchanges):
        self.exchanges = exchanges  # 多个交易所实例
    
    def find_opportunities(self):
        """寻找套利机会"""
        opportunities = []
        for base_ex in self.exchanges:
            for quote_ex in self.exchanges:
                if base_ex != quote_ex:
                    # 获取价格
                    base_price = base_ex.get_order_book('BTC/USDT')['bid']
                    quote_price = quote_ex.get_order_book('BTC/USDT')['ask']
                    
                    # 计算价差
                    spread = quote_price - base_price
                    if spread > 0.01 * base_price:  # 1%价差
                        opportunities.append({
                            'buy_exchange': base_ex.name,
                            'sell_exchange': quote_ex.name,
                            'spread': spread
                        })
        return opportunities
    
    def execute_arbitrage(self, opportunity):
        """执行套利"""
        # 在低价交易所买入
        buy_ex = self.get_exchange(opportunity['buy_exchange'])
        buy_order = buy_ex.create_order('BTC/USDT', 'buy', 0.1)
        
        # 在高价交易所卖出
        sell_ex = self.get_exchange(opportunity['sell_exchange'])
        sell_order = sell_ex.create_order('BTC/USDT', 'sell', 0.1)
        
        return buy_order, sell_order

2. 失败案例:杠杆爆仓事件

​问题分析​​:

  • 过度杠杆:10倍杠杆
  • 无止损策略
  • 黑天鹅事件:市场闪崩
  • 流动性不足
  • 系统故障

​教训总结​​:

  1. 严格控制杠杆(≤3倍)
  2. 必须设置止损
  3. 分散投资组合
  4. 压力测试策略
  5. 监控系统健康

十、工业级部署:生产环境方案

1. Docker容器化

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "trading_bot.py"]

2. Kubernetes部署

# trading-bot-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trading-bot
spec:
  replicas: 3
  selector:
    matchLabels:
      app: trading-bot
  template:
    metadata:
      labels:
        app: trading-bot
    spec:
      containers:
      - name: trading-bot
        image: your-registry/trading-bot:latest
        env:
        - name: BINANCE_API_KEY
          valueFrom:
            secretKeyRef:
              name: trading-secrets
              key: api_key
        - name: BINANCE_SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: trading-secrets
              key: secret_key
        resources:
          limits:
            cpu: "1"
            memory: "512Mi"

3. 监控系统

from prometheus_client import start_http_server, Gauge

# 创建监控指标
balance_metric = Gauge('trading_balance', 'Current trading balance')
position_metric = Gauge('trading_position', 'Current position size')
drawdown_metric = Gauge('trading_drawdown', 'Current drawdown')

def start_monitoring(port=8000):
    """启动监控服务"""
    start_http_server(port)
    
    while True:
        # 更新指标
        balance = trading_engine.get_balance()
        balance_metric.set(balance['total'])
        
        position = trading_engine.get_position('BTC/USDT')
        position_metric.set(position['amount'] if position else 0)
        
        # 计算回撤
        peak = max(portfolio_history)
        current = portfolio_history[-1]
        drawdown = (peak - current) / peak
        drawdown_metric.set(drawdown)
        
        time.sleep(10)

十一、完整可运行代码

# trading_bot.py
import time
import pandas as pd
import ccxt
from ta.trend import EMAIndicator

class TradingBot:
    """比特币交易机器人"""
    def __init__(self, api_key, secret_key, symbol='BTC/USDT', timeframe='1h', initial_balance=10000, risk_per_trade=0.01):
        self.exchange = ccxt.binance({
            'apiKey': api_key,
            'secret': secret_key,
            'enableRateLimit': True
        })
        self.symbol = symbol
        self.timeframe = timeframe
        self.balance = initial_balance
        self.risk_per_trade = risk_per_trade
        self.position = None
        self.trade_history = []
    
    def get_ohlcv(self, limit=100):
        """获取K线数据"""
        ohlcv = self.exchange.fetch_ohlcv(self.symbol, self.timeframe, limit=limit)
        df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        return df
    
    def get_current_price(self):
        """获取当前价格"""
        ticker = self.exchange.fetch_ticker(self.symbol)
        return ticker['last']
    
    def create_order(self, side, amount):
        """创建订单"""
        try:
            order = self.exchange.create_order(
                symbol=self.symbol,
                type='market',
                side=side,
                amount=amount
            )
            return order
        except Exception as e:
            print(f"下单失败: {str(e)}")
            return None
    
    def calculate_position_size(self, price):
        """计算仓位大小"""
        risk_amount = self.balance * self.risk_per_trade
        return risk_amount / price
    
    def dual_ma_signal(self, df):
        """双均线交易信号"""
        # 计算EMA
        df['ema_fast'] = EMAIndicator(df['close'], window=20).ema_indicator()
        df['ema_slow'] = EMAIndicator(df['close'], window=50).ema_indicator()
        
        # 检查交叉
        if df['ema_fast'].iloc[-1] > df['ema_slow'].iloc[-1] and df['ema_fast'].iloc[-2] <= df['ema_slow'].iloc[-2]:
            return 'BUY'
        elif df['ema_fast'].iloc[-1] < df['ema_slow'].iloc[-1] and df['ema_fast'].iloc[-2] >= df['ema_slow'].iloc[-2]:
            return 'SELL'
        return 'HOLD'
    
    def execute_trade(self, signal):
        """执行交易"""
        current_price = self.get_current_price()
        
        if signal == 'BUY' and not self.position:
            # 计算买入数量
            amount = self.calculate_position_size(current_price)
            
            # 下单
            order = self.create_order('buy', amount)
            if order:
                self.position = {
                    'entry_price': current_price,
                    'amount': amount,
                    'stop_loss': current_price * 0.95  # 5%止损
                }
                self.trade_history.append({
                    'time': pd.Timestamp.now(),
                    'type': 'BUY',
                    'price': current_price,
                    'amount': amount
                })
                print(f"买入 {amount} BTC @ {current_price}")
        
        elif signal == 'SELL' and self.position:
            # 卖出全部仓位
            order = self.create_order('sell', self.position['amount'])
            if order:
                profit = (current_price - self.position['entry_price']) * self.position['amount']
                self.balance += profit
                self.trade_history.append({
                    'time': pd.Timestamp.now(),
                    'type': 'SELL',
                    'price': current_price,
                    'amount': self.position['amount'],
                    'profit': profit
                })
                print(f"卖出 {self.position['amount']} BTC @ {current_price} | 利润: {profit:.2f} USDT")
                self.position = None
    
    def check_stop_loss(self):
        """检查止损"""
        if self.position:
            current_price = self.get_current_price()
            if current_price <= self.position['stop_loss']:
                print(f"触发止损 @ {current_price}")
                self.execute_trade('SELL')
    
    def run(self):
        """运行交易机器人"""
        print("启动比特币交易机器人...")
        print(f"初始资金: {self.balance} USDT")
        
        while True:
            try:
                # 获取数据
                data = self.get_ohlcv(100)
                
                # 生成信号
                signal = self.dual_ma_signal(data)
                
                # 执行交易
                self.execute_trade(signal)
                
                # 检查止损
                self.check_stop_loss()
                
                # 更新余额
                if self.position:
                    current_price = self.get_current_price()
                    position_value = self.position['amount'] * current_price
                    cash = self.balance - position_value
                    portfolio_value = cash + position_value
                else:
                    portfolio_value = self.balance
                
                print(f"当前资产: {portfolio_value:.2f} USDT | 信号: {signal}")
                
                # 等待下一周期
                time.sleep(3600)  # 每小时运行一次
                
            except Exception as e:
                print(f"错误: {str(e)}")
                time.sleep(60)

if __name__ == "__main__":
    # 配置参数
    API_KEY = "your_api_key"
    SECRET_KEY = "your_secret_key"
    
    # 创建并运行机器人
    bot = TradingBot(API_KEY, SECRET_KEY)
    bot.run()

结语:成为量化交易专家

通过本指南,您已掌握:

  • 📈 加密货币数据获取
  • 🤖 量化策略开发
  • ⚙️ 回测与优化
  • 💼 实盘交易系统
  • 🛡️ 风险管理
  • 📊 绩效分析

​下一步行动​​:

  1. 开发更多策略(均值回归、动量策略)
  2. 添加机器学习预测
  3. 实现多资产配置
  4. 部署云交易系统
  5. 加入量化交易社区

"在加密货币的海洋中,量化交易是你的罗盘,风险管理是你的锚。掌握它们,你就能在数字金融的浪潮中航行自如。"

Logo

加入社区!打开量化的大门,首批课程上线啦!

更多推荐