Hands-On Tutorial: Deploying an LSTM Model with Xinference for Financial Data Analysis, from Scratch
This article shows how to automatically deploy the xinference-v1.17.1 image on the 星图 (Xingtu) GPU platform to quickly stand up an LSTM inference service. The platform streamlines the deployment of traditional machine-learning models: a trained model becomes a callable service without writing any API-server code. The deployed LSTM can then be applied to financial analysis scenarios, for example feeding in a historical stock price series to forecast short-term movements and generate reference signals for quantitative trading.
1. Starting from Zero: Why Use Xinference to Deploy Your LSTM Model?
If you do financial data analysis, especially time-series tasks like stock price prediction, an LSTM is probably a regular in your toolbox. But doesn't the path from training to deployment always feel like a chore? The training script is written, the model is tuned, yet turning it into a service you can call on demand still costs half a day of wrestling with Flask, FastAPI, Docker, and the rest of the infrastructure.
That is what I want to share today: deploying an LSTM model with Xinference, cutting deployment time from "half a day" to "ten minutes".
You may have heard of Xinference as an open-source platform for serving large language models. What fewer people know is that starting from v1.17.1, its support for "flexible models" has become quite mature. What does that mean? It means traditional machine-learning models trained with PyTorch, TensorFlow, or XGBoost can now be managed and called through the same unified API as LLMs.
I recently used Xinference to deploy an LSTM that predicts the next 5 days of an A-share stock, and the whole process was far simpler than I expected. No HTTP service code, no model version bookkeeping, not even an ONNX conversion. Just the trained .pt file plus a simple Python wrapper, and Xinference stands up the REST API for you.
More importantly, once this framework is in place, swapping models, adding features, or tuning parameters becomes as simple as replacing a file. For quantitative analysis, where strategies need fast iteration, that is a real efficiency boost.
2. Environment Setup: Xinference Up and Running in 5 Minutes
2.1 Start the Xinference Container with One Command
We start directly from Docker, the fastest route. Xinference ships prebuilt images with CUDA support:
# Pull the CUDA 12.4 image (if you are using a GPU)
docker pull xprobe/xinference:v1.17.1-cu124
# Start the container (drop --gpus all if you have no GPU)
docker run -d \
  --name xinference-lstm \
  --gpus all \
  -p 9997:9997 \
  -v $(pwd)/models:/root/.xinference/models \
  -v $(pwd)/data:/root/.xinference/data \
  xprobe/xinference:v1.17.1-cu124 \
  xinference-local -H 0.0.0.0 --log-level info
A few points worth noting here:
- -p 9997:9997: maps the container's port 9997 to the host; this is Xinference's default service port
- -v $(pwd)/models:/root/.xinference/models: mounts the local models directory into the container so Xinference can access your model files
- -v $(pwd)/data:/root/.xinference/data: mounts the data directory the same way, for easier management later
If you have no GPU, or want to test on CPU first, use the image without CUDA:
docker pull xprobe/xinference:v1.17.1
# same start command, just drop --gpus all
Once the container is up, you can open http://localhost:9997 to see Xinference's web UI. In this article, though, we will mostly use the command line and the Python client, which is closer to a real production deployment workflow.
2.2 Verify the Installation
Open a new terminal and check inside the container:
# enter the container
docker exec -it xinference-lstm bash
# check the Xinference version
xinference --version
If you see output like Xinference, version 1.17.1, everything is fine. The Xinference service is now running in the background, waiting for us to register and launch a model.
3. Building the LSTM Model: The Full Pipeline from Data to Training
3.1 Preparing Financial Time-Series Data
We start with the most basic step, data preparation. Assume you have a CSV of daily bars for one A-share company, with a date and a closing price:
# data_preparation.py
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler

def prepare_stock_data(file_path, sequence_length=30, prediction_length=5):
    """
    Prepare LSTM training data.
    sequence_length: how many days of history to use for prediction (input length)
    prediction_length: how many days ahead to predict (output length)
    """
    # Load the data
    df = pd.read_csv(file_path)
    print(f"Raw rows: {len(df)}")
    print(f"Columns: {df.columns.tolist()}")
    # Make sure rows are sorted by date
    df['date'] = pd.to_datetime(df['date'])
    df = df.sort_values('date')
    # We only use the closing price
    prices = df['close'].values.astype(np.float32)
    print(f"Price series length: {len(prices)}")
    print(f"Price range: {prices.min():.2f} ~ {prices.max():.2f}")
    # Normalize (LSTMs are sensitive to scale, so this step is required)
    scaler = MinMaxScaler(feature_range=(0, 1))
    prices_reshaped = prices.reshape(-1, 1)  # reshape to 2-D
    scaled_prices = scaler.fit_transform(prices_reshaped).flatten()
    # Build sliding-window samples
    X, y = [], []
    total_samples = len(scaled_prices) - sequence_length - prediction_length + 1
    for i in range(total_samples):
        # Input: sequence_length consecutive days of prices
        X.append(scaled_prices[i:i + sequence_length])
        # Target: the next prediction_length days of prices
        y.append(scaled_prices[i + sequence_length:i + sequence_length + prediction_length])
    X = np.array(X)
    y = np.array(y)
    print(f"Samples generated: {len(X)}")
    print(f"Input shape: {X.shape}")   # should be (num_samples, 30)
    print(f"Output shape: {y.shape}")  # should be (num_samples, 5)
    # Train/test split (80% train, 20% test)
    split_idx = int(len(X) * 0.8)
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]
    return X_train, y_train, X_test, y_test, scaler

# Usage example
if __name__ == "__main__":
    X_train, y_train, X_test, y_test, scaler = prepare_stock_data("stock_data.csv")
    # Save the scaler parameters (needed again at deployment time)
    np.save("scaler_params.npy", {
        'data_min': scaler.data_min_[0],
        'data_max': scaler.data_max_[0],
        'data_range': scaler.data_range_[0],
        'scale': scaler.scale_[0],
        'min': scaler.min_[0]
    })
This function does several important things:
- Reads the CSV and makes sure rows are sorted by date
- Normalizes prices (scales them into the 0-1 range)
- Builds sliding-window samples: the past 30 days predict the next 5
- Splits the data into training and test sets
- Saves the scaler parameters, so the deployed service can restore predictions with the exact same scaling
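To make the sample count concrete, here is a minimal pure-Python sketch of the same sliding-window logic on synthetic data (the `make_windows` helper is illustrative, not part of the scripts above):

```python
def make_windows(series, seq_len=30, pred_len=5):
    """Slide a window over the series: each sample pairs seq_len inputs
    with the pred_len values that immediately follow them."""
    X, y = [], []
    for i in range(len(series) - seq_len - pred_len + 1):
        X.append(series[i:i + seq_len])
        y.append(series[i + seq_len:i + seq_len + pred_len])
    return X, y

# 100 synthetic "prices" -> 100 - 30 - 5 + 1 = 66 samples
series = [float(v) for v in range(100)]
X, y = make_windows(series)
print(len(X), len(X[0]), len(y[0]))  # 66 30 5
```

Each target window starts exactly where its input window ends, which is why the sample count is `len(series) - sequence_length - prediction_length + 1`.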
3.2 Defining and Training the LSTM Model
Next we define a simple LSTM. The key requirement is that the model must be easy to save and load, so it can be deployed later:
# lstm_model.py
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from data_preparation import prepare_stock_data

class StockPredictorLSTM(nn.Module):
    """Simple LSTM stock price predictor."""
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=5):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # LSTM layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.2  # guard against overfitting
        )
        # Fully connected output layer
        self.fc = nn.Linear(hidden_size, output_size)
        # Initialize the weights
        self._init_weights()

    def _init_weights(self):
        """Initialize model weights."""
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_normal_(param)
            elif 'bias' in name:
                nn.init.constant_(param, 0.0)
        nn.init.xavier_normal_(self.fc.weight)
        nn.init.constant_(self.fc.bias, 0.0)

    def forward(self, x):
        """
        Forward pass.
        x shape: (batch_size, sequence_length) -> (batch_size, sequence_length, 1)
        """
        # Add a feature dimension
        x = x.unsqueeze(-1)  # (batch, 30) -> (batch, 30, 1)
        # Initialize the hidden state
        batch_size = x.size(0)
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        # LSTM forward pass
        lstm_out, (hn, cn) = self.lstm(x, (h0, c0))
        # Take the output of the last time step
        last_time_step = lstm_out[:, -1, :]
        # Fully connected layer
        output = self.fc(last_time_step)
        return output

def train_model(X_train, y_train, X_test, y_test, epochs=100, batch_size=32):
    """Train the LSTM model."""
    # Convert to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train)
    y_train_tensor = torch.FloatTensor(y_train)
    X_test_tensor = torch.FloatTensor(X_test)
    y_test_tensor = torch.FloatTensor(y_test)
    # Build the data loader
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Initialize model, loss function, and optimizer
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = StockPredictorLSTM().to(device)
    criterion = nn.MSELoss()  # mean squared error loss
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=10, factor=0.5)
    # Training loop
    train_losses = []
    test_losses = []
    for epoch in range(epochs):
        model.train()
        epoch_train_loss = 0
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            predictions = model(batch_X)
            loss = criterion(predictions, batch_y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # gradient clipping
            optimizer.step()
            epoch_train_loss += loss.item()
        # Compute the test loss
        model.eval()
        with torch.no_grad():
            test_predictions = model(X_test_tensor.to(device))
            test_loss = criterion(test_predictions, y_test_tensor.to(device))
        avg_train_loss = epoch_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        test_losses.append(test_loss.item())
        scheduler.step(test_loss)
        if (epoch + 1) % 20 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], '
                  f'Train Loss: {avg_train_loss:.6f}, '
                  f'Test Loss: {test_loss.item():.6f}, '
                  f'LR: {optimizer.param_groups[0]["lr"]:.6f}')
    return model, train_losses, test_losses

# Save the trained model
def save_model(model, scaler_params, filepath="lstm_stock_predictor.pt"):
    """Save the model together with the scaler parameters."""
    torch.save({
        'model_state_dict': model.state_dict(),
        'model_config': {
            'input_size': 1,
            'hidden_size': 64,
            'num_layers': 2,
            'output_size': 5
        },
        'scaler_params': scaler_params,
        'training_info': {
            'model_class': 'StockPredictorLSTM',
            'sequence_length': 30,
            'prediction_length': 5
        }
    }, filepath)
    print(f"Model saved to: {filepath}")

# Usage example
if __name__ == "__main__":
    # Assume the data has already been prepared
    X_train, y_train, X_test, y_test, scaler = prepare_stock_data("stock_data.csv")
    # Load the scaler parameters
    scaler_params = np.load("scaler_params.npy", allow_pickle=True).item()
    # Train the model
    model, train_losses, test_losses = train_model(X_train, y_train, X_test, y_test, epochs=100)
    # Save the model
    save_model(model, scaler_params, "models/lstm_stock_predictor.pt")
This training script includes a few practical tricks:
- Gradient clipping: prevents exploding gradients and stabilizes training
- Learning-rate scheduling: automatically lowers the learning rate when validation loss stops improving
- Complete model checkpointing: saves not just the weights but also the model configuration and scaler parameters
After training you will have an lstm_stock_predictor.pt file. That is the model we will deploy.
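The saved scaler parameters are what keep the deployed service's de-normalization consistent with training. The round trip is just the min-max formula and its inverse; a quick sketch with made-up `data_min`/`data_range` values (90.0 and 40.0 are arbitrary, for illustration only):

```python
scaler_params = {'data_min': 90.0, 'data_range': 40.0}  # hypothetical values

def normalize(price, p=scaler_params):
    # MinMaxScaler forward transform for feature_range=(0, 1)
    return (price - p['data_min']) / p['data_range']

def denormalize(x, p=scaler_params):
    # Inverse transform: map a model output back to price space
    return x * p['data_range'] + p['data_min']

print(normalize(110.0))                 # 0.5
print(denormalize(normalize(117.45)))   # 117.45
```

If the serving side ever uses different parameters than training, every prediction is silently shifted and scaled, which is why the checkpoint bundles them with the weights.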
4. The Key Step: Creating an Xinference-Compatible Inference Wrapper
4.1 Writing the Inference Logic
This is what makes Xinference recognize our LSTM model. We create a Python file that defines the interface Xinference expects:
# models/lstm_inference_wrapper.py
import torch
import numpy as np
from typing import Dict, List, Any
import sys
import os

# Add the model definition path (so the StockPredictorLSTM class can be found)
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lstm_model import StockPredictorLSTM

class LSTMStockPredictor:
    """LSTM stock price inference engine."""
    def __init__(self, model_path: str):
        """
        Initialize the model.
        model_path: path to the model file
        """
        print(f"Loading model: {model_path}")
        # Load the checkpoint
        checkpoint = torch.load(model_path, map_location='cpu')
        # Read the model configuration
        model_config = checkpoint['model_config']
        self.scaler_params = checkpoint['scaler_params']
        # Build the model instance
        self.model = StockPredictorLSTM(
            input_size=model_config['input_size'],
            hidden_size=model_config['hidden_size'],
            num_layers=model_config['num_layers'],
            output_size=model_config['output_size']
        )
        # Load the weights
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()  # evaluation mode
        print(f"Model loaded, config: {model_config}")
        print(f"Scaler params: {self.scaler_params}")

    def _normalize(self, prices: List[float]) -> np.ndarray:
        """Normalize prices with the parameters saved at training time."""
        prices_array = np.array(prices, dtype=np.float32)
        normalized = (prices_array - self.scaler_params['data_min']) / self.scaler_params['data_range']
        return normalized

    def _denormalize(self, normalized_prices: np.ndarray) -> np.ndarray:
        """Map normalized prices back to the original scale."""
        original = normalized_prices * self.scaler_params['data_range'] + self.scaler_params['data_min']
        return original

    def predict_single(self, input_prices: List[float]) -> Dict[str, Any]:
        """
        Single prediction: 30 days of prices in, the next 5 days out.
        Args:
            input_prices: a list of 30 prices
        Returns:
            A dict with the prediction results
        """
        if len(input_prices) != 30:
            raise ValueError(f"Expected 30 prices, got {len(input_prices)}")
        # 1. Normalize the input
        normalized_input = self._normalize(input_prices)
        # 2. Convert to a PyTorch tensor
        input_tensor = torch.FloatTensor(normalized_input).unsqueeze(0)  # add batch dimension
        # 3. Run inference
        with torch.no_grad():
            normalized_predictions = self.model(input_tensor).numpy()[0]
        # 4. Map back to original prices
        predictions = self._denormalize(normalized_predictions)
        # 5. Compute some useful statistics
        current_price = input_prices[-1]
        predicted_prices = predictions.tolist()
        # Percentage change versus the current price
        changes = [(pred - current_price) / current_price * 100 for pred in predicted_prices]
        return {
            "success": True,
            "input_prices": input_prices,
            "current_price": current_price,
            "predictions": predicted_prices,
            "daily_changes_percent": [round(change, 2) for change in changes],
            "prediction_days": ["D+1", "D+2", "D+3", "D+4", "D+5"],
            "model_info": {
                "model_type": "LSTM",
                "input_window": 30,
                "output_window": 5,
                "confidence_score": 0.82  # could be derived from historical accuracy
            }
        }

    def predict_batch(self, batch_prices: List[List[float]]) -> Dict[str, Any]:
        """
        Batch prediction: several stocks in one call.
        Args:
            batch_prices: a list of price sequences, 30 prices each
        Returns:
            A dict with the batch results
        """
        results = []
        for i, prices in enumerate(batch_prices):
            try:
                result = self.predict_single(prices)
                results.append(result)
            except Exception as e:
                results.append({
                    "success": False,
                    "error": str(e),
                    "input_index": i
                })
        return {
            "batch_results": results,
            "total_count": len(batch_prices),
            "success_count": sum(1 for r in results if r.get("success", False))
        }

# Global model instance (singleton, avoids reloading)
_model_instance = None

def get_model():
    """Return the model instance (interface required by Xinference)."""
    global _model_instance
    if _model_instance is None:
        # Model file path (adjust to your setup)
        model_path = "/root/.xinference/models/lstm_stock_predictor.pt"
        _model_instance = LSTMStockPredictor(model_path)
    return _model_instance

def predict(**kwargs):
    """
    Main entry point called by Xinference.
    Supports two call styles:
    1. Single prediction: predict(input_data=[list of prices])
    2. Batch prediction: predict(batch_data=[list1, list2, ...])
    """
    input_data = kwargs.get("input_data")
    batch_data = kwargs.get("batch_data")
    model = get_model()
    if batch_data is not None:
        # Batch prediction
        if not isinstance(batch_data, list):
            return {"success": False, "error": "batch_data must be a list"}
        return model.predict_batch(batch_data)
    elif input_data is not None:
        # Single prediction
        if not isinstance(input_data, list):
            return {"success": False, "error": "input_data must be a list"}
        return model.predict_single(input_data)
    else:
        return {"success": False, "error": "either input_data or batch_data is required"}

# Local test entry point
if __name__ == "__main__":
    # Fabricate some test data
    test_prices = [100 + i * 0.5 + np.random.randn() * 2 for i in range(30)]
    # Single prediction
    print("Single prediction test:")
    result = predict(input_data=test_prices)
    print(f"Current price: {result['current_price']:.2f}")
    print(f"Next 5 days: {result['predictions']}")
    print(f"Daily changes: {result['daily_changes_percent']}%")
    # Batch prediction
    print("\nBatch prediction test:")
    batch_test = [
        test_prices,
        [95 + i * 0.3 + np.random.randn() * 1.5 for i in range(30)],
        [105 + i * 0.7 + np.random.randn() * 2.5 for i in range(30)]
    ]
    batch_result = predict(batch_data=batch_test)
    print(f"Batch successes: {batch_result['success_count']}/{batch_result['total_count']}")
This wrapper does several important things:
- Defines the get_model() and predict() interfaces that Xinference expects
- Handles normalization and de-normalization of the data
- Supports both single and batch prediction
- Returns a structured result, including useful extras such as the projected daily changes
4.2 Creating the Model Registration File
Now we tell Xinference this model exists. In the models/ directory, create lstm_stock_predictor.json:
{
  "model_name": "lstm-stock-predictor",
  "model_lang": ["zh", "en"],
  "model_ability": ["flexible"],
  "model_description": "PyTorch LSTM stock price predictor: takes 30 days of closing prices, outputs predicted prices for the next 5 days",
  "model_specs": [
    {
      "model_format": "pytorch",
      "model_size_in_billions": 0.01,
      "quantization": "none",
      "model_id": "lstm-stock-predictor-v1",
      "revision": "1.0",
      "model_uri": "file:///root/.xinference/models/lstm_inference_wrapper.py"
    }
  ],
  "model_family": "flexible",
  "additional_info": {
    "input_requirements": "A list of closing prices for 30 consecutive trading days",
    "output_format": "A list of predicted prices for the next 5 days",
    "typical_use_case": "Stock price prediction, quantitative trading signal generation",
    "performance_notes": "GPU acceleration recommended; a single prediction takes roughly 10-50ms"
  }
}
Key fields:
- model_family: "flexible": tells Xinference this is a flexible model, not an LLM
- model_uri: points to our inference wrapper Python file
- model_format: "pytorch": specifies the model format
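Before registering, a quick local sanity check of the JSON file can save a round trip. A small sketch; note that the required-key list below is an assumption based on the fields this article uses, not Xinference's authoritative schema:

```python
import json

# Assumed minimal key set, inferred from the registration file above
REQUIRED_KEYS = {"model_name", "model_ability", "model_family", "model_specs"}

def check_registration(text):
    """Parse the registration JSON and verify the fields this article relies on."""
    cfg = json.loads(text)
    missing = REQUIRED_KEYS - cfg.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    for spec in cfg["model_specs"]:
        # model_uri must point at the wrapper file inside the container
        if not spec.get("model_uri", "").startswith("file://"):
            raise ValueError("model_uri should be a file:// URI")
    return cfg

cfg = check_registration(
    '{"model_name": "lstm-stock-predictor", "model_ability": ["flexible"], '
    '"model_family": "flexible", "model_specs": '
    '[{"model_uri": "file:///root/.xinference/models/lstm_inference_wrapper.py"}]}'
)
print(cfg["model_name"])  # lstm-stock-predictor
```

In practice you would read the real file with `open(...).read()`; a malformed file fails here with a clear error instead of an opaque one from the register command.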
5. Deployment and Testing: Getting the Model to Actually Run
5.1 Register and Launch the Model
With everything in place, register the model with Xinference:
# enter the container
docker exec -it xinference-lstm bash
# register the model
xinference register -f /root/.xinference/models/lstm_stock_predictor.json --persist
# launch the model (give it a unique UID)
xinference launch \
  --model-name "lstm-stock-predictor" \
  --model-type "flexible" \
  --model-uid "stock-lstm-v1" \
  --size-in-billions 0.01
# check model status
xinference list
You should see output like this:
UID NAME TYPE SIZE STATUS
stock-lstm-v1 lstm-stock-predictor flexible 0.01B READY
5.2 Testing Inference with the Python Client
The model is now running; let's write a test script to verify it:
# test_lstm_inference.py
import time
import numpy as np
from xinference.client import Client

def test_single_prediction():
    """Single-prediction test."""
    # Connect to the Xinference service
    client = Client("http://localhost:9997")
    # Fetch the model
    model = client.get_model("stock-lstm-v1")
    print(f"Model info: {model.model_name} (UID: {model.model_uid})")
    # Simulate 30 days of prices (in real use, fetch from a database or API)
    # Here we fake a slow uptrend
    base_price = 100.0
    trend = 0.5       # rises 0.5 per day
    volatility = 2.0  # noise amplitude
    recent_prices = []
    for i in range(30):
        price = base_price + i * trend + np.random.randn() * volatility
        recent_prices.append(round(price, 2))
    print(f"\nInput data (last 30 closing prices):")
    print(f"First 5 days: {recent_prices[:5]}")
    print(f"Middle 5 days: {recent_prices[10:15]}")
    print(f"Last 5 days: {recent_prices[-5:]}")
    print(f"Latest price: {recent_prices[-1]:.2f}")
    # Call the model
    print("\nCalling the model...")
    start_time = time.time()
    try:
        result = model.predict(input_data=recent_prices)
        elapsed_time = (time.time() - start_time) * 1000  # milliseconds
        print(f"Prediction finished in {elapsed_time:.2f}ms")
        print(f"Status: {'success' if result.get('success') else 'failure'}")
        if result.get('success'):
            print(f"\n=== Prediction ===")
            print(f"Current price: {result['current_price']:.2f}")
            print(f"Predicted prices for the next 5 days:")
            for i, (price, change) in enumerate(zip(result['predictions'], result['daily_changes_percent'])):
                day_label = result['prediction_days'][i]
                print(f"  {day_label}: {price:.2f} ({change:+.2f}%)")
            # Naive trading signal
            next_day_pred = result['predictions'][0]
            current_price = result['current_price']
            if next_day_pred > current_price * 1.01:     # predicted rise above 1%
                signal = "STRONG BUY"
            elif next_day_pred > current_price * 1.005:  # predicted rise of 0.5%-1%
                signal = "BUY"
            elif next_day_pred < current_price * 0.995:  # predicted drop above 0.5%
                signal = "SELL"
            else:
                signal = "HOLD"
            print(f"\nTrading signal: {signal}")
            print(f"Model confidence: {result['model_info']['confidence_score']:.2%}")
    except Exception as e:
        print(f"Prediction failed: {str(e)}")

def test_batch_prediction():
    """Batch-prediction test."""
    client = Client("http://localhost:9997")
    model = client.get_model("stock-lstm-v1")
    # Simulate data for 3 stocks
    batch_data = []
    stock_names = ["Stock A", "Stock B", "Stock C"]
    for stock_idx in range(3):
        base_price = 50 + stock_idx * 25
        prices = []
        for day in range(30):
            price = base_price + day * 0.3 + np.random.randn() * 1.5
            prices.append(round(price, 2))
        batch_data.append(prices)
    print(f"\nBatch-predicting {len(batch_data)} stocks...")
    start_time = time.time()
    result = model.predict(batch_data=batch_data)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Batch prediction finished in {elapsed_time:.2f}ms")
    print(f"Average per stock: {elapsed_time/len(batch_data):.2f}ms")
    print(f"Successes: {result['success_count']}/{result['total_count']}")
    for i, stock_result in enumerate(result['batch_results']):
        if stock_result.get('success'):
            stock_name = stock_names[i]
            current = stock_result['current_price']
            pred_tomorrow = stock_result['predictions'][0]
            change = stock_result['daily_changes_percent'][0]
            print(f"\n{stock_name}:")
            print(f"  Current price: {current:.2f}")
            print(f"  Tomorrow's prediction: {pred_tomorrow:.2f} ({change:+.2f}%)")

if __name__ == "__main__":
    print("=" * 50)
    print("LSTM Stock Prediction Model Test")
    print("=" * 50)
    # Single prediction
    test_single_prediction()
    print("\n" + "=" * 50)
    # Batch prediction
    test_batch_prediction()
Run this test script and you will see detailed prediction output. If everything works, it should look roughly like this:
==================================================
LSTM Stock Prediction Model Test
==================================================
Model info: lstm-stock-predictor (UID: stock-lstm-v1)
Input data (last 30 closing prices):
First 5 days: [100.85, 101.32, 99.78, 102.45, 101.93]
Middle 5 days: [107.12, 108.45, 106.89, 109.23, 110.56]
Last 5 days: [113.45, 114.78, 115.32, 116.89, 117.45]
Latest price: 117.45
Calling the model...
Prediction finished in 23.45ms
Status: success
=== Prediction ===
Current price: 117.45
Predicted prices for the next 5 days:
  D+1: 118.23 (+0.66%)
  D+2: 119.12 (+1.42%)
  D+3: 119.89 (+2.08%)
  D+4: 120.45 (+2.55%)
  D+5: 121.23 (+3.22%)
Trading signal: BUY
Model confidence: 82.00%
5.3 Calling the HTTP API Directly
Besides the Python client, you can call the model over plain HTTP, which is especially handy when integrating from other languages such as JavaScript or Go:
# http_api_test.py
import requests
import json

def test_http_api():
    """HTTP API test."""
    # API endpoint
    url = "http://localhost:9997/v1/models/stock-lstm-v1/predict"
    # Request headers
    headers = {"Content-Type": "application/json"}
    # Test data
    test_prices = [100 + i * 0.5 for i in range(30)]
    # Single prediction
    payload_single = {
        "input_data": test_prices
    }
    print("Testing the single-prediction API...")
    response = requests.post(url, json=payload_single, headers=headers, timeout=30)
    if response.status_code == 200:
        result = response.json()
        print(f"Status code: {response.status_code}")
        print(f"Response time: {response.elapsed.total_seconds()*1000:.2f}ms")
        print(f"Result: {json.dumps(result, indent=2, ensure_ascii=False)}")
    else:
        print(f"Request failed: {response.status_code}")
        print(f"Error: {response.text}")
    # Batch prediction
    print("\nTesting the batch-prediction API...")
    batch_data = [
        test_prices,
        [95 + i * 0.3 for i in range(30)],
        [105 + i * 0.7 for i in range(30)]
    ]
    payload_batch = {
        "batch_data": batch_data
    }
    response = requests.post(url, json=payload_batch, headers=headers, timeout=30)
    if response.status_code == 200:
        result = response.json()
        print(f"Batch successes: {result.get('success_count', 0)}/{result.get('total_count', 0)}")
    else:
        print(f"Batch prediction failed: {response.status_code}")

if __name__ == "__main__":
    test_http_api()
6. Practical Application: Integrating into a Quant Trading System
6.1 A Simple Trading-Signal Generator
With the model deployed, let's see how to plug it into an actual trading system:
# trading_system.py
import requests
import json
import time
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional

class LSTMStockTradingSystem:
    """Trading system driven by LSTM predictions."""
    def __init__(self, xinference_url: str = "http://localhost:9997",
                 model_uid: str = "stock-lstm-v1"):
        self.base_url = xinference_url
        self.model_uid = model_uid
        self.api_url = f"{self.base_url}/v1/models/{self.model_uid}/predict"

    def fetch_stock_data(self, ticker: str, days: int = 30) -> Optional[List[float]]:
        """
        Fetch historical stock data (simulated here; swap in a real data source).
        Args:
            ticker: stock code
            days: number of days to fetch
        Returns:
            Closing prices for the last `days` days
        """
        # In real use this would query a database or market-data API.
        # For the demo we generate synthetic data.
        # Baseline prices for a few well-known tickers
        base_prices = {
            "SH600519": 1800,  # Kweichow Moutai
            "SZ000858": 70,    # Wuliangye
            "SH601318": 45,    # Ping An Insurance
            "SZ000002": 25,    # Vanke
            "SH600036": 35     # China Merchants Bank
        }
        base_price = base_prices.get(ticker, 50)
        # Generate a price series with trend and noise
        prices = []
        for i in range(days):
            trend = i * 0.1                # slow uptrend
            noise = np.random.randn() * 2  # random noise
            price = base_price + trend + noise
            prices.append(round(price, 2))
        return prices

    def generate_trading_signal(self, ticker: str) -> Dict:
        """
        Generate a trading signal for one stock.
        Returns:
            A dict with the signal and supporting details
        """
        print(f"Analyzing {ticker}...")
        # 1. Fetch history
        historical_prices = self.fetch_stock_data(ticker, days=30)
        if not historical_prices or len(historical_prices) < 30:
            return {"error": f"Not enough historical data for {ticker}"}
        current_price = historical_prices[-1]
        print(f"  Current price: {current_price:.2f}")
        print(f"  History range: {min(historical_prices):.2f} - {max(historical_prices):.2f}")
        # 2. Call the LSTM model
        try:
            response = requests.post(
                self.api_url,
                json={"input_data": historical_prices},
                timeout=10
            )
            if response.status_code != 200:
                return {"error": f"Prediction request failed: {response.text}"}
            prediction = response.json()
            if not prediction.get("success"):
                return {"error": f"Prediction returned failure: {prediction.get('error', 'unknown error')}"}
            # 3. Parse the prediction
            predicted_prices = prediction["predictions"]
            next_day_pred = predicted_prices[0]
            confidence = prediction["model_info"]["confidence_score"]
            print(f"  Tomorrow's prediction: {next_day_pred:.2f}")
            print(f"  Model confidence: {confidence:.2%}")
            # 4. Generate a trading signal (simple rules)
            price_change_pct = (next_day_pred - current_price) / current_price * 100
            signal = "HOLD"
            signal_strength = "NEUTRAL"
            if price_change_pct > 3.0:
                signal = "STRONG_BUY"
                signal_strength = "STRONG"
            elif price_change_pct > 1.5:
                signal = "BUY"
                signal_strength = "MODERATE"
            elif price_change_pct > 0.5:
                signal = "WEAK_BUY"
                signal_strength = "WEAK"
            elif price_change_pct < -3.0:
                signal = "STRONG_SELL"
                signal_strength = "STRONG"
            elif price_change_pct < -1.5:
                signal = "SELL"
                signal_strength = "MODERATE"
            elif price_change_pct < -0.5:
                signal = "WEAK_SELL"
                signal_strength = "WEAK"
            # 5. Risk indicators
            historical_volatility = np.std(historical_prices) / np.mean(historical_prices) * 100
            support_level = np.percentile(historical_prices, 25)     # 25th percentile as support
            resistance_level = np.percentile(historical_prices, 75)  # 75th percentile as resistance
            return {
                "ticker": ticker,
                "timestamp": datetime.now().isoformat(),
                "current_price": current_price,
                "predicted_next_price": next_day_pred,
                "predicted_change_pct": round(price_change_pct, 2),
                "signal": signal,
                "signal_strength": signal_strength,
                "confidence": confidence,
                "risk_indicators": {
                    "historical_volatility": round(historical_volatility, 2),
                    "support_level": round(support_level, 2),
                    "resistance_level": round(resistance_level, 2),
                    "distance_to_support": round(((current_price - support_level) / current_price * 100), 2),
                    "distance_to_resistance": round(((resistance_level - current_price) / current_price * 100), 2)
                },
                "prediction_details": {
                    "input_window": historical_prices,
                    "predicted_5day": predicted_prices,
                    "prediction_days": prediction["prediction_days"]
                }
            }
        except requests.exceptions.Timeout:
            return {"error": "Prediction request timed out"}
        except requests.exceptions.RequestException as e:
            return {"error": f"Network error: {str(e)}"}
        except Exception as e:
            return {"error": f"Processing error: {str(e)}"}

    def batch_analyze(self, tickers: List[str]) -> Dict:
        """
        Analyze multiple stocks in one run.
        Args:
            tickers: list of stock codes
        Returns:
            Batch analysis results
        """
        print(f"Starting batch analysis of {len(tickers)} stocks...")
        print("=" * 60)
        results = []
        start_time = time.time()
        for ticker in tickers:
            result = self.generate_trading_signal(ticker)
            results.append(result)
            if "error" not in result:
                print(f"{ticker}: {result['signal']} ({result['predicted_change_pct']:+.2f}%)")
            else:
                print(f"{ticker}: analysis failed - {result['error']}")
        elapsed_time = time.time() - start_time
        # Tally the signal distribution
        signals = [r.get("signal", "ERROR") for r in results if "error" not in r]
        signal_counts = {}
        for signal in signals:
            signal_counts[signal] = signal_counts.get(signal, 0) + 1
        print("\n" + "=" * 60)
        print(f"Analysis finished in {elapsed_time:.2f}s")
        print(f"Average per stock: {elapsed_time/len(tickers):.2f}s")
        print(f"\nSignal distribution:")
        for signal, count in signal_counts.items():
            print(f"  {signal}: {count}")
        return {
            "analysis_time": elapsed_time,
            "total_stocks": len(tickers),
            "successful_analysis": len([r for r in results if "error" not in r]),
            "signal_distribution": signal_counts,
            "detailed_results": results
        }

# Usage example
if __name__ == "__main__":
    # Initialize the trading system
    trading_system = LSTMStockTradingSystem()
    # Analyze a single stock
    print("Single-stock analysis example:")
    print("-" * 40)
    single_result = trading_system.generate_trading_signal("SH600519")
    if "error" not in single_result:
        print(f"\nTicker: {single_result['ticker']}")
        print(f"Time: {single_result['timestamp']}")
        print(f"Current price: {single_result['current_price']:.2f}")
        print(f"Predicted price: {single_result['predicted_next_price']:.2f}")
        print(f"Predicted change: {single_result['predicted_change_pct']:+.2f}%")
        print(f"Trading signal: {single_result['signal']} ({single_result['signal_strength']})")
        print(f"Confidence: {single_result['confidence']:.2%}")
        print(f"\nRisk indicators:")
        risk = single_result['risk_indicators']
        print(f"  Historical volatility: {risk['historical_volatility']}%")
        print(f"  Support: {risk['support_level']:.2f} (distance: {risk['distance_to_support']}%)")
        print(f"  Resistance: {risk['resistance_level']:.2f} (distance: {risk['distance_to_resistance']}%)")
    # Batch analysis
    print("\n" + "=" * 60)
    print("Batch analysis example:")
    print("-" * 40)
    watchlist = ["SH600519", "SZ000858", "SH601318", "SZ000002", "SH600036"]
    batch_result = trading_system.batch_analyze(watchlist)
    # Save results to a JSON file
    with open("trading_signals.json", "w", encoding="utf-8") as f:
        json.dump(batch_result, f, indent=2, ensure_ascii=False)
    print(f"\nDetailed results saved to: trading_signals.json")
This trading system shows how to integrate the Xinference-deployed LSTM into real business logic. It provides:
- Fetching stock data from a data source
- Calling the model for predictions
- Generating trading signals from the predictions
- Computing risk indicators
- Batch analysis support
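The signal rules in generate_trading_signal boil down to a threshold ladder over the predicted next-day change. Distilled into a standalone function with the same thresholds, which also makes the rules easy to unit-test in isolation:

```python
def classify_signal(change_pct):
    """Map a predicted next-day change (%) to a trading signal."""
    # Buy side, strongest threshold first
    for threshold, signal in [(3.0, "STRONG_BUY"), (1.5, "BUY"), (0.5, "WEAK_BUY")]:
        if change_pct > threshold:
            return signal
    # Sell side, strongest threshold first
    for threshold, signal in [(-3.0, "STRONG_SELL"), (-1.5, "SELL"), (-0.5, "WEAK_SELL")]:
        if change_pct < threshold:
            return signal
    return "HOLD"

print(classify_signal(2.0))   # BUY
print(classify_signal(0.3))   # HOLD
print(classify_signal(-4.0))  # STRONG_SELL
```

Keeping the ladder in one pure function means the thresholds can be tuned or backtested without touching the networking code around it.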
6.2 Scheduled Jobs and Automation
In practice you will probably want predictions to run on a schedule. Here is a simple scheduled-task example:
# scheduler.py
import schedule
import time
import json
import os
from datetime import datetime
from trading_system import LSTMStockTradingSystem

def daily_market_analysis():
    """Daily market-analysis job."""
    print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Starting the daily market analysis...")
    trading_system = LSTMStockTradingSystem()
    # Your watchlist
    watchlist = [
        "SH600519", "SZ000858", "SH601318",
        "SZ000002", "SH600036", "SH600276",
        "SZ000333", "SH600887", "SH600309"
    ]
    # Run the batch analysis
    results = trading_system.batch_analyze(watchlist)
    # Build the report
    report = {
        "analysis_date": datetime.now().strftime("%Y-%m-%d"),
        "analysis_time": datetime.now().strftime("%H:%M:%S"),
        "summary": {
            "total_stocks": results["total_stocks"],
            "successful": results["successful_analysis"],
            "buy_signals": sum(1 for r in results["detailed_results"]
                               if "error" not in r and "BUY" in r.get("signal", "")),
            "sell_signals": sum(1 for r in results["detailed_results"]
                                if "error" not in r and "SELL" in r.get("signal", "")),
            "hold_signals": sum(1 for r in results["detailed_results"]
                                if "error" not in r and r.get("signal") == "HOLD")
        },
        "top_picks": [],
        "detailed_results": results["detailed_results"]
    }
    # Collect the buy recommendations
    for result in results["detailed_results"]:
        if "error" not in result and "BUY" in result.get("signal", ""):
            report["top_picks"].append({
                "ticker": result["ticker"],
                "signal": result["signal"],
                "current_price": result["current_price"],
                "predicted_change": result["predicted_change_pct"],
                "confidence": result["confidence"]
            })
    # Sort by predicted gain
    report["top_picks"].sort(key=lambda x: x["predicted_change"], reverse=True)
    # Save the report
    filename = f"reports/market_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)
    print(f"Analysis complete, report saved to: {filename}")
    # Print a short summary
    print(f"\n📊 Today's market summary:")
    print(f"  Stocks analyzed: {report['summary']['total_stocks']}")
    print(f"  Buy signals: {report['summary']['buy_signals']}")
    print(f"  Sell signals: {report['summary']['sell_signals']}")
    print(f"  Hold signals: {report['summary']['hold_signals']}")
    if report["top_picks"]:
        print(f"\n🏆 Today's picks:")
        for i, stock in enumerate(report["top_picks"][:3], 1):
            print(f"  {i}. {stock['ticker']}: {stock['signal']} "
                  f"(predicted gain: {stock['predicted_change']:+.2f}%, "
                  f"confidence: {stock['confidence']:.2%})")

def main():
    """Main scheduler loop."""
    print("LSTM quant analysis system starting...")
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    # Set up the scheduled job
    # Run after the market closes on each trading day (3:30 pm)
    schedule.every().day.at("15:30").do(daily_market_analysis)
    # You can also run hourly (handy for testing)
    # schedule.every().hour.do(daily_market_analysis)
    print("Scheduled jobs configured:")
    print("  - Daily at 15:30: market analysis")
    # Run once immediately (for testing)
    daily_market_analysis()
    # Keep the process alive
    while True:
        schedule.run_pending()
        time.sleep(60)  # check once a minute

if __name__ == "__main__":
    # Create the reports directory
    os.makedirs("reports", exist_ok=True)
    main()
7. Model Optimization and Production Deployment Tips
7.1 Performance Optimization
When the model has to serve a large volume of requests, consider the following optimizations:
# optimized_inference.py
import threading
import time
import torch
import numpy as np
from typing import List, Dict, Any
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

class OptimizedLSTMPredictor:
    """Optimized LSTM predictor with batched inference and result caching."""
    def __init__(self, model_path: str, max_batch_size: int = 32):
        self.model_path = model_path
        self.max_batch_size = max_batch_size
        self.model = None
        self.scaler_params = None
        self._lock = threading.Lock()
        self._load_model()
        # Cache recent results
        self.cache = {}
        self.cache_size = 100

    def _load_model(self):
        """Load the model (thread-safe)."""
        with self._lock:
            if self.model is None:
                print("Loading the optimized model...")
                checkpoint = torch.load(self.model_path, map_location='cpu')
                from lstm_model import StockPredictorLSTM
                model_config = checkpoint['model_config']
                self.model = StockPredictorLSTM(
                    input_size=model_config['input_size'],
                    hidden_size=model_config['hidden_size'],
                    num_layers=model_config['num_layers'],
                    output_size=model_config['output_size']
                )
                self.model.load_state_dict(checkpoint['model_state_dict'])
                self.model.eval()
                # Use the GPU if available
                if torch.cuda.is_available():
                    self.model = self.model.cuda()
                    print("Model loaded onto the GPU")
                else:
                    print("Model running on the CPU")
                self.scaler_params = checkpoint['scaler_params']
                print("Model loading complete")

    def _get_cache_key(self, prices: List[float]) -> int:
        """Build a cache key from the hash of the price tuple."""
        return hash(tuple(prices))

    def predict_batch_optimized(self, batch_data: List[List[float]]) -> List[Dict[str, Any]]:
        """
        Optimized batch prediction: the whole batch runs in one forward pass.
        Args:
            batch_data: a list of price sequences
        Returns:
            A list of prediction results
        """
        if not batch_data:
            return []
        # Check the cache first
        results = []
        need_prediction = []
        cache_keys = []
        for prices in batch_data:
            cache_key = self._get_cache_key(prices)
            cache_keys.append(cache_key)
            if cache_key in self.cache:
                results.append(self.cache[cache_key])
            else:
                need_prediction.append(prices)
                results.append(None)  # placeholder
        # If everything was cached, return immediately
        if not need_prediction:
            return results
        # Batch-normalize the inputs
        normalized_batch = []
        for prices in need_prediction:
            normalized = (np.array(prices) - self.scaler_params['data_min']) / self.scaler_params['data_range']
            normalized_batch.append(normalized)
        # Convert to a tensor
        batch_tensor = torch.FloatTensor(np.array(normalized_batch))
        # Move to the GPU if available
        if torch.cuda.is_available():
            batch_tensor = batch_tensor.cuda()
        # Batched inference
        with torch.no_grad():
            predictions_normalized = self.model(batch_tensor).cpu().numpy()
        # De-normalize and assemble the results
        pred_idx = 0
        final_results = []
        for i, cache_key in enumerate(cache_keys):
            if results[i] is not None:
                # Cached result
                final_results.append(results[i])
            else:
                # Freshly predicted result
                pred_normalized = predictions_normalized[pred_idx]
                pred_original = pred_normalized * self.scaler_params['data_range'] + self.scaler_params['data_min']
                current_price = batch_data[i][-1]
                pred_prices = pred_original.tolist()
                changes = [(p - current_price) / current_price * 100 for p in pred_prices]
                result = {
                    "success": True,
                    "input_prices": batch_data[i],
                    "current_price": current_price,
                    "predictions": pred_prices,
                    "daily_changes_percent": [round(c, 2) for c in changes],
                    "prediction_days": ["D+1", "D+2", "D+3", "D+4", "D+5"],
                    "from_cache": False
                }
                # Store in the cache
                if len(self.cache) >= self.cache_size:
                    # Crude eviction: drop the oldest inserted key
                    first_key = next(iter(self.cache))
                    del self.cache[first_key]
                self.cache[cache_key] = result
                final_results.append(result)
                pred_idx += 1
        return final_results

    def parallel_predict(self, stock_data_list: List[Dict]) -> List[Dict]:
        """
        Predict multiple stocks in parallel (thread pool).
        Args:
            stock_data_list: each element is a dict with ticker and prices
        Returns:
            A list of dicts with ticker and prediction result
        """
        results = []
        # Group into batches
        batch_size = self.max_batch_size
        batches = [stock_data_list[i:i + batch_size]
                   for i in range(0, len(stock_data_list), batch_size)]
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = []
            for batch in batches:
                # Extract the price sequences
                batch_prices = [item["prices"] for item in batch]
                # Submit the batch prediction task
                future = executor.submit(self.predict_batch_optimized, batch_prices)
                futures.append((batch, future))
            # Collect the results
            for original_batch, future in futures:
                batch_results = future.result()
                for item, result in zip(original_batch, batch_results):
                    results.append({
                        "ticker": item["ticker"],
                        "result": result,
                        "timestamp": datetime.now().isoformat()
                    })
        return results

# Using the optimized predictor
if __name__ == "__main__":
    # Initialize the optimized predictor
    predictor = OptimizedLSTMPredictor("models/lstm_stock_predictor.pt", max_batch_size=16)
    # Prepare test data
    test_stocks = []
    for i in range(50):  # 50 test stocks
        ticker = f"TEST{i:03d}"
        base_price = 50 + i * 2
        prices = [base_price + day * 0.5 + np.random.randn() * 2 for day in range(30)]
        test_stocks.append({"ticker": ticker, "prices": prices})
    # Run the parallel prediction
    print("Starting parallel batch prediction...")
    start_time = time.time()
    results = predictor.parallel_predict(test_stocks)
    elapsed_time = time.time() - start_time
    print(f"Done, {len(test_stocks)} stocks in total")
    print(f"Total time: {elapsed_time:.2f}s")
    print(f"Average per stock: {elapsed_time/len(test_stocks)*1000:.2f}ms")
    # Cache hit rate
    cached = sum(1 for r in results if r["result"].get("from_cache", False))
    print(f"Cache hits: {cached}/{len(results)} ({cached/len(results)*100:.1f}%)")
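The eviction above drops "the first key", which in a plain dict approximates insertion order (FIFO) rather than true least-recently-used behavior. If hit patterns matter, collections.OrderedDict gives real LRU semantics in a few lines; a sketch of a drop-in replacement for the dict-based cache:

```python
from collections import OrderedDict

class LRUCache:
    """Minimal LRU cache: get() refreshes recency, put() evicts the oldest entry."""
    def __init__(self, capacity=100):
        self.capacity = capacity
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # drop the least recently used entry

cache = LRUCache(capacity=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")         # "a" is now the most recent entry
cache.put("c", 3)      # evicts "b", not "a"
print(cache.get("b"))  # None
```

Swapping this in for `self.cache` would keep frequently requested price windows resident even under a small capacity.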
7.2 Production Deployment Recommendations
- Use GPU acceleration:
# start a GPU-enabled container
docker run -d \
  --name xinference-lstm-gpu \
  --gpus all \
  -p 9998:9997 \
  -v /path/to/models:/root/.xinference/models \
  xprobe/xinference:v1.17.1-cu124 \
  xinference-local -H 0.0.0.0 --log-level info
- Enable monitoring and logging:
# start with Prometheus metrics enabled (port 9090 is the metrics port)
docker run -d \
  --name xinference-monitored \
  -p 9997:9997 \
  -p 9090:9090 \
  -v /path/to/models:/root/.xinference/models \
  xprobe/xinference:v1.17.1 \
  xinference-local -H 0.0.0.0 --log-level info --metrics
- Use load balancing (multi-instance deployment):
# start multiple instances
docker run -d --name xinference-1 -p 9997:9997 ...
docker run -d --name xinference-2 -p 9998:9997 ...
docker run -d --name xinference-3 -p 9999:9997 ...
# load-balance with Nginx
# example nginx.conf fragment:
# upstream xinference_servers {
#     server localhost:9997;
#     server localhost:9998;
#     server localhost:9999;
# }
- Model version management:
# register different model versions
xinference register -f models/lstm_stock_predictor_v1.json --persist
xinference register -f models/lstm_stock_predictor_v2.json --persist
# run multiple versions side by side
xinference launch --model-name "lstm-stock-predictor" --model-type "flexible" --model-uid "lstm-v1"
xinference launch --model-name "lstm-stock-predictor" --model-type "flexible" --model-uid "lstm-v2"
# A/B testing
def ab_test_prediction(prices):
    client = Client("http://localhost:9997")
    model_v1 = client.get_model("lstm-v1")
    model_v2 = client.get_model("lstm-v2")
    pred_v1 = model_v1.predict(input_data=prices)
    pred_v2 = model_v2.predict(input_data=prices)
    # compare the two versions' results
    return {"v1": pred_v1, "v2": pred_v2,
            "diff": abs(pred_v1["predictions"][0] - pred_v2["predictions"][0])}
8. Summary: The Full Path from Experiment to Production
With this tutorial you should now have the complete workflow for deploying an LSTM model with Xinference for financial data analysis. Let's recap the key points:
8.1 Key Takeaways
- Simplified deployment: the traditional route means writing your own API service, handling concurrency, and managing model versions; now it takes one JSON config file and a Python wrapper.
- Unified interface: every model (LSTM, XGBoost, Prophet) is called through the same REST API, which greatly reduces integration complexity.
- Production ready: Xinference ships with monitoring, logging, and multi-instance support, making the model service more stable and reliable.
- Flexible to extend: A/B tests, model hot swaps, and batched-inference optimizations are all straightforward, matching real business needs.
8.2 Practical Value
For financial data analysis work, the biggest wins of this setup are:
- Fast iteration: strategy researchers can focus on model quality and leave deployment to Xinference
- Lower barrier to entry: you can build a stable prediction service without deep backend development experience
- Better resource utilization: GPU resources are fully used and high-concurrency inference is supported
- Easy maintenance: updating a model is as simple as replacing a file, with no service restarts
8.3 Next Steps
If you have your first LSTM model deployed, try:
- Other model types: deploy XGBoost, LightGBM, and similar models the same way and compare their results
- Feature engineering: add technical indicators (MACD, RSI, Bollinger Bands, and so on) as extra LSTM features
- Model ensembles: deploy several different model types and combine them by voting or weighted averaging
- Real-time data: hook up live market feeds for minute-level or even second-level prediction updates
- Backtesting integration: feed the predictions into a backtesting system to evaluate strategies automatically
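For the technical-indicator direction above, a 14-day RSI is a typical first extra feature. A minimal pure-Python sketch of the simple (non-smoothed) variant as a starting point; production implementations usually apply Wilder's exponential smoothing instead:

```python
def rsi(prices, period=14):
    """Simple RSI over the last `period` price changes (no Wilder smoothing)."""
    if len(prices) < period + 1:
        raise ValueError(f"need at least {period + 1} prices")
    # Day-over-day changes for the last `period` intervals
    changes = [b - a for a, b in zip(prices[-period - 1:-1], prices[-period:])]
    gains = sum(c for c in changes if c > 0)
    losses = sum(-c for c in changes if c < 0)
    if losses == 0:
        return 100.0  # all gains: RSI saturates at 100
    rs = gains / losses
    return 100.0 - 100.0 / (1.0 + rs)

print(round(rsi([100 + i for i in range(20)]), 1))  # monotone rise -> 100.0
```

The resulting value could be appended to each 30-day input window, which would change the LSTM's input_size from 1 to 2 and require retraining.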
Predicting financial markets is never easy, but good tools let us focus on the strategy rather than the infrastructure. Xinference is exactly that kind of tool: it turns model deployment from an engineering project into a configuration task, letting researchers validate ideas and iterate on strategies faster.
More AI Images
Want to explore more AI images and application scenarios? Visit the CSDN 星图 image marketplace, which offers a rich set of prebuilt images covering LLM inference, image generation, video generation, model fine-tuning, and more, with one-click deployment.