免责声明:本文基于个人使用体验,与任何厂商无商业关系。内容仅供技术交流参考,不构成投资建议。


一、前言

历史数据管理是量化交易的基础。如何高效存储和查询期货历史数据?2026年了,今天分享一下我在数据管理方面的实践经验。


二、数据存储方案选择

1. 文件存储

最简单的存储方式是CSV文件:

import pandas as pd
import os

def save_to_csv(data, filepath):
    """保存数据到CSV"""
    data.to_csv(filepath, index=False, encoding='utf-8-sig')
    print(f"数据已保存到: {filepath}")

def load_from_csv(filepath):
    """从CSV加载数据"""
    data = pd.read_csv(filepath, encoding='utf-8-sig')
    data['datetime'] = pd.to_datetime(data['datetime'])
    return data

# 使用示例
klines = get_kline_data("SHFE.rb2401")
save_to_csv(klines, "data/rb2401_1m.csv")

优点:

  • 简单易用
  • 易于查看和编辑
  • 跨平台兼容

缺点:

  • 查询效率低
  • 占用空间大
  • 不适合大量数据

2. SQLite数据库

SQLite适合中小规模数据:

import sqlite3
import pandas as pd

def init_database(db_path="data/quant.db"):
    """初始化数据库"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # 创建K线数据表
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS klines (
            symbol TEXT,
            datetime TEXT,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume INTEGER,
            open_interest INTEGER,
            PRIMARY KEY (symbol, datetime)
        )
    """)
    
    conn.commit()
    conn.close()

def save_klines_to_db(data, symbol, db_path="data/quant.db"):
    """保存K线数据到数据库"""
    conn = sqlite3.connect(db_path)
    
    data['symbol'] = symbol
    data.to_sql('klines', conn, if_exists='append', index=False)
    
    conn.close()

def query_klines(symbol, start_date, end_date, db_path="data/quant.db"):
    """查询K线数据"""
    conn = sqlite3.connect(db_path)
    
    query = """
        SELECT * FROM klines 
        WHERE symbol = ? AND datetime BETWEEN ? AND ?
        ORDER BY datetime
    """
    
    data = pd.read_sql_query(query, conn, params=(symbol, start_date, end_date))
    data['datetime'] = pd.to_datetime(data['datetime'])
    
    conn.close()
    return data

3. MySQL/PostgreSQL

对于大规模数据,使用关系型数据库:

import pymysql
import pandas as pd

def connect_mysql():
    """连接MySQL数据库"""
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='quant',
        charset='utf8mb4'
    )
    return conn

def save_to_mysql(data, table_name, conn):
    """保存数据到MySQL"""
    data.to_sql(table_name, conn, if_exists='append', index=False, method='multi')

def query_from_mysql(query, conn):
    """从MySQL查询数据"""
    return pd.read_sql(query, conn)

4. 时序数据库

对于高频数据,使用时序数据库更合适:

# InfluxDB示例
from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086, database='quant')

def save_to_influxdb(data, measurement, symbol):
    """保存到时序数据库"""
    points = []
    for _, row in data.iterrows():
        point = {
            "measurement": measurement,
            "tags": {"symbol": symbol},
            "time": row['datetime'],
            "fields": {
                "open": row['open'],
                "high": row['high'],
                "low": row['low'],
                "close": row['close'],
                "volume": row['volume']
            }
        }
        points.append(point)
    
    client.write_points(points)

三、数据获取方案对比

1. TqSdk(天勤量化)

我目前主要使用TqSdk获取历史数据:

from tqsdk import TqApi, TqAuth
from datetime import date, datetime
import pandas as pd

def get_historical_data_tqsdk(symbol, duration_seconds, count):
    """使用TqSdk获取历史数据"""
    api = TqApi(auth=TqAuth("账户", "密码"))
    
    try:
        # 获取K线数据
        klines = api.get_kline_serial(symbol, duration_seconds, count)
        
        # 转换为DataFrame
        df = pd.DataFrame({
            'datetime': klines['datetime'],
            'open': klines['open'],
            'high': klines['high'],
            'low': klines['low'],
            'close': klines['close'],
            'volume': klines['volume'],
            'open_interest': klines['open_interest']
        })
        
        return df
    finally:
        api.close()

# 使用示例
data = get_historical_data_tqsdk("SHFE.rb2401", 60, 1000)
print(data.head())

优势:

  • 数据服务完善,Tick和K线数据都有
  • 不需要自己建数据库
  • API简洁,使用方便
  • 数据质量有保障

需要注意的:

  • 需要Python基础
  • 只支持国内期货

2. VnPy

VnPy需要自己对接数据源:

# VnPy数据获取示例
from vnpy.trader.engine import MainEngine
from vnpy.gateway.ctp import CtpGateway

# 需要自己配置CTP接口获取数据
# 或者使用第三方数据源

特点:

  • 完全开源免费
  • 但数据需要自己解决
  • 配置相对复杂

3. 第三方数据服务

也可以使用专门的期货数据服务:

特点:

  • 数据质量高
  • 但需要付费
  • 需要自己对接API

四、数据管理最佳实践

1. 数据分层存储

# 数据分层存储结构
data_structure = {
    "原始数据": {
        "路径": "data/raw/",
        "格式": "CSV或数据库",
        "保留": "永久保留"
    },
    "清洗后数据": {
        "路径": "data/cleaned/",
        "格式": "Parquet或HDF5",
        "保留": "长期保留"
    },
    "特征数据": {
        "路径": "data/features/",
        "格式": "Parquet",
        "保留": "根据需要"
    }
}

2. 数据版本管理

import hashlib
import json

def save_data_with_version(data, filepath, metadata=None):
    """保存数据并记录版本"""
    # 保存数据
    data.to_parquet(filepath)
    
    # 计算数据哈希
    data_hash = hashlib.md5(data.to_string().encode()).hexdigest()
    
    # 保存元数据
    version_info = {
        "filepath": filepath,
        "hash": data_hash,
        "rows": len(data),
        "columns": list(data.columns),
        "metadata": metadata or {}
    }
    
    version_file = filepath.replace(".parquet", "_version.json")
    with open(version_file, 'w', encoding='utf-8') as f:
        json.dump(version_info, f, ensure_ascii=False, indent=2)

3. 数据清洗

def clean_data(data):
    """数据清洗"""
    # 删除缺失值
    data = data.dropna()
    
    # 删除异常值
    data = data[data['volume'] > 0]
    data = data[data['high'] >= data['low']]
    data = data[data['high'] >= data['open']]
    data = data[data['high'] >= data['close']]
    data = data[data['low'] <= data['open']]
    data = data[data['low'] <= data['close']]
    
    # 去重
    data = data.drop_duplicates(subset=['datetime'])
    
    # 排序
    data = data.sort_values('datetime')
    
    return data

4. 数据查询优化

# 使用索引提高查询效率
def create_index(conn, table_name, columns):
    """创建索引"""
    cursor = conn.cursor()
    index_name = f"idx_{table_name}_{'_'.join(columns)}"
    columns_str = ', '.join(columns)
    cursor.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name}({columns_str})")
    conn.commit()

# 分区表提高查询效率
def create_partitioned_table(conn):
    """创建分区表"""
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS klines_partitioned (
            symbol TEXT,
            datetime TEXT,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume INTEGER
        ) PARTITION BY RANGE (datetime)
    """)
    conn.commit()

五、完整数据管理系统示例

class DataManager:
    """数据管理器"""
    
    def __init__(self, db_path="data/quant.db"):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS klines (
                symbol TEXT,
                datetime TEXT,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                volume INTEGER,
                open_interest INTEGER,
                PRIMARY KEY (symbol, datetime)
            )
        """)
        
        conn.commit()
        conn.close()
    
    def save_data(self, data, symbol):
        """保存数据"""
        conn = sqlite3.connect(self.db_path)
        data['symbol'] = symbol
        data.to_sql('klines', conn, if_exists='append', index=False)
        conn.close()
    
    def query_data(self, symbol, start_date, end_date):
        """查询数据"""
        conn = sqlite3.connect(self.db_path)
        query = """
            SELECT * FROM klines 
            WHERE symbol = ? AND datetime BETWEEN ? AND ?
            ORDER BY datetime
        """
        data = pd.read_sql_query(query, conn, params=(symbol, start_date, end_date))
        data['datetime'] = pd.to_datetime(data['datetime'])
        conn.close()
        return data
    
    def get_latest_data(self, symbol, count=100):
        """获取最新数据"""
        conn = sqlite3.connect(self.db_path)
        query = """
            SELECT * FROM klines 
            WHERE symbol = ?
            ORDER BY datetime DESC
            LIMIT ?
        """
        data = pd.read_sql_query(query, conn, params=(symbol, count))
        data['datetime'] = pd.to_datetime(data['datetime'])
        data = data.sort_values('datetime')
        conn.close()
        return data

# 使用示例
dm = DataManager()
data = get_historical_data_tqsdk("SHFE.rb2401", 60, 1000)
dm.save_data(data, "SHFE.rb2401")
query_data = dm.query_data("SHFE.rb2401", "2023-01-01", "2023-12-31")

六、总结

数据管理是量化交易的基础。我目前主要使用TqSdk获取历史数据,配合SQLite或MySQL进行存储,可以高效地管理大量历史数据。

当然,这只是我个人的选择,每个人需求不同。如果数据量特别大,可以考虑使用时序数据库;如果只是简单存储,CSV文件也够用。建议根据自己的实际情况选择合适的数据管理方案。

最后再次强调:量化交易有风险,数据质量直接影响策略表现。本文仅从技术角度介绍相关方法,不构成任何投资建议。


声明:本文基于个人学习经验整理,仅供技术交流参考,不构成任何投资建议。文中提及的工具和方法请自行评估是否适合自己的需求。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐