通过API获取与展示印尼股票数据的实战指南
摘要:印尼股票数据API获取与可视化实战指南 本文详细介绍了如何通过标准化API构建印尼股票数据获取与可视化系统。针对雅加达综合指数(JKSE)数据获取的挑战,文章提出完整的技术架构方案,包括数据获取层(HTTP/WebSocket)、处理层(Pandas)、存储层(PostgreSQL)和可视化层(Plotly)。重点演示了API客户端封装方法,包含认证配置、股票列表获取和实时行情查询功能实现。
·
通过API获取与展示印尼股票数据的实战指南
引言:印尼股市的数据价值与获取挑战
印度尼西亚证券市场(IDX)作为东南亚最具活力的资本市场之一,近年来吸引了众多国际投资者的目光。雅加达综合指数(JKSE)在过去三年中表现稳健,年化波动率约为18%,为量化策略提供了丰富的交易机会。然而,对于开发者而言,直接获取印尼股票的实时行情、历史K线和技术指标数据面临诸多挑战:交易所官方接口申请流程复杂、数据格式不统一、延迟较高以及成本问题。
本文将详细介绍如何通过标准化的数据接口,构建一个完整的印尼股票数据获取与可视化系统。我们将从环境配置、接口调用到数据展示,提供完整的代码实现。
一、技术架构设计
1.1 系统架构概览
数据获取层 → 数据处理层 → 存储层 → 可视化层
↓ ↓ ↓ ↓
HTTP/WebSocket 数据清洗 数据库 图表库
↓ ↓ ↓ ↓
数据API Pandas PostgreSQL Plotly
1.2 技术栈选择
- 数据获取: Requests (HTTP)、websocket-client (实时)
- 数据处理: Pandas、NumPy
- 数据存储: PostgreSQL (关系型) / Redis (缓存)
- 可视化: Plotly、Dash
- 调度任务: APScheduler
- 部署: Docker、FastAPI
二、API接口对接实战
2.1 认证与配置管理
# config.py - 配置文件
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class APIConfig:
    """Connection settings for the stock data API.

    Holds the REST and WebSocket endpoints, the request timeout, the retry
    budget and the API key. When the ``STOCK_API_KEY`` environment variable
    is set it replaces whatever ``api_key`` was passed to the constructor.
    """

    base_url: str = "https://api.financial-data.com"
    ws_url: str = "wss://ws.financial-data.com"
    api_key: Optional[str] = None
    timeout: int = 30
    max_retries: int = 3

    def __post_init__(self):
        # The environment variable, when present, wins over the argument.
        env_key = os.environ.get('STOCK_API_KEY')
        if env_key is not None:
            self.api_key = env_key

    def validate(self) -> bool:
        """Check that an API key is configured and has the expected length.

        Returns:
            True when the key is present and exactly 32 characters long.

        Raises:
            ValueError: if the key is missing/empty, or has another length.
        """
        key = self.api_key
        if not key:
            raise ValueError("API Key未配置,请设置STOCK_API_KEY环境变量")
        if len(key) == 32:
            return True
        raise ValueError("API Key格式不正确")
# Module-level default configuration instance; APIConfig reads the API key
# from the STOCK_API_KEY environment variable when that variable is set.
config = APIConfig()
2.2 基础数据接口封装
# api_client.py - API客户端
import requests
import pandas as pd
import time
from typing import Dict, List, Any, Optional
from functools import lru_cache
import logging
# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IndonesiaStockAPI:
    """HTTP client for the Indonesian stock data REST API.

    Wraps a ``requests.Session`` and exposes helpers that return pandas
    DataFrames for the stock list, real-time quotes and historical candles.
    """

    def __init__(self, config: "APIConfig"):
        # The annotation is quoted so this class can be defined without
        # APIConfig being importable at definition time.
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'IndonesiaStockData/1.0',
            'Accept': 'application/json'
        })
        # Per-instance cache of stock lists keyed by market. A class-level
        # lru_cache on the method would keep every instance alive and hand
        # the same mutable DataFrame to every caller.
        self._stock_list_cache: Dict[str, pd.DataFrame] = {}

    def _make_request(self, endpoint: str, params: Dict) -> Dict:
        """Issue a GET request with retries and return the ``data`` payload.

        Args:
            endpoint: path appended to ``config.base_url``.
            params: query parameters; the caller's dict is not modified.

        Returns:
            The ``data`` member of the JSON response (``{}`` if absent, or
            if ``config.max_retries`` allows no attempt at all).

        Raises:
            ValueError: when the response body's ``code`` is not 200.
            requests.exceptions.RequestException: when the last retry fails.
        """
        url = f"{self.config.base_url}/{endpoint}"
        # Build a fresh dict so the API key never leaks into the caller's.
        query = {**params, 'apikey': self.config.api_key}
        for attempt in range(self.config.max_retries):
            try:
                response = self.session.get(
                    url,
                    params=query,
                    timeout=self.config.timeout
                )
                response.raise_for_status()
                data = response.json()
                # Application-level errors are reported in the body and are
                # not retried.
                if data.get('code') != 200:
                    error_msg = data.get('message', '未知错误')
                    logger.error(f"API错误: {error_msg}")
                    raise ValueError(f"API返回错误: {error_msg}")
                return data.get('data', {})
            except requests.exceptions.RequestException as e:
                logger.warning(f"请求失败 (尝试 {attempt+1}/{self.config.max_retries}): {e}")
                if attempt < self.config.max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    raise
        return {}

    def get_stock_list(self, market: str = "IDX") -> pd.DataFrame:
        """Return the list of Indonesian stocks for a market.

        Successful, non-empty results are cached per instance and per
        market; each call returns its own copy so callers may mutate it.

        Args:
            market: market code, ``IDX`` (main board) or ``IDX-B``
                (development board).

        Returns:
            DataFrame with the requested fields (symbol, name, sector,
            listing_date, currency); empty when the API returns no data.
        """
        cache = self.__dict__.setdefault('_stock_list_cache', {})
        cached = cache.get(market)
        if cached is not None:
            return cached.copy()

        params = {
            'country': 'ID',
            'market': market,
            'fields': 'symbol,name,sector,listing_date,currency'
        }
        data = self._make_request("equities/list", params)
        if not data:
            # Empty answers are not cached so a later call can retry.
            return pd.DataFrame()

        df = pd.DataFrame(data)
        if 'listing_date' in df.columns:
            df['listing_date'] = pd.to_datetime(df['listing_date'])
        df['currency'] = 'IDR'
        cache[market] = df
        return df.copy()

    def get_realtime_quotes(self, symbols: List[str]) -> pd.DataFrame:
        """Return real-time quotes for the given symbols.

        Args:
            symbols: stock codes, e.g. ``['BBCA', 'TLKM', 'BBRI']``.

        Returns:
            DataFrame with one row per symbol returned by the API, holding
            the quote fields plus a ``symbol`` column; empty when
            ``symbols`` is empty or the API returns no data.
        """
        if not symbols:
            return pd.DataFrame()

        params = {
            'symbols': ','.join(symbols),
            'fields': 'last,change,change_pct,volume,open,high,low,prev_close'
        }
        data = self._make_request("quotes/realtime", params)
        if not data:
            return pd.DataFrame()

        # Copy each quote instead of writing into the response payload.
        quotes = [
            {**quote_data, 'symbol': symbol}
            for symbol, quote_data in data.items()
        ]
        return pd.DataFrame(quotes)

    def get_historical_data(self,
                            symbol: str,
                            start_date: str,
                            end_date: str,
                            interval: str = "1d") -> pd.DataFrame:
        """Return historical candles for one symbol.

        Args:
            symbol: stock code.
            start_date: first day, ``YYYY-MM-DD``.
            end_date: last day, ``YYYY-MM-DD``.
            interval: bar size (``1d``, ``1h``, ``5m``).

        Returns:
            DataFrame indexed by ``time`` with columns open, high, low,
            close, volume; empty when the response has no candles.
        """
        params = {
            'symbol': symbol,
            'from': start_date,
            'to': end_date,
            'interval': interval
        }
        data = self._make_request("quotes/historical", params)
        if not data:
            return pd.DataFrame()

        candles = data.get('candles')
        if not candles:
            return pd.DataFrame()

        df = pd.DataFrame(candles)
        df['time'] = pd.to_datetime(df['time'], unit='ms')
        df.set_index('time', inplace=True)
        # Positional rename: assumes the API returns exactly open, high,
        # low, close, volume in this order besides `time` — TODO confirm.
        df.columns = ['open', 'high', 'low', 'close', 'volume']
        return df
2.3 WebSocket实时数据订阅
# websocket_client.py - 实时数据客户端
import websocket
import json
import threading
import queue
from datetime import datetime
import logging
class RealtimeDataClient:
    """WebSocket client that queues ticker and trade messages.

    Incoming messages are parsed in ``on_message`` and pushed onto
    ``data_queue`` as ``(kind, payload)`` tuples, where ``kind`` is
    ``'ticker'`` or ``'trade'``. Consumers read them with
    ``get_latest_data``.
    """

    # Resolved here so the class does not depend on a module-level `logger`.
    _log = logging.getLogger(__name__)

    def __init__(self, config: "APIConfig"):
        self.config = config
        self.ws = None
        self.data_queue = queue.Queue()
        self.connected = False
        self.subscriptions = set()

    def on_message(self, ws, message):
        """Parse one raw message and enqueue it if it is a ticker or trade.

        Messages that are not valid JSON, are not JSON objects, or carry a
        missing/invalid millisecond timestamp ``t`` are logged and dropped
        instead of raising out of the WebSocket callback.
        """
        try:
            data = json.loads(message)
        except json.JSONDecodeError as e:
            self._log.error(f"JSON解析错误: {e}")
            return
        if not isinstance(data, dict):
            return

        message_type = data.get('type', '')
        if message_type not in ('ticker', 'trade'):
            return

        try:
            # `t` is divided by 1000 before conversion, i.e. it is treated
            # as epoch milliseconds; the result is a naive local datetime.
            timestamp = datetime.fromtimestamp(data.get('t') / 1000)
        except (TypeError, ValueError, OverflowError, OSError) as e:
            self._log.error(f"时间戳无效,已丢弃消息: {e}")
            return

        if message_type == 'ticker':
            # Real-time quote update.
            ticker_data = {
                'symbol': data.get('s'),
                'timestamp': timestamp,
                'price': data.get('p'),
                'volume': data.get('v'),
                'bid': data.get('b'),
                'ask': data.get('a'),
                'change': data.get('c'),
                'change_pct': data.get('cp')
            }
            self.data_queue.put(('ticker', ticker_data))
        else:
            # Individual trade print.
            trade_data = {
                'symbol': data.get('s'),
                'timestamp': timestamp,
                'price': data.get('p'),
                'quantity': data.get('q'),
                'side': 'buy' if data.get('sd') == 1 else 'sell'
            }
            self.data_queue.put(('trade', trade_data))

    def on_error(self, ws, error):
        """Log an error reported by the WebSocket layer."""
        self._log.error(f"WebSocket错误: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Mark the client as disconnected; this also stops the heartbeat."""
        self._log.info(f"连接关闭: {close_msg}")
        self.connected = False

    def on_open(self, ws):
        """Mark the client as connected, re-subscribe and start the heartbeat."""
        self._log.info("WebSocket连接已建立")
        self.connected = True
        # Restore subscriptions that were registered before this connection.
        if self.subscriptions:
            self.subscribe(list(self.subscriptions))
        threading.Thread(target=self._heartbeat, daemon=True).start()

    def _heartbeat(self):
        """Send a ping every 30 seconds while the connection is up."""
        import time  # local import: this listing's import block lacks `time`

        while self.connected:
            try:
                self.ws.send(json.dumps({'type': 'ping'}))
            except Exception as e:
                # A failed send means the socket is unusable; stop pinging,
                # but say why instead of swallowing the error.
                self._log.warning(f"心跳发送失败,停止心跳: {e}")
                break
            time.sleep(30)

    def connect(self):
        """Create the WebSocketApp and run it on a daemon thread.

        Returns immediately; ``connected`` becomes True once ``on_open``
        has fired.
        """
        self.ws = websocket.WebSocketApp(
            self.config.ws_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        wst = threading.Thread(target=self.ws.run_forever)
        wst.daemon = True
        wst.start()

    def subscribe(self, symbols: "List[str]"):
        """Subscribe to the ticker and trade channels for ``symbols``.

        Raises:
            ConnectionError: if the WebSocket is not connected.
        """
        if not self.connected:
            raise ConnectionError("WebSocket未连接")
        subscription_msg = {
            'type': 'subscribe',
            'symbols': symbols,
            'channels': ['ticker', 'trade']
        }
        self.ws.send(json.dumps(subscription_msg))
        self.subscriptions.update(symbols)

    def get_latest_data(self, timeout: float = 1.0):
        """Return the next queued ``(kind, payload)`` tuple.

        Waits up to ``timeout`` seconds and returns None if nothing arrives.
        """
        try:
            return self.data_queue.get(timeout=timeout)
        except queue.Empty:
            return None
三、数据处理与存储
3.1 数据清洗与标准化
# data_processor.py - 数据处理
import pandas as pd
import numpy as np
from typing import Tuple, Dict
import talib
class StockDataProcessor:
    """Cleaning and technical-indicator helpers for OHLCV DataFrames."""

    @staticmethod
    def clean_ohlc_data(df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned, index-sorted copy of an OHLC DataFrame.

        Steps, in order:
          1. sort by index, so returns are computed between consecutive bars;
          2. forward-fill missing values;
          3. blank out every row whose close moved more than 20% against
             the previous row;
          4. forward-fill the blanked rows from the last good bar, then
             back-fill whatever is still missing at the very start.

        Args:
            df: raw candle DataFrame with a ``close`` column.

        Returns:
            A new DataFrame; ``df`` itself is not modified.
        """
        # sort_index() returns a new frame, so the caller's data is untouched.
        ordered = df.sort_index().ffill()

        # fill_method=None: gaps were already filled above.
        returns = ordered['close'].pct_change(fill_method=None)
        outliers = returns.abs() > 0.20

        # mask() blanks whole rows and upcasts integer columns as needed.
        cleaned = ordered.mask(outliers)

        # Forward fill first: it never copies a later bar backwards, and it
        # also repairs an outlier on the final row, which back-fill cannot.
        return cleaned.ffill().bfill()

    @staticmethod
    def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with technical-indicator columns added.

        Adds MA5/MA20/MA60, Bollinger bands (BB_upper/BB_middle/BB_lower),
        RSI, MACD (MACD/MACD_signal/MACD_hist) and volume_MA20.

        Args:
            df: DataFrame with ``close`` and ``volume`` columns.
        """
        df_indicators = df.copy()
        # TA-Lib only accepts float64 input; volume is often integer-typed.
        close = df_indicators['close'].astype(float)
        volume = df_indicators['volume'].astype(float)

        # Moving averages
        df_indicators['MA5'] = talib.SMA(close, timeperiod=5)
        df_indicators['MA20'] = talib.SMA(close, timeperiod=20)
        df_indicators['MA60'] = talib.SMA(close, timeperiod=60)

        # Bollinger bands
        df_indicators['BB_upper'], df_indicators['BB_middle'], df_indicators['BB_lower'] = \
            talib.BBANDS(close, timeperiod=20)

        # RSI
        df_indicators['RSI'] = talib.RSI(close, timeperiod=14)

        # MACD
        df_indicators['MACD'], df_indicators['MACD_signal'], df_indicators['MACD_hist'] = \
            talib.MACD(close)

        # Volume moving average
        df_indicators['volume_MA20'] = talib.SMA(volume, timeperiod=20)
        return df_indicators

    @staticmethod
    def detect_market_regime(df: pd.DataFrame) -> pd.Series:
        """Label every bar with a market regime.

        Labels are applied in this order, later ones overriding earlier ones:
        ``'ranging'`` (default), ``'trending'`` where the 14-period ADX is
        above 25, ``'high_volatility'`` where the 20-bar standard deviation
        of returns is above its own 80th percentile.

        Args:
            df: DataFrame with ``high``, ``low`` and ``close`` columns.

        Returns:
            Series of labels indexed like ``df``.
        """
        close = df['close'].astype(float)

        # Rolling volatility of simple returns
        returns = close.pct_change()
        volatility = returns.rolling(window=20).std()

        # Trend strength (TA-Lib needs float64 input)
        adx = talib.ADX(df['high'].astype(float), df['low'].astype(float),
                        close, timeperiod=14)

        market_regime = pd.Series('ranging', index=df.index)
        market_regime[adx > 25] = 'trending'
        market_regime[volatility > volatility.quantile(0.8)] = 'high_volatility'
        return market_regime
3.2 数据库存储设计
# database.py - 数据库操作
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from datetime import datetime
from typing import List, Dict
import json
class StockDatabase:
    """PostgreSQL storage for stock master data, daily bars and quotes.

    Every public method opens its own connection and closes it before
    returning.
    """

    def __init__(self, connection_string: str):
        self.conn_string = connection_string
        self._create_tables()

    def _get_connection(self):
        """Open a new database connection; the caller must close it."""
        return psycopg2.connect(self.conn_string)

    def _execute_in_transaction(self, work):
        """Run ``work(cursor)`` in a single transaction on a fresh connection.

        ``with conn`` only commits or rolls back the transaction — it does
        not close a psycopg2 connection — so the connection is closed
        explicitly in ``finally``.
        """
        conn = self._get_connection()
        try:
            with conn:
                with conn.cursor() as cur:
                    work(cur)
        finally:
            conn.close()

    def _create_tables(self):
        """Create the tables and indexes if they do not exist yet."""
        # realtime_quotes.volume is BIGINT to match daily_bars.volume.
        # NOTE: CREATE TABLE IF NOT EXISTS does not alter an existing table.
        create_tables_sql = """
        -- stock master data
        CREATE TABLE IF NOT EXISTS stocks (
            symbol VARCHAR(20) PRIMARY KEY,
            name VARCHAR(200),
            sector VARCHAR(100),
            listing_date DATE,
            currency CHAR(3),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        -- daily bars
        CREATE TABLE IF NOT EXISTS daily_bars (
            id SERIAL PRIMARY KEY,
            symbol VARCHAR(20) REFERENCES stocks(symbol),
            date DATE,
            open DECIMAL(12, 4),
            high DECIMAL(12, 4),
            low DECIMAL(12, 4),
            close DECIMAL(12, 4),
            volume BIGINT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(symbol, date)
        );
        -- real-time quotes
        CREATE TABLE IF NOT EXISTS realtime_quotes (
            id SERIAL PRIMARY KEY,
            symbol VARCHAR(20),
            timestamp TIMESTAMP,
            price DECIMAL(12, 4),
            volume BIGINT,
            bid DECIMAL(12, 4),
            ask DECIMAL(12, 4),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        -- indexes
        CREATE INDEX IF NOT EXISTS idx_daily_bars_symbol_date
            ON daily_bars(symbol, date);
        CREATE INDEX IF NOT EXISTS idx_realtime_quotes_symbol_timestamp
            ON realtime_quotes(symbol, timestamp);
        """
        self._execute_in_transaction(lambda cur: cur.execute(create_tables_sql))

    def save_daily_data(self, symbol: str, df: pd.DataFrame):
        """Upsert daily bars for ``symbol``.

        Args:
            symbol: stock code.
            df: DataFrame indexed by timestamp with open, high, low, close
                and volume columns; other columns are ignored. Rows with a
                missing OHLCV value are skipped.
        """
        if df.empty:
            return

        # int(NaN) raises, so incomplete bars are dropped up front.
        bars = df.dropna(subset=['open', 'high', 'low', 'close', 'volume'])
        records = [
            (symbol, ts.date(), float(o), float(h), float(l), float(c), int(v))
            for ts, o, h, l, c, v in zip(
                bars.index, bars['open'], bars['high'],
                bars['low'], bars['close'], bars['volume']
            )
        ]
        if not records:
            return

        insert_sql = """
            INSERT INTO daily_bars (symbol, date, open, high, low, close, volume)
            VALUES %s
            ON CONFLICT (symbol, date) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume
        """

        def _write(cur):
            # daily_bars.symbol references stocks(symbol): make sure the
            # parent row exists, or the insert violates the foreign key.
            cur.execute(
                "INSERT INTO stocks (symbol) VALUES (%s) "
                "ON CONFLICT (symbol) DO NOTHING",
                (symbol,)
            )
            execute_values(cur, insert_sql, records)

        self._execute_in_transaction(_write)

    def save_realtime_quote(self, quote: Dict):
        """Insert one real-time quote row.

        Args:
            quote: mapping with the keys symbol, timestamp, price, volume,
                bid and ask; missing keys are stored as NULL, extra keys
                are ignored.
        """
        insert_sql = """
            INSERT INTO realtime_quotes (symbol, timestamp, price, volume, bid, ask)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        values = tuple(
            quote.get(key)
            for key in ('symbol', 'timestamp', 'price', 'volume', 'bid', 'ask')
        )
        self._execute_in_transaction(lambda cur: cur.execute(insert_sql, values))

    def get_historical_data(self, symbol: str,
                            start_date: str,
                            end_date: str) -> pd.DataFrame:
        """Return stored daily bars for ``symbol`` between two dates, inclusive.

        Returns:
            DataFrame indexed by ``date``, ordered ascending, with open,
            high, low, close and volume columns.
        """
        query = """
            SELECT date, open, high, low, close, volume
            FROM daily_bars
            WHERE symbol = %s
              AND date >= %s
              AND date <= %s
            ORDER BY date
        """
        conn = self._get_connection()
        try:
            df = pd.read_sql_query(query, conn,
                                   params=(symbol, start_date, end_date),
                                   parse_dates=['date'])
        finally:
            conn.close()
        df.set_index('date', inplace=True)
        return df
四、数据可视化展示
4.1 交互式K线图表
# visualization.py - 数据可视化
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import pandas as pd
import numpy as np
class StockVisualizer:
    """Plotly chart builders for stock data."""

    @staticmethod
    def create_candlestick_chart(df: pd.DataFrame,
                                 symbol: str,
                                 indicators: "Optional[List[str]]" = None) -> go.Figure:
        """Build a candlestick chart with indicator overlays and a volume panel.

        Args:
            df: DataFrame with open, high, low, close and volume columns,
                plus any indicator columns to overlay.
            symbol: stock code, used in the titles.
            indicators: indicator column names to draw as lines on the price
                panel; names that are not columns of ``df`` are skipped.
                Defaults to ``['MA20', 'BB_upper', 'BB_lower']``.

        Returns:
            The Plotly figure.
        """
        if indicators is None:
            indicators = ['MA20', 'BB_upper', 'BB_lower']

        fig = make_subplots(
            rows=2,
            cols=1,
            shared_xaxes=True,
            vertical_spacing=0.05,
            row_heights=[0.7, 0.3],
            subplot_titles=(f'{symbol} 价格走势', '成交量')
        )

        # Candlesticks
        fig.add_trace(
            go.Candlestick(
                x=df.index,
                open=df['open'],
                high=df['high'],
                low=df['low'],
                close=df['close'],
                name='价格',
                increasing_line_color='#26a69a',
                decreasing_line_color='#ef5350'
            ),
            row=1, col=1
        )

        # Every requested indicator present in df is drawn on the price axis
        # (MA20 keeps its orange colour as the first default entry).
        palette = ['orange', '#42a5f5', '#ab47bc', '#ffee58', '#8d6e63']
        overlays = [name for name in indicators if name in df.columns]
        for position, name in enumerate(overlays):
            fig.add_trace(
                go.Scatter(
                    x=df.index,
                    y=df[name],
                    name=name,
                    line=dict(color=palette[position % len(palette)], width=1)
                ),
                row=1, col=1
            )

        # Volume bars, coloured by whether the bar closed up or down
        colors = ['#26a69a' if close >= open_ else '#ef5350'
                  for close, open_ in zip(df['close'], df['open'])]
        fig.add_trace(
            go.Bar(
                x=df.index,
                y=df['volume'],
                name='成交量',
                marker_color=colors,
                opacity=0.5
            ),
            row=2, col=1
        )

        fig.update_layout(
            title=f'{symbol} 技术分析图表',
            yaxis_title='价格 (IDR)',
            yaxis2_title='成交量',
            xaxis2_title='日期',
            xaxis_rangeslider_visible=False,
            height=700,
            template='plotly_dark',
            hovermode='x unified'
        )

        fig.update_xaxes(
            rangeslider_thickness=0.05,
            rangeselector=dict(
                buttons=list([
                    dict(count=1, label="1m", step="month", stepmode="backward"),
                    dict(count=6, label="6m", step="month", stepmode="backward"),
                    dict(count=1, label="YTD", step="year", stepmode="todate"),
                    dict(count=1, label="1y", step="year", stepmode="backward"),
                    dict(step="all")
                ])
            )
        )
        return fig

    @staticmethod
    def create_market_overview(stocks_data: "Dict[str, pd.DataFrame]") -> go.Figure:
        """Build a 2x2 market overview dashboard.

        Panels: normalised price comparison (first close = 100), pooled
        distribution of simple returns, volume heatmap (symbol x date) and
        correlation matrix of returns. The correlation panel is drawn only
        when at least two symbols have data; the volume panel only when at
        least one frame has a ``volume`` column.

        Args:
            stocks_data: mapping of stock code to its DataFrame; frames that
                are empty or lack a ``close`` column are ignored.

        Returns:
            The Plotly figure.
        """
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('价格走势对比', '涨跌幅分布',
                            '成交量热力图', '相关性矩阵'),
            specs=[[{"type": "scatter"}, {"type": "histogram"}],
                   [{"type": "heatmap"}, {"type": "heatmap"}]]
        )

        usable = {
            symbol: df for symbol, df in stocks_data.items()
            if not df.empty and 'close' in df.columns
        }

        def _unique_index(series):
            # Combining series into one table needs unique index labels.
            return series[~series.index.duplicated(keep='last')]

        # 1. Normalised price comparison
        returns_by_symbol = {}
        for symbol, df in usable.items():
            closes = df['close']
            base = closes.iloc[0]
            # A zero or missing first close cannot serve as the base.
            if pd.notna(base) and base != 0:
                fig.add_trace(
                    go.Scatter(
                        x=df.index,
                        y=closes / base * 100,
                        name=symbol,
                        mode='lines'
                    ),
                    row=1, col=1
                )
            returns_by_symbol[symbol] = closes.pct_change().dropna()

        # 2. Distribution of returns pooled over all symbols
        returns_data = [
            value
            for series in returns_by_symbol.values()
            for value in series.tolist()
        ]
        fig.add_trace(
            go.Histogram(
                x=returns_data,
                nbinsx=50,
                name='涨跌幅分布',
                marker_color='blue',
                opacity=0.7
            ),
            row=1, col=2
        )

        # 3. Volume heatmap: one row per symbol, one column per date
        volumes = {
            symbol: _unique_index(df['volume'])
            for symbol, df in usable.items() if 'volume' in df.columns
        }
        if volumes:
            volume_table = pd.DataFrame(volumes)
            fig.add_trace(
                go.Heatmap(
                    z=volume_table.T.values,
                    x=volume_table.index,
                    y=list(volume_table.columns),
                    colorscale='Viridis',
                    showscale=False,
                    name='成交量'
                ),
                row=2, col=1
            )

        # 4. Correlation matrix of returns
        if len(returns_by_symbol) >= 2:
            correlation = pd.DataFrame({
                symbol: _unique_index(series)
                for symbol, series in returns_by_symbol.items()
            }).corr()
            fig.add_trace(
                go.Heatmap(
                    z=correlation.values,
                    x=list(correlation.columns),
                    y=list(correlation.index),
                    zmin=-1,
                    zmax=1,
                    colorscale='RdBu',
                    # Keep the colour bar in the lower half, below the legend.
                    colorbar=dict(len=0.4, y=0.2),
                    name='相关性'
                ),
                row=2, col=2
            )

        fig.update_layout(
            title='印尼股票市场概览',
            height=800,
            showlegend=True
        )
        return fig
4.2 实时监控仪表板
# dashboard.py - 实时监控仪表板
import dash
from dash import dcc, html, Input, Output
import dash_bootstrap_components as dbc
import plotly.graph_objects as go
from datetime import datetime, timedelta
import pandas as pd
def create_realtime_dashboard(api_client: "IndonesiaStockAPI"):
    """Build the Dash application for real-time monitoring.

    The page has a symbol selector with a refresh button, a row of price
    indicators, a rolling-volatility chart and a market overview list. All
    three outputs are refreshed on button click and every 10 seconds.

    Args:
        api_client: client used to fetch real-time quotes and history.

    Returns:
        The configured ``dash.Dash`` application (not yet running).
    """
    import logging  # local import: this listing's import block lacks it

    log = logging.getLogger(__name__)

    app = dash.Dash(__name__,
                    external_stylesheets=[dbc.themes.DARKLY])

    # Layout
    app.layout = dbc.Container([
        dbc.Row([
            dbc.Col([
                html.H1("印尼股票实时监控系统", className="text-center my-4"),
                html.Hr()
            ])
        ]),
        dbc.Row([
            dbc.Col([
                dbc.Card([
                    dbc.CardHeader("股票选择"),
                    dbc.CardBody([
                        dcc.Dropdown(
                            id='symbol-selector',
                            options=[
                                {'label': '中亚银行 (BBCA)', 'value': 'BBCA'},
                                {'label': '印尼电信 (TLKM)', 'value': 'TLKM'},
                                {'label': '人民银行 (BBRI)', 'value': 'BBRI'},
                                {'label': '阿斯特拉 (ASII)', 'value': 'ASII'},
                                {'label': 'GoTo (GOTO)', 'value': 'GOTO'}
                            ],
                            value=['BBCA', 'TLKM'],
                            multi=True,
                            className="mb-3"
                        ),
                        dbc.Button("更新数据",
                                   id='update-button',
                                   color="primary",
                                   className="w-100")
                    ])
                ], className="mb-4")
            ], width=3),
            dbc.Col([
                dbc.Card([
                    dbc.CardHeader("实时行情"),
                    dbc.CardBody([
                        dcc.Graph(id='price-chart'),
                        dcc.Interval(
                            id='interval-component',
                            interval=10*1000,  # refresh every 10 seconds
                            n_intervals=0
                        )
                    ])
                ])
            ], width=9)
        ]),
        dbc.Row([
            dbc.Col([
                dbc.Card([
                    dbc.CardHeader("技术指标"),
                    dbc.CardBody([
                        dcc.Graph(id='technical-chart')
                    ])
                ])
            ], width=6),
            dbc.Col([
                dbc.Card([
                    dbc.CardHeader("市场概览"),
                    dbc.CardBody([
                        html.Div(id='market-overview')
                    ])
                ])
            ], width=6)
        ], className="mt-4")
    ], fluid=True)

    @app.callback(
        [Output('price-chart', 'figure'),
         Output('technical-chart', 'figure'),
         Output('market-overview', 'children')],
        [Input('update-button', 'n_clicks'),
         Input('interval-component', 'n_intervals')],
        [dash.dependencies.State('symbol-selector', 'value')]
    )
    def update_charts(n_clicks, n_intervals, selected_symbols):
        """Refresh the price indicators, volatility chart and overview list."""
        if not selected_symbols:
            return go.Figure(), go.Figure(), "请选择股票"

        try:
            quotes = api_client.get_realtime_quotes(selected_symbols)
        except Exception as e:
            # Keep what is on screen rather than failing the callback on a
            # transient fetch error; the next tick retries.
            log.error(f"获取实时行情失败: {e}")
            return dash.no_update, dash.no_update, dash.no_update

        # Price indicators: one grid column per quote actually returned,
        # placed by position rather than by index label.
        price_fig = go.Figure()
        for column, (_, quote) in enumerate(quotes.iterrows()):
            price_fig.add_trace(go.Indicator(
                mode="number+delta",
                value=quote['last'],
                delta={'reference': quote['prev_close']},
                title={'text': quote['symbol']},
                domain={'row': 0, 'column': column}
            ))
        price_fig.update_layout(
            grid={'rows': 1, 'columns': max(len(quotes), 1), 'pattern': "independent"},
            height=200
        )

        # Technical chart: 20-bar rolling volatility of simple returns
        tech_fig = go.Figure()
        for symbol in selected_symbols[:3]:  # cap the number of history requests
            try:
                hist_data = api_client.get_historical_data(
                    symbol,
                    (datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d'),
                    datetime.now().strftime('%Y-%m-%d')
                )
                if not hist_data.empty:
                    hist_data['returns'] = hist_data['close'].pct_change()
                    tech_fig.add_trace(go.Scatter(
                        x=hist_data.index,
                        y=hist_data['returns'].rolling(20).std() * 100,
                        name=f"{symbol}波动率",
                        mode='lines'
                    ))
            except Exception as e:
                # One failing symbol must not blank the whole chart.
                log.error(f"获取{symbol}数据失败: {e}")
        tech_fig.update_layout(
            title="波动率分析",
            height=300
        )

        # Market overview: one row per quote, coloured by sign of the change
        overview_content = []
        for _, quote in quotes.iterrows():
            change_color = 'success' if quote['change'] >= 0 else 'danger'
            overview_content.append(
                dbc.Row([
                    dbc.Col(html.Strong(quote['symbol']), width=3),
                    dbc.Col(f"{quote['last']:,.0f}", width=3),
                    dbc.Col(
                        html.Span(
                            f"{quote['change']:+.2f} ({quote['change_pct']:+.2f}%)",
                            className=f"text-{change_color}"
                        ),
                        width=6
                    )
                ], className="mb-2")
            )
        return price_fig, tech_fig, overview_content

    return app
五、实战案例:构建完整的监控系统
5.1 主程序入口
# main.py - 主程序
import asyncio
import schedule
import time
from datetime import datetime
import logging
from typing import List
class IndonesiaStockMonitor:
    """Entry point that wires the API client, database and real-time feed.

    Configuration is read from a YAML file with the sections ``api``
    (APIConfig fields), ``database`` (``connection``) and, optionally,
    ``monitoring`` (``watchlist``, ``alert_threshold``).
    """

    DEFAULT_WATCHLIST = ('BBCA', 'TLKM', 'BBRI', 'ASII', 'GOTO')
    DEFAULT_ALERT_THRESHOLD = 5.0  # percent

    def __init__(self, config_path: str = "config.yaml"):
        import os

        # yaml.safe_load returns plain dicts, so sections are read by key.
        self.config = self._load_config(config_path)

        api_section = self.config.get('api') or {}
        api_kwargs = {
            key: api_section[key]
            for key in ('base_url', 'ws_url', 'api_key', 'timeout', 'max_retries')
            if key in api_section
        }
        raw_key = api_kwargs.get('api_key')
        if isinstance(raw_key, str):
            # YAML does not expand "${VAR}" placeholders; do it here and
            # treat an unresolved placeholder as "no key configured".
            expanded = os.path.expandvars(raw_key)
            api_kwargs['api_key'] = None if '${' in expanded else expanded
        self.api_config = APIConfig(**api_kwargs)

        self.api_client = IndonesiaStockAPI(self.api_config)
        self.db_client = StockDatabase(self.config['database']['connection'])
        self.visualizer = StockVisualizer()

        monitoring = self.config.get('monitoring') or {}
        # Symbols to monitor
        self.watchlist = list(monitoring.get('watchlist') or self.DEFAULT_WATCHLIST)
        self.alert_threshold = float(
            monitoring.get('alert_threshold', self.DEFAULT_ALERT_THRESHOLD)
        )

    def _load_config(self, config_path: str):
        """Load the YAML configuration file and return it as a dict."""
        import yaml
        with open(config_path, 'r', encoding='utf-8') as f:
            # An empty file parses to None.
            return yaml.safe_load(f) or {}

    def update_daily_data(self):
        """Fetch, clean and store the last 365 days of daily bars per symbol.

        A failure for one symbol is logged and does not stop the others.
        """
        from datetime import timedelta  # only `datetime` is imported by this listing

        logging.info(f"{datetime.now()}: 开始更新日线数据")
        now = datetime.now()
        end_date = now.strftime('%Y-%m-%d')
        start_date = (now - timedelta(days=365)).strftime('%Y-%m-%d')
        processor = StockDataProcessor()

        for symbol in self.watchlist:
            try:
                df = self.api_client.get_historical_data(
                    symbol, start_date, end_date, interval="1d"
                )
                if not df.empty:
                    df_clean = processor.clean_ohlc_data(df)
                    df_with_indicators = processor.calculate_technical_indicators(df_clean)
                    self.db_client.save_daily_data(symbol, df_with_indicators)
                    logging.info(f"已更新 {symbol} 的日线数据,共 {len(df)} 条记录")
            except Exception as e:
                logging.error(f"更新 {symbol} 数据失败: {e}")

    def realtime_monitor(self):
        """Connect to the WebSocket feed and process messages until interrupted."""
        ws_client = RealtimeDataClient(self.api_config)
        ws_client.connect()

        # Wait for the connection instead of sleeping a fixed time:
        # subscribe() raises if the socket is not open yet.
        deadline = time.monotonic() + 10
        while not ws_client.connected and time.monotonic() < deadline:
            time.sleep(0.1)
        if not ws_client.connected:
            logging.error("WebSocket连接超时,实时监控未启动")
            return

        ws_client.subscribe(self.watchlist)
        logging.info("实时监控已启动")

        try:
            while True:
                # The blocking get() is the only wait: an extra sleep per
                # message would cap throughput and let the queue grow.
                data = ws_client.get_latest_data(timeout=0.5)
                if not data:
                    continue
                data_type, data_content = data
                try:
                    if data_type == 'ticker':
                        self._process_realtime_ticker(data_content)
                    elif data_type == 'trade':
                        self._process_trade_data(data_content)
                except Exception as e:
                    # One bad message must not end the monitoring loop.
                    logging.error(f"处理实时数据失败: {e}")
        except KeyboardInterrupt:
            logging.info("监控已停止")

    def _process_realtime_ticker(self, ticker_data: dict):
        """Raise a price alert if needed, then persist the ticker.

        An alert is sent when the absolute ``change_pct`` exceeds
        ``self.alert_threshold``. A storage failure is logged, not raised.
        """
        symbol = ticker_data['symbol']
        price = ticker_data['price']
        change_pct = ticker_data['change_pct']

        # change_pct is None when the feed omitted the field.
        if change_pct is not None and abs(change_pct) > self.alert_threshold:
            alert_msg = f"预警: {symbol} 价格波动 {change_pct:.2f}%,当前价格 {price}"
            self.send_alert(alert_msg)

        try:
            self.db_client.save_realtime_quote(ticker_data)
        except Exception as e:
            logging.error(f"保存 {symbol} 实时行情失败: {e}")

    def _process_trade_data(self, trade_data: dict):
        """Handle one trade print; currently it is only logged at debug level."""
        logging.debug(f"成交: {trade_data}")

    def send_alert(self, message: str):
        """Emit an alert to the log and to the console.

        Hook for e-mail, SMS or Telegram notifications.
        """
        logging.warning(f"警报: {message}")
        print(f"[{datetime.now()}] {message}")

    def run(self):
        """Start the daily update schedule and the real-time monitor thread."""
        logging.info("启动印尼股票监控系统")

        # Daily refresh at 18:00.
        # NOTE(review): presumably evaluated in the host's local time — confirm.
        schedule.every().day.at("18:00").do(self.update_daily_data)

        # Real-time monitoring runs on its own daemon thread.
        import threading
        monitor_thread = threading.Thread(target=self.realtime_monitor, daemon=True)
        monitor_thread.start()

        try:
            while True:
                schedule.run_pending()
                time.sleep(60)  # check the schedule once a minute
        except KeyboardInterrupt:
            logging.info("系统正常退出")
# Example configuration (config.yaml), kept as a module-level string.
# NOTE(review): the YAML nesting indentation appears to have been lost in
# this listing — confirm against the real file before copying it.
"""
api:
base_url: "https://api.financial-data.com"
ws_url: "wss://ws.financial-data.com"
api_key: "${STOCK_API_KEY}"
timeout: 30
max_retries: 3
database:
connection: "postgresql://user:password@localhost/indonesia_stocks"
monitoring:
watchlist: ["BBCA", "TLKM", "BBRI", "ASII", "GOTO"]
alert_threshold: 5.0 # 涨跌幅超过5%触发警报
update_interval: 3600 # 数据更新间隔(秒)
"""
5.2 部署与优化
# docker-compose.yml - containerised deployment
version: '3.8'
services:
  postgres:
    image: postgres:14
    environment:
      POSTGRES_DB: indonesia_stocks
      POSTGRES_USER: stock_user
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  stock-api:
    build: .
    environment:
      DATABASE_URL: postgresql://stock_user:${DB_PASSWORD}@postgres/indonesia_stocks
      REDIS_URL: redis://redis:6379
      STOCK_API_KEY: ${STOCK_API_KEY}
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
    restart: unless-stopped
volumes:
  postgres_data:
  redis_data:
六、印尼市场特殊注意事项
6.1 市场特性
- 交易时间: 周一至周五 09:00-16:00(雅加达时间,UTC+7),比北京时间(UTC+8)晚1小时
- 货币单位: 印尼盾(IDR),注意单位转换(1人民币 ≈ 2,200印尼盾)
- 价格限制: 每日涨跌幅限制为±20%(主板),±35%(发展板)
- 结算周期: T+2交易结算制度
6.2 数据质量处理
def handle_indonesia_market_specialties(df: pd.DataFrame,
                                        holidays=None,
                                        volume_multiplier: int = 1000) -> pd.DataFrame:
    """Apply Indonesia-specific fixes to a market-data DataFrame.

    Scales the ``volume`` column, when present, and removes rows that fall
    on a market holiday. Split / reverse-split adjustment is not implemented.

    Args:
        df: DataFrame indexed by date or timestamp. It is not modified.
        holidays: iterable of holiday dates (``YYYY-MM-DD`` strings or
            date-like objects). Defaults to a built-in list of 2024
            Indonesian holidays.
        volume_multiplier: factor applied to ``volume``. The default of
            1000 assumes the source reports volume in thousands — confirm
            for your data feed.

    Returns:
        A new DataFrame with scaled volume and holiday rows removed.
    """
    if holidays is None:
        holidays = (
            '2024-01-01', '2024-03-11', '2024-03-12',  # New Year, Nyepi
            '2024-04-10', '2024-04-11',                # Eid al-Fitr
            '2024-05-23',                              # Vesak
            '2024-06-01',                              # Pancasila Day
            '2024-07-07',                              # Islamic New Year
            '2024-08-17',                              # Independence Day
            '2024-12-25',                              # Christmas
        )

    # Work on a copy: scaling the caller's frame in place would multiply the
    # volume again on every repeated call.
    result = df.copy()
    if 'volume' in result.columns:
        result['volume'] = result['volume'] * volume_multiplier

    if isinstance(result.index, pd.DatetimeIndex):
        # Compare calendar dates, so intraday timestamps on a holiday match
        # too, and compare datetimes with datetimes rather than strings.
        holiday_dates = pd.to_datetime(list(holidays))
        trading_days = result.index
        if trading_days.tz is not None:
            trading_days = trading_days.tz_localize(None)
        is_holiday = trading_days.normalize().isin(holiday_dates)
    else:
        # Non-datetime index (e.g. date strings): match the labels as given.
        is_holiday = result.index.isin(list(holidays))

    return result[~is_holiday]
6.3 性能优化建议
- 数据缓存: 对不频繁变化的数据(如股票列表)使用Redis缓存
- 批量操作: 对多只股票的数据获取使用批量接口
- 异步处理: 使用asyncio提高I/O密集型任务的效率
- 连接池: 数据库连接和API连接使用连接池管理
# 异步数据获取示例
import aiohttp
import asyncio
async def fetch_multiple_stocks(symbols: List[str]):
    """Fetch data for several stocks concurrently over one HTTP session.

    Returns a list aligned with ``symbols``; a request that failed is
    represented by its exception object instead of being raised.
    """
    async with aiohttp.ClientSession() as http:
        pending = [
            asyncio.create_task(fetch_stock_data(http, code))
            for code in symbols
        ]
        outcomes = await asyncio.gather(*pending, return_exceptions=True)
    return outcomes
async def fetch_stock_data(session, symbol: str):
    """Fetch the quote payload for one stock and return the decoded JSON.

    Args:
        session: an open ``aiohttp.ClientSession``.
        symbol: stock code.

    Raises:
        aiohttp.ClientResponseError: when the server answers with an HTTP
            error status.
    """
    # The URL is built from the module-level `config`; the original
    # referenced a BASE_URL name that is defined nowhere.
    url = f"{config.base_url}/quotes/{symbol}"
    # Send the API key the same way the synchronous client does.
    # NOTE(review): confirm this endpoint accepts the `apikey` parameter.
    params = {'apikey': config.api_key} if config.api_key else None
    async with session.get(url, params=params) as response:
        response.raise_for_status()
        return await response.json()
七、总结与扩展建议
7.1 核心要点总结
本文详细介绍了从API获取印尼股票数据的完整技术方案:
- 接口对接: 实现了REST API和WebSocket双通道数据获取
- 数据处理: 包含数据清洗、技术指标计算、异常值处理
- 存储方案: 使用PostgreSQL进行结构化存储
- 可视化: 基于Plotly和Dash的交互式图表
- 监控系统: 完整的实时监控和预警系统
7.2 扩展功能建议
- 量化策略回测: 基于历史数据实现MACD、RSI等策略回测
- 新闻情感分析: 集成印尼财经新闻的情感分析
- 宏观经济指标: 结合印尼GDP、CPI、利率等宏观数据
- 多市场对比: 加入马来西亚、泰国等东南亚市场对比
- 移动端适配: 开发移动端监控应用
7.3 注意事项
- 合规性: 确保数据使用符合印尼当地法规
- 数据授权: 商业使用需获得相应数据授权
- 系统监控: 监控API调用频率,避免超过限制
- 错误处理: 完善的错误处理和重试机制
- 数据备份: 定期备份历史数据
7.4 学习资源
- 印尼交易所官网: 了解市场规则和上市公司信息
- 金融数据API文档: 查阅最新的接口文档
- Pandas官方文档: 数据处理与分析
- Plotly文档: 数据可视化实现
- PostgreSQL手册: 数据库设计与优化
通过本文提供的技术方案,开发者可以快速构建专业的印尼股票数据系统。实际开发中应根据具体需求调整架构,特别注意印尼市场的特殊性,如交易时间、货币单位、本地节假日等。建议在正式环境中充分测试,确保系统稳定性和数据准确性。
更多推荐
所有评论(0)