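A Practical Guide to Fetching and Displaying Indonesian Stock Data via API

Introduction: The Value of Indonesian Market Data and the Challenges of Getting It

The Indonesia Stock Exchange (IDX) is one of the most active capital markets in Southeast Asia and has drawn growing attention from international investors in recent years. The Jakarta Composite Index (JKSE) has been relatively steady over the past three years, with annualized volatility of about 18%, which gives quantitative strategies plenty of trading opportunities. For developers, though, getting real-time quotes, historical candlestick (K-line) data, and technical indicators for Indonesian stocks directly is hard: the exchange's official feed has a complicated application process, data formats are inconsistent, latency is high, and cost is a concern.

This article shows how to build a system that fetches and visualizes Indonesian stock data on top of a standardized data API. It covers environment configuration, API calls, and data display, with code for each step.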

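1. Technical Architecture Design

1.1 Architecture Overview

Data acquisition  →  Data processing  →  Storage     →  Visualization
       ↓                    ↓               ↓                ↓
 HTTP/WebSocket        Data cleaning     Database      Charting library
       ↓                    ↓               ↓                ↓
    Data API              Pandas        PostgreSQL         Plotly

1.2 Technology Stack

  • Data acquisition: Requests (HTTP), websocket-client (real time)
  • Data processing: Pandas, NumPy, TA-Lib (technical indicators)
  • Data storage: PostgreSQL (relational) / Redis (cache)
  • Visualization: Plotly, Dash
  • Task scheduling: APScheduler (the entry point in section 5.1 uses the lighter schedule package instead)
  • Deployment: Docker, FastAPI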

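2. API Integration in Practice

2.1 Authentication and Configuration Management

# config.py - configuration
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class APIConfig:
    """API configuration."""
    base_url: str = "https://api.financial-data.com"
    ws_url: str = "wss://ws.financial-data.com"
    api_key: Optional[str] = None
    timeout: int = 30
    max_retries: int = 3
    
    def __post_init__(self):
        # The STOCK_API_KEY environment variable takes precedence over a key passed in code
        self.api_key = os.getenv('STOCK_API_KEY', self.api_key)
        
    def validate(self) -> bool:
        """Check that the configuration is complete."""
        if not self.api_key:
            raise ValueError("API key is not configured; set the STOCK_API_KEY environment variable")
        # The 32-character length is specific to this provider's key format
        if len(self.api_key) != 32:
            raise ValueError("API key has an invalid format")
        return True

# Instantiate the configuration
config = APIConfig()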

2.2 Wrapping the Basic Data Endpoints

# api_client.py - API client
import requests
import pandas as pd
import time
from typing import Dict, List, Any, Optional
from functools import lru_cache
import logging

from config import APIConfig  # defined in config.py above

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class IndonesiaStockAPI:
    """Client for the Indonesian stock data API."""
    
    def __init__(self, config: APIConfig):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'IndonesiaStockData/1.0',
            'Accept': 'application/json'
        })
        
    def _make_request(self, endpoint: str, params: Dict) -> Dict:
        """Send a GET request, retrying on network errors."""
        url = f"{self.config.base_url}/{endpoint}"
        # Copy so the caller's dict is not mutated with the API key
        params = {**params, 'apikey': self.config.api_key}
        
        for attempt in range(self.config.max_retries):
            try:
                response = self.session.get(
                    url, 
                    params=params,
                    timeout=self.config.timeout
                )
                response.raise_for_status()
                data = response.json()
                
                # Check the status code carried in the response body
                if data.get('code') != 200:
                    error_msg = data.get('message', 'unknown error')
                    logger.error(f"API error: {error_msg}")
                    # Not a RequestException, so it is raised immediately without a retry
                    raise ValueError(f"API returned an error: {error_msg}")
                    
                return data.get('data', {})
                
            except requests.exceptions.RequestException as e:
                logger.warning(f"Request failed (attempt {attempt+1}/{self.config.max_retries}): {e}")
                if attempt < self.config.max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
                else:
                    raise
                    
        return {}
    
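    # Caveat: lru_cache on a method keys on `self` and returns the same DataFrame
    # object on every hit, so callers should .copy() before modifying the result.
    @lru_cache(maxsize=128)
    def get_stock_list(self, market: str = "IDX") -> pd.DataFrame:
        """
        Fetch the list of Indonesian stocks.
        
        Args:
            market: market segment, IDX (main board) or IDX-B (development board)
            
        Returns:
            DataFrame with symbol, name, sector, and related fields
        """
        params = {
            'country': 'ID',
            'market': market,
            'fields': 'symbol,name,sector,listing_date,currency'
        }
        
        data = self._make_request("equities/list", params)
        
        if not data:
            return pd.DataFrame()
            
        df = pd.DataFrame(data)
        
        # Data cleaning
        df['listing_date'] = pd.to_datetime(df['listing_date'])
        df['currency'] = 'IDR'
        
        return df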
    
    def get_realtime_quotes(self, symbols: List[str]) -> pd.DataFrame:
        """
        Fetch real-time quotes.
        
        Args:
            symbols: list of stock codes, e.g. ['BBCA', 'TLKM', 'BBRI']
            
        Returns:
            DataFrame with last price, volume, change, and related fields
        """
        params = {
            'symbols': ','.join(symbols),
            'fields': 'last,change,change_pct,volume,open,high,low,prev_close'
        }
        
        data = self._make_request("quotes/realtime", params)
        
        if not data:
            return pd.DataFrame()
            
        # The response maps each symbol to its quote; build new dicts so it is not mutated
        quotes = [{**quote_data, 'symbol': symbol}
                  for symbol, quote_data in data.items()]
            
        return pd.DataFrame(quotes)
    
    def get_historical_data(self, 
                          symbol: str, 
                          start_date: str, 
                          end_date: str,
                          interval: str = "1d") -> pd.DataFrame:
        """
        Fetch historical candlestick data.
        
        Args:
            symbol: stock code
            start_date: start date (YYYY-MM-DD)
            end_date: end date (YYYY-MM-DD)
            interval: bar interval (1d, 1h, 5m)
            
        Returns:
            DataFrame of OHLCV data indexed by time
        """
        params = {
            'symbol': symbol,
            'from': start_date,
            'to': end_date,
            'interval': interval
        }
        
        data = self._make_request("quotes/historical", params)
        
        # Guard against a missing or empty 'candles' field as well as an empty response
        candles = data.get('candles') if data else None
        if not candles:
            return pd.DataFrame()
            
        df = pd.DataFrame(candles)
        df['time'] = pd.to_datetime(df['time'], unit='ms')  # epoch milliseconds
        df.set_index('time', inplace=True)
        
        # Rename columns; this assumes the API returns them in OHLCV order
        df.columns = ['open', 'high', 'low', 'close', 'volume']
        
        return df
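
The retry logic in _make_request can also be pulled out into a small reusable helper. The sketch below is a hypothetical standalone version, not part of the client above: retry_with_backoff and its parameters are names introduced here for illustration, and the sleep function is injectable so the backoff schedule can be checked without actually waiting.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(); on a retryable error wait base_delay * 2**attempt, then retry."""
    for attempt in range(max_retries):
        try:
            return func()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise ValueError("max_retries must be at least 1")


# Demo: a fake call that fails twice and then succeeds.
calls = {"n": 0}
waits = []  # records the delays instead of sleeping


def flaky_call() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated network failure")
    return "ok"


result = retry_with_backoff(flaky_call, sleep=waits.append)
print(result, waits)
```

Passing sleep=waits.append is only a testing convenience; in production the default time.sleep applies, and adding random jitter to each delay is a common refinement when many clients retry at once.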

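2.3 Subscribing to Real-Time Data over WebSocket

# websocket_client.py - real-time data client
import websocket
import json
import threading
import queue
import time
from datetime import datetime
from typing import List
import logging

from config import APIConfig  # defined in config.py above

logger = logging.getLogger(__name__)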

class RealtimeDataClient:
    """WebSocket client for real-time data."""
    
    def __init__(self, config: APIConfig):
        self.config = config
        self.ws = None
        self.data_queue = queue.Queue()
        self.connected = False
        self.subscriptions = set()
        
    def on_message(self, ws, message):
        """Parse an incoming message and put it on the queue."""
        try:
            data = json.loads(message)
            message_type = data.get('type', '')
            
            if message_type == 'ticker':
                # Real-time quote
                ticker_data = {
                    'symbol': data.get('s'),
                    # Epoch milliseconds, converted to the host's local time
                    'timestamp': datetime.fromtimestamp(data.get('t')/1000),
                    'price': data.get('p'),
                    'volume': data.get('v'),
                    'bid': data.get('b'),
                    'ask': data.get('a'),
                    'change': data.get('c'),
                    'change_pct': data.get('cp')
                }
                self.data_queue.put(('ticker', ticker_data))
                
            elif message_type == 'trade':
                # Tick-by-tick trade
                trade_data = {
                    'symbol': data.get('s'),
                    'timestamp': datetime.fromtimestamp(data.get('t')/1000),
                    'price': data.get('p'),
                    'quantity': data.get('q'),
                    'side': 'buy' if data.get('sd') == 1 else 'sell'
                }
                self.data_queue.put(('trade', trade_data))
                
        except (json.JSONDecodeError, TypeError, AttributeError) as e:
            # TypeError covers a missing timestamp; AttributeError a non-object payload
            logger.error(f"Failed to parse message: {e}")
            
    def on_error(self, ws, error):
        logger.error(f"WebSocket error: {error}")
        
    def on_close(self, ws, close_status_code, close_msg):
        logger.info(f"Connection closed: {close_msg}")
        self.connected = False
        
    def on_open(self, ws):
        logger.info("WebSocket connection established")
        self.connected = True
        
        # Re-subscribe to any symbols from a previous connection
        if self.subscriptions:
            self.subscribe(list(self.subscriptions))
            
        # Start the heartbeat thread
        threading.Thread(target=self._heartbeat, daemon=True).start()
        
    def _heartbeat(self):
        """Send a ping every 30 seconds to keep the connection alive."""
        while self.connected:
            try:
                self.ws.send(json.dumps({'type': 'ping'}))
                time.sleep(30)
            except Exception:
                # Stop on a send failure; a bare except would also swallow KeyboardInterrupt
                break
                
    def connect(self):
        """Open the WebSocket connection in a background thread."""
        self.ws = websocket.WebSocketApp(
            self.config.ws_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        
        # run_forever() blocks, so it runs in a daemon thread
        wst = threading.Thread(target=self.ws.run_forever, daemon=True)
        wst.start()
        
    def subscribe(self, symbols: List[str]):
        """Subscribe to real-time data for the given symbols."""
        if not self.connected:
            raise ConnectionError("WebSocket is not connected")
            
        subscription_msg = {
            'type': 'subscribe',
            'symbols': symbols,
            'channels': ['ticker', 'trade']
        }
        
        self.ws.send(json.dumps(subscription_msg))
        self.subscriptions.update(symbols)
        
    def get_latest_data(self, timeout: float = 1.0):
        """Return the next queued message, or None if nothing arrives within timeout."""
        try:
            return self.data_queue.get(timeout=timeout)
        except queue.Empty:
            return None
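
The client above converts timestamps with datetime.fromtimestamp, which yields the host's local time. A minimal sketch of a timezone-aware alternative follows, assuming the same short field names ('s', 't', 'p', 'v') and millisecond timestamps used in on_message; parse_ticker and the sample message are made up for illustration.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Optional

# Western Indonesia Time (WIB) is UTC+7 all year; Indonesia does not observe DST.
WIB = timezone(timedelta(hours=7), name="WIB")


def parse_ticker(message: str) -> Optional[dict]:
    """Turn a raw 'ticker' JSON message into a dict with a WIB timestamp.

    Returns None for malformed JSON, other message types, or a missing timestamp.
    """
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or data.get("type") != "ticker":
        return None
    ts_ms = data.get("t")
    if not isinstance(ts_ms, (int, float)):
        return None
    return {
        "symbol": data.get("s"),
        "timestamp": datetime.fromtimestamp(ts_ms / 1000, tz=WIB),
        "price": data.get("p"),
        "volume": data.get("v"),
    }


# Illustrative message; the values are made up.
sample = '{"type": "ticker", "s": "BBCA", "t": 1704074400000, "p": 9400, "v": 1200}'
tick = parse_ticker(sample)
print(tick["symbol"], tick["timestamp"].isoformat())
```

Storing aware timestamps keeps the data unambiguous when the collector runs on a server outside Indonesia.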

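3. Data Processing and Storage

3.1 Data Cleaning and Normalization

# data_processor.py - data processing
import pandas as pd
import numpy as np
from typing import Tuple, Dict
import talib  # Python wrapper; requires the TA-Lib C library to be installed

class StockDataProcessor:
    """Cleaning and indicator calculation for stock data."""
    
    @staticmethod
    def clean_ohlc_data(df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean OHLC data.
        
        Args:
            df: raw candlestick DataFrame
            
        Returns:
            The cleaned DataFrame
        """
        # sort_index() returns a new DataFrame, so the input is left untouched,
        # and sorting first makes pct_change() compare consecutive bars
        df_clean = df.sort_index()
        
        # Fill missing values from the previous bar (fillna(method=...) is deprecated)
        df_clean = df_clean.ffill()
        
        # Blank out bars whose close moves more than 20% against the previous bar
        returns = df_clean['close'].pct_change()
        outliers = np.abs(returns) > 0.20
        df_clean.loc[outliers] = np.nan
        
        # Refill from the previous bar to avoid look-ahead;
        # bfill() only covers gaps at the very start of the series
        df_clean = df_clean.ffill().bfill()
        
        return df_clean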
    
    @staticmethod
    def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
        """
        Compute technical indicators.
        
        Args:
            df: DataFrame with OHLCV data
            
        Returns:
            A copy of the DataFrame with indicator columns added
        """
        df_indicators = df.copy()
        
        # TA-Lib only accepts float64 input, and volume often arrives as integers
        close = df_indicators['close'].astype(float)
        volume = df_indicators['volume'].astype(float)
        
        # Moving averages
        df_indicators['MA5'] = talib.SMA(close, timeperiod=5)
        df_indicators['MA20'] = talib.SMA(close, timeperiod=20)
        df_indicators['MA60'] = talib.SMA(close, timeperiod=60)
        
        # Bollinger Bands
        df_indicators['BB_upper'], df_indicators['BB_middle'], df_indicators['BB_lower'] = \
            talib.BBANDS(close, timeperiod=20)
            
        # RSI
        df_indicators['RSI'] = talib.RSI(close, timeperiod=14)
        
        # MACD
        df_indicators['MACD'], df_indicators['MACD_signal'], df_indicators['MACD_hist'] = \
            talib.MACD(close)
            
        # Volume moving average
        df_indicators['volume_MA20'] = talib.SMA(volume, timeperiod=20)
        
        return df_indicators
    
    @staticmethod
    def detect_market_regime(df: pd.DataFrame) -> pd.Series:
        """
        Classify the market regime for each bar.
        
        Returns:
            Series of labels: 'ranging', 'trending', or 'high_volatility'
        """
        # Rolling 20-bar volatility of returns
        returns = df['close'].pct_change()
        volatility = returns.rolling(window=20).std()
        
        # Trend strength
        adx = talib.ADX(df['high'].astype(float), df['low'].astype(float),
                        df['close'].astype(float), timeperiod=14)
        
        # Later assignments win: high volatility overrides trending
        market_regime = pd.Series('ranging', index=df.index)
        market_regime[adx > 25] = 'trending'
        market_regime[volatility > volatility.quantile(0.8)] = 'high_volatility'
        
        return market_regime
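
TA-Lib needs a compiled C library, which can be awkward to install. For readers who want to see what two of the indicators compute, here is a dependency-free sketch of a simple moving average and an RSI with Wilder's smoothing. The function names are introduced here for illustration, the prices are made up, and results may differ slightly from TA-Lib in the warm-up region.

```python
from typing import List, Optional


def sma(values: List[float], period: int) -> List[Optional[float]]:
    """Simple moving average; None until `period` values are available."""
    out: List[Optional[float]] = []
    window_sum = 0.0
    for i, v in enumerate(values):
        window_sum += v
        if i >= period:
            window_sum -= values[i - period]  # drop the value leaving the window
        out.append(window_sum / period if i >= period - 1 else None)
    return out


def rsi(closes: List[float], period: int = 14) -> List[Optional[float]]:
    """RSI with Wilder's smoothing; None for the first `period` bars."""
    out: List[Optional[float]] = [None] * len(closes)
    if len(closes) <= period:
        return out
    changes = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
    # Seed with plain averages over the first `period` changes
    avg_gain = sum(max(c, 0.0) for c in changes[:period]) / period
    avg_loss = sum(max(-c, 0.0) for c in changes[:period]) / period
    for i in range(period, len(closes)):
        if i > period:
            change = changes[i - 1]
            avg_gain = (avg_gain * (period - 1) + max(change, 0.0)) / period
            avg_loss = (avg_loss * (period - 1) + max(-change, 0.0)) / period
        if avg_loss == 0:
            out[i] = 100.0  # no losses in the window
        else:
            out[i] = 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)
    return out


closes = [9000, 9050, 9100, 9075, 9125, 9200]  # made-up closing prices
print(sma(closes, 3))
print(rsi(closes, period=3))
```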

3.2 Database Storage Design

# database.py - database operations
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from datetime import datetime
from typing import List, Dict
import json

class StockDatabase:
    """Manages the stock database."""
    
    def __init__(self, connection_string: str):
        self.conn_string = connection_string
        self._create_tables()
        
    def _get_connection(self):
        """Open a new database connection; the caller must close it."""
        return psycopg2.connect(self.conn_string)
    
    def _create_tables(self):
        """Create the tables if they do not exist."""
        create_tables_sql = """
        -- Basic stock information
        CREATE TABLE IF NOT EXISTS stocks (
            symbol VARCHAR(20) PRIMARY KEY,
            name VARCHAR(200),
            sector VARCHAR(100),
            listing_date DATE,
            currency CHAR(3),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        -- Daily bars
        CREATE TABLE IF NOT EXISTS daily_bars (
            id SERIAL PRIMARY KEY,
            symbol VARCHAR(20) REFERENCES stocks(symbol),
            date DATE,
            open DECIMAL(12, 4),
            high DECIMAL(12, 4),
            low DECIMAL(12, 4),
            close DECIMAL(12, 4),
            volume BIGINT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(symbol, date)
        );
        
        -- Real-time quotes
        CREATE TABLE IF NOT EXISTS realtime_quotes (
            id SERIAL PRIMARY KEY,
            symbol VARCHAR(20),
            timestamp TIMESTAMP,
            price DECIMAL(12, 4),
            volume BIGINT,
            bid DECIMAL(12, 4),
            ask DECIMAL(12, 4),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        -- Index; UNIQUE(symbol, date) already indexes daily_bars
        CREATE INDEX IF NOT EXISTS idx_realtime_quotes_symbol_timestamp 
        ON realtime_quotes(symbol, timestamp);
        """
        
        conn = self._get_connection()
        try:
            # `with conn` commits or rolls back the transaction; it does not close the connection
            with conn:
                with conn.cursor() as cur:
                    cur.execute(create_tables_sql)
        finally:
            conn.close()
    
    def save_daily_data(self, symbol: str, df: pd.DataFrame):
        """Upsert daily bars; the symbol must already exist in the stocks table."""
        if df.empty:
            return
            
        # Build the rows; only the OHLCV columns are stored
        records = []
        for idx, row in df.iterrows():
            records.append((
                symbol,
                idx.date(),
                float(row['open']),
                float(row['high']),
                float(row['low']),
                float(row['close']),
                int(row['volume'])
            ))
        
        # Insert, overwriting any existing row for the same (symbol, date)
        insert_sql = """
        INSERT INTO daily_bars (symbol, date, open, high, low, close, volume)
        VALUES %s
        ON CONFLICT (symbol, date) DO UPDATE SET
            open = EXCLUDED.open,
            high = EXCLUDED.high,
            low = EXCLUDED.low,
            close = EXCLUDED.close,
            volume = EXCLUDED.volume
        """
        
        conn = self._get_connection()
        try:
            with conn:  # commits on success, rolls back on error
                with conn.cursor() as cur:
                    execute_values(cur, insert_sql, records)
        finally:
            conn.close()
    
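    def get_historical_data(self, symbol: str, 
                           start_date: str, 
                           end_date: str) -> pd.DataFrame:
        """Load daily bars for a symbol between two dates (inclusive)."""
        query = """
        SELECT date, open, high, low, close, volume
        FROM daily_bars
        WHERE symbol = %s
            AND date >= %s
            AND date <= %s
        ORDER BY date
        """
        
        conn = self._get_connection()
        try:
            # pandas warns on a raw DBAPI connection; an SQLAlchemy engine avoids the warning
            df = pd.read_sql_query(query, conn, 
                                  params=(symbol, start_date, end_date),
                                  parse_dates=['date'])
        finally:
            conn.close()
        df.set_index('date', inplace=True)
        return df
    
    def save_realtime_quote(self, quote: dict):
        """Insert one real-time quote (called by the monitor in section 5.1)."""
        insert_sql = """
        INSERT INTO realtime_quotes (symbol, timestamp, price, volume, bid, ask)
        VALUES (%s, %s, %s, %s, %s, %s)
        """
        conn = self._get_connection()
        try:
            with conn:
                with conn.cursor() as cur:
                    cur.execute(insert_sql, (
                        quote['symbol'], quote['timestamp'], quote['price'],
                        quote['volume'], quote['bid'], quote['ask']
                    ))
        finally:
            conn.close()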
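
The upsert pattern in save_daily_data can be tried without a PostgreSQL server. The sketch below uses the standard-library sqlite3 module as a stand-in, since SQLite (3.24 or later) accepts a very similar ON CONFLICT ... DO UPDATE clause. The table is trimmed to four columns and the prices are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE daily_bars (
        symbol TEXT NOT NULL,
        date   TEXT NOT NULL,
        close  REAL,
        volume INTEGER,
        UNIQUE (symbol, date)
    )
""")

UPSERT_SQL = """
    INSERT INTO daily_bars (symbol, date, close, volume)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (symbol, date) DO UPDATE SET
        close = excluded.close,
        volume = excluded.volume
"""


def save_bars(rows):
    """Insert bars, overwriting any existing row with the same (symbol, date)."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT_SQL, rows)


save_bars([("BBCA", "2024-01-02", 9400.0, 1000)])
save_bars([
    ("BBCA", "2024-01-02", 9425.0, 1500),  # same key: updated in place
    ("BBCA", "2024-01-03", 9450.0, 1200),  # new key: inserted
])

rows = conn.execute(
    "SELECT date, close, volume FROM daily_bars ORDER BY date"
).fetchall()
print(rows)
```

Re-running a daily update job is therefore safe: a bar that was already stored is overwritten with the latest values rather than duplicated.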

4. Data Visualization

4.1 Interactive Candlestick Charts

# visualization.py - data visualization
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import pandas as pd
import numpy as np
from typing import Dict, List

class StockVisualizer:
    """Charts for stock data."""
    
    @staticmethod
    def create_candlestick_chart(df: pd.DataFrame, 
                                symbol: str,
                                indicators: List[str] = None) -> go.Figure:
        """
        Build a candlestick chart with a volume panel.
        
        Args:
            df: DataFrame with OHLCV data
            symbol: stock code
            indicators: names of indicator columns to overlay
            
        Returns:
            A Plotly figure
        """
        if indicators is None:
            indicators = ['MA20', 'BB_upper', 'BB_lower']
        
        # Two stacked panels sharing the x axis
        fig = make_subplots(
            rows=2, 
            cols=1,
            shared_xaxes=True,
            vertical_spacing=0.05,
            row_heights=[0.7, 0.3],
            subplot_titles=(f'{symbol} price', 'Volume')
        )
        
        # Candlesticks
        fig.add_trace(
            go.Candlestick(
                x=df.index,
                open=df['open'],
                high=df['high'],
                low=df['low'],
                close=df['close'],
                name='Price',
                increasing_line_color='#26a69a',
                decreasing_line_color='#ef5350'
            ),
            row=1, col=1
        )
        
        # Overlay every requested indicator that exists in the DataFrame
        # (the original only drew MA20 even when Bollinger Bands were requested)
        for name in indicators:
            if name in df.columns:
                fig.add_trace(
                    go.Scatter(
                        x=df.index,
                        y=df[name],
                        name=name,
                        line=dict(width=1)
                    ),
                    row=1, col=1
                )
        
        # Volume bars, colored by whether the bar closed up or down
        colors = ['#26a69a' if close >= open_ else '#ef5350' 
                 for close, open_ in zip(df['close'], df['open'])]
        
        fig.add_trace(
            go.Bar(
                x=df.index,
                y=df['volume'],
                name='Volume',
                marker_color=colors,
                opacity=0.5
            ),
            row=2, col=1
        )
        
        # Layout
        fig.update_layout(
            title=f'{symbol} technical analysis chart',
            yaxis_title='Price (IDR)',
            yaxis2_title='Volume',
            xaxis2_title='Date',
            xaxis_rangeslider_visible=False,
            height=700,
            template='plotly_dark',
            hovermode='x unified'
        )
        
        # Range-selector buttons on the price panel only, so they are not drawn twice
        fig.update_xaxes(
            rangeslider_thickness=0.05,
            rangeselector=dict(
                buttons=list([
                    dict(count=1, label="1m", step="month", stepmode="backward"),
                    dict(count=6, label="6m", step="month", stepmode="backward"),
                    dict(count=1, label="YTD", step="year", stepmode="todate"),
                    dict(count=1, label="1y", step="year", stepmode="backward"),
                    dict(step="all")
                ])
            ),
            row=1, col=1
        )
        
        return fig
    
    @staticmethod
    def create_market_overview(stocks_data: Dict[str, pd.DataFrame]) -> go.Figure:
        """
        Build a market overview dashboard.
        
        Args:
            stocks_data: dict mapping symbol to its DataFrame
            
        Returns:
            A combined figure
        """
        # 2x2 grid; only the top row is populated in this example,
        # the two heatmap panels are left as placeholders
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('Price comparison', 'Return distribution',
                          'Volume heatmap', 'Correlation matrix'),
            specs=[[{"type": "scatter"}, {"type": "histogram"}],
                   [{"type": "heatmap"}, {"type": "heatmap"}]]
        )
        
        # 1. Price comparison, normalized so each series starts at 100
        for symbol, df in stocks_data.items():
            if not df.empty and 'close' in df.columns:
                normalized_price = df['close'] / df['close'].iloc[0] * 100
                fig.add_trace(
                    go.Scatter(
                        x=df.index,
                        y=normalized_price,
                        name=symbol,
                        mode='lines'
                    ),
                    row=1, col=1
                )
        
        # 2. Distribution of daily returns, pooled across all symbols
        returns_data = []
        for symbol, df in stocks_data.items():
            if not df.empty and 'close' in df.columns:
                daily_returns = df['close'].pct_change().dropna()
                returns_data.extend(daily_returns.tolist())
        
        fig.add_trace(
            go.Histogram(
                x=returns_data,
                nbinsx=50,
                name='Return distribution',
                marker_color='blue',
                opacity=0.7
            ),
            row=1, col=2
        )
        
        # Layout
        fig.update_layout(
            title='Indonesian stock market overview',
            height=800,
            showlegend=True
        )
        
        return fig
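
The overview figure reserves a "correlation matrix" panel but never fills it. One way to compute the data for that panel is sketched below in plain Python. It assumes every symbol has the same number of closes and that no return series is constant; the symbols AAA/BBB/CCC and their prices are placeholders, not market data.

```python
from math import sqrt
from typing import Dict, List, Tuple


def daily_returns(closes: List[float]) -> List[float]:
    """Simple returns between consecutive closes."""
    return [closes[i] / closes[i - 1] - 1.0 for i in range(1, len(closes))]


def pearson(x: List[float], y: List[float]) -> float:
    """Pearson correlation of two equal-length series with non-zero variance."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / sqrt(var_x * var_y)


def correlation_matrix(
    closes_by_symbol: Dict[str, List[float]]
) -> Tuple[List[str], List[List[float]]]:
    """Return (symbols, matrix) where matrix[i][j] correlates symbols i and j."""
    symbols = list(closes_by_symbol)
    rets = {s: daily_returns(closes_by_symbol[s]) for s in symbols}
    matrix = [[pearson(rets[a], rets[b]) for b in symbols] for a in symbols]
    return symbols, matrix


# Placeholder series: BBB moves with AAA, CCC moves against it.
closes = {
    "AAA": [100.0, 110.0, 99.0, 108.9],
    "BBB": [50.0, 55.0, 49.5, 54.45],
    "CCC": [100.0, 90.0, 99.0, 89.1],
}
symbols, matrix = correlation_matrix(closes)
for symbol, row in zip(symbols, matrix):
    print(symbol, [round(v, 2) for v in row])
```

The result could then be drawn with go.Heatmap(z=matrix, x=symbols, y=symbols) and added to row 2, column 2 of the figure.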

4.2 Real-Time Monitoring Dashboard

# dashboard.py - real-time monitoring dashboard
import dash
from dash import dcc, html, Input, Output
import dash_bootstrap_components as dbc
import plotly.graph_objects as go
from datetime import datetime, timedelta
import pandas as pd

from api_client import IndonesiaStockAPI  # defined in api_client.py above

def create_realtime_dashboard(api_client: IndonesiaStockAPI):
    """Build the real-time monitoring dashboard."""
    app = dash.Dash(__name__, 
                   external_stylesheets=[dbc.themes.DARKLY])
    
    # Layout
    app.layout = dbc.Container([
        dbc.Row([
            dbc.Col([
                html.H1("Indonesian Stock Real-Time Monitor", className="text-center my-4"),
                html.Hr()
            ])
        ]),
        
        dbc.Row([
            dbc.Col([
                dbc.Card([
                    dbc.CardHeader("Stock selection"),
                    dbc.CardBody([
                        dcc.Dropdown(
                            id='symbol-selector',
                            options=[
                                {'label': 'Bank Central Asia (BBCA)', 'value': 'BBCA'},
                                {'label': 'Telkom Indonesia (TLKM)', 'value': 'TLKM'},
                                {'label': 'Bank Rakyat Indonesia (BBRI)', 'value': 'BBRI'},
                                {'label': 'Astra International (ASII)', 'value': 'ASII'},
                                {'label': 'GoTo (GOTO)', 'value': 'GOTO'}
                            ],
                            value=['BBCA', 'TLKM'],
                            multi=True,
                            className="mb-3"
                        ),
                        dbc.Button("Refresh data", 
                                  id='update-button',
                                  color="primary",
                                  className="w-100")
                    ])
                ], className="mb-4")
            ], width=3),
            
            dbc.Col([
                dbc.Card([
                    dbc.CardHeader("Real-time quotes"),
                    dbc.CardBody([
                        dcc.Graph(id='price-chart'),
                        dcc.Interval(
                            id='interval-component',
                            interval=10*1000,  # refresh every 10 seconds
                            n_intervals=0
                        )
                    ])
                ])
            ], width=9)
        ]),
        
        dbc.Row([
            dbc.Col([
                dbc.Card([
                    dbc.CardHeader("Technical indicators"),
                    dbc.CardBody([
                        dcc.Graph(id='technical-chart')
                    ])
                ])
            ], width=6),
            
            dbc.Col([
                dbc.Card([
                    dbc.CardHeader("Market overview"),
                    dbc.CardBody([
                        html.Div(id='market-overview')
                    ])
                ])
            ], width=6)
        ], className="mt-4")
    ], fluid=True)
    
    # Callback
    @app.callback(
        [Output('price-chart', 'figure'),
         Output('technical-chart', 'figure'),
         Output('market-overview', 'children')],
        [Input('update-button', 'n_clicks'),
         Input('interval-component', 'n_intervals')],
        [dash.dependencies.State('symbol-selector', 'value')]
    )
    def update_charts(n_clicks, n_intervals, selected_symbols):
        """Refresh all three panels."""
        if not selected_symbols:
            return go.Figure(), go.Figure(), "Select at least one stock"
        
        # Fetch real-time quotes
        quotes = api_client.get_realtime_quotes(selected_symbols)
        
        # Price panel: one number+delta indicator per returned quote
        price_fig = go.Figure()
        for col, (_, quote) in enumerate(quotes.iterrows()):
            price_fig.add_trace(go.Indicator(
                mode="number+delta",
                value=quote['last'],
                delta={'reference': quote['prev_close']},
                title={'text': quote['symbol']},
                domain={'row': 0, 'column': col}
            ))
        
        price_fig.update_layout(
            # Size the grid by the quotes actually returned, not by the selection
            grid={'rows': 1, 'columns': max(len(quotes), 1), 'pattern': "independent"},
            height=200
        )
        
        # Volatility panel
        tech_fig = go.Figure()
        
        # More technical-analysis charts can be added here
        for symbol in selected_symbols[:3]:  # cap the number of series shown
            try:
                # Fetch the last 60 days of history
                hist_data = api_client.get_historical_data(
                    symbol, 
                    (datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d'),
                    datetime.now().strftime('%Y-%m-%d')
                )
                
                if not hist_data.empty:
                    # 20-day rolling standard deviation of daily returns, in percent
                    hist_data['returns'] = hist_data['close'].pct_change()
                    tech_fig.add_trace(go.Scatter(
                        x=hist_data.index,
                        y=hist_data['returns'].rolling(20).std() * 100,
                        name=f"{symbol} volatility",
                        mode='lines'
                    ))
            except Exception as e:
                print(f"Failed to fetch data for {symbol}: {e}")
        
        tech_fig.update_layout(
            title="Volatility analysis",
            height=300
        )
        
        # Market overview
        overview_content = []
        for _, quote in quotes.iterrows():
            change_color = 'success' if quote['change'] >= 0 else 'danger'
            overview_content.append(
                dbc.Row([
                    dbc.Col(html.Strong(quote['symbol']), width=3),
                    dbc.Col(f"{quote['last']:,.0f}", width=3),
                    dbc.Col(
                        html.Span(
                            f"{quote['change']:+.2f} ({quote['change_pct']:+.2f}%)",
                            className=f"text-{change_color}"
                        ),
                        width=6
                    )
                ], className="mb-2")
            )
        
        return price_fig, tech_fig, overview_content
    
    return app

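5. Case Study: Building a Complete Monitoring System

5.1 Program Entry Point

# main.py - entry point
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import List

import schedule
import yaml

# Classes defined in the earlier sections
from api_client import IndonesiaStockAPI
from config import APIConfig
from data_processor import StockDataProcessor
from database import StockDatabase
from visualization import StockVisualizer
from websocket_client import RealtimeDataClient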

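class IndonesiaStockMonitor:
    """Main class of the Indonesian stock monitoring system."""
    
    def __init__(self, config_path: str = "config.yaml"):
        self.config = self._load_config(config_path)
        self.api_client = IndonesiaStockAPI(self.config.api_config)
        self.db_client = StockDatabase(self.config.db_connection)
        self.visualizer = StockVisualizer()
        
        # Symbols to monitor
        self.watchlist = ['BBCA', 'TLKM', 'BBRI', 'ASII', 'GOTO']
        
    def _load_config(self, config_path: str):
        """Load config.yaml and expose the attributes the rest of the class uses."""
        import yaml
        from types import SimpleNamespace
        with open(config_path, 'r') as f:
            raw = yaml.safe_load(f)  # a plain dict, so wrap it for attribute access
        api = raw['api']
        return SimpleNamespace(
            # api_key is not passed: APIConfig reads STOCK_API_KEY from the environment,
            # because yaml.safe_load does not expand "${STOCK_API_KEY}"
            api_config=APIConfig(
                base_url=api['base_url'],
                ws_url=api['ws_url'],
                timeout=api.get('timeout', 30),
                max_retries=api.get('max_retries', 3)
            ),
            db_connection=raw['database']['connection']
        )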
    
    def update_daily_data(self):
        """Refresh daily bars for every symbol on the watchlist."""
        logging.info(f"{datetime.now()}: starting daily data update")
        
        for symbol in self.watchlist:
            try:
                # Fetch the last 365 days
                end_date = datetime.now().strftime('%Y-%m-%d')
                start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
                
                df = self.api_client.get_historical_data(
                    symbol, start_date, end_date, interval="1d"
                )
                
                if not df.empty:
                    # Clean the data and compute indicators
                    processor = StockDataProcessor()
                    df_clean = processor.clean_ohlc_data(df)
                    df_with_indicators = processor.calculate_technical_indicators(df_clean)
                    
                    # Save to the database (only the OHLCV columns are stored)
                    self.db_client.save_daily_data(symbol, df_with_indicators)
                    
                    logging.info(f"Updated daily data for {symbol}: {len(df)} rows")
                    
            except Exception as e:
                logging.error(f"Failed to update {symbol}: {e}")
    
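    def realtime_monitor(self):
        """Real-time monitoring loop."""
        # Create the WebSocket client
        ws_client = RealtimeDataClient(self.config.api_config)
        ws_client.connect()
        
        # Wait up to 10 seconds for the connection rather than sleeping a fixed time
        deadline = time.time() + 10
        while not ws_client.connected and time.time() < deadline:
            time.sleep(0.1)
        ws_client.subscribe(self.watchlist)  # raises ConnectionError if still not connected
        
        logging.info("Real-time monitoring started")
        
        try:
            while True:
                # Blocks for up to 0.5s, so no extra sleep is needed between messages;
                # an added one-second sleep would let the queue back up
                data = ws_client.get_latest_data(timeout=0.5)
                
                if data:
                    data_type, data_content = data
                    
                    if data_type == 'ticker':
                        # Handle a real-time quote
                        self._process_realtime_ticker(data_content)
                        
                    elif data_type == 'trade':
                        # Handle a trade
                        self._process_trade_data(data_content)
                
        except KeyboardInterrupt:
            logging.info("Monitoring stopped")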
    
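    def _process_realtime_ticker(self, ticker_data: dict):
        """Handle one real-time quote."""
        # Monitoring logic goes here,
        # e.g. price alerts or abnormal-move detection
        
        symbol = ticker_data['symbol']
        price = ticker_data['price']
        change_pct = ticker_data['change_pct']
        
        # Example price alert: move of more than 5% (skipped if the field is missing)
        if change_pct is not None and abs(change_pct) > 5:
            alert_msg = f"Alert: {symbol} moved {change_pct:.2f}%, current price {price}"
            self.send_alert(alert_msg)
            
        # Save to the database
        self.db_client.save_realtime_quote(ticker_data)
    
    def _process_trade_data(self, trade_data: dict):
        """Handle one trade; a placeholder that only logs at debug level."""
        logging.debug(f"Trade: {trade_data}")
    
    def send_alert(self, message: str):
        """Send an alert."""
        # Email, SMS, Telegram, or other channels can be plugged in here
        logging.warning(f"Alert: {message}")
        
        # Example: print to the console
        print(f"[{datetime.now()}] {message}")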
    
    def run(self):
        """Run the monitoring system."""
        logging.info("Starting the Indonesian stock monitoring system")
        
        # Scheduled job: daily update at 18:00 in the host's local time
        schedule.every().day.at("18:00").do(self.update_daily_data)
        
        # Start real-time monitoring in a background thread
        import threading
        monitor_thread = threading.Thread(target=self.realtime_monitor, daemon=True)
        monitor_thread.start()
        
        # Main loop
        try:
            while True:
                schedule.run_pending()
                time.sleep(60)  # check for due jobs once a minute
        except KeyboardInterrupt:
            logging.info("System exited normally")

# Example configuration (config.yaml)
# Note: yaml.safe_load does not expand "${STOCK_API_KEY}"; the key is read
# from the environment by APIConfig instead.
"""
api:
  base_url: "https://api.financial-data.com"
  ws_url: "wss://ws.financial-data.com"
  api_key: "${STOCK_API_KEY}"
  timeout: 30
  max_retries: 3

database:
  connection: "postgresql://user:password@localhost/indonesia_stocks"

monitoring:
  watchlist: ["BBCA", "TLKM", "BBRI", "ASII", "GOTO"]
  alert_threshold: 5.0  # alert when the move exceeds 5%
  update_interval: 3600  # data refresh interval in seconds
"""
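
The "${STOCK_API_KEY}" placeholder in the example file is not expanded by the YAML parser. One option is to expand environment variables in the raw text before parsing it. The sketch below is a hypothetical helper (expand_env is a name introduced here) that uses only the standard library and fails loudly when a referenced variable is not set; the key value is a dummy.

```python
import os
import re

# Matches ${NAME} placeholders that are still present after expansion
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(text: str) -> str:
    """Replace ${VAR} placeholders with environment values.

    Raises KeyError if any placeholder refers to an unset variable.
    """
    expanded = os.path.expandvars(text)  # leaves unknown variables unchanged
    missing = _PLACEHOLDER.findall(expanded)
    if missing:
        names = ", ".join(sorted(set(missing)))
        raise KeyError(f"environment variables not set: {names}")
    return expanded


os.environ["STOCK_API_KEY"] = "demo-key-for-illustration"  # dummy value
raw = 'api:\n  api_key: "${STOCK_API_KEY}"\n  timeout: 30\n'
expanded = expand_env(raw)
print(expanded)
```

The expanded text can then be passed to yaml.safe_load in place of the file contents.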

5.2 Deployment and Optimization

# docker-compose.yml - containerized deployment
version: '3.8'

services:
  postgres:
    image: postgres:14
    environment:
      POSTGRES_DB: indonesia_stocks
      POSTGRES_USER: stock_user
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  
  stock-api:
    build: .
    environment:
      DATABASE_URL: postgresql://stock_user:${DB_PASSWORD}@postgres/indonesia_stocks
      REDIS_URL: redis://redis:6379
      STOCK_API_KEY: ${STOCK_API_KEY}
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

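6. Notes Specific to the Indonesian Market

6.1 Market Characteristics

  1. Trading hours: Monday to Friday, 09:00-16:00 Jakarta time (WIB, UTC+7), which is one hour behind Beijing time (UTC+8). The trading day includes a midday break; check the current IDX schedule for exact session times
  2. Currency: Indonesian rupiah (IDR); mind the unit conversion (1 CNY ≈ 2,200 IDR; the rate fluctuates)
  3. Price limits: daily moves are capped at roughly ±20% to ±35%. IDX sets these auto-rejection limits by price band and has revised them over time, so confirm the current rules with the exchange
  4. Settlement cycle: T+2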
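
Because Jakarta (UTC+7) and Beijing (UTC+8) are one hour apart, timestamps should be converted explicitly before checking them against trading hours. A minimal sketch with fixed UTC offsets follows; the 09:00-16:00 window is the one stated above, and the function deliberately ignores the midday break and exchange holidays. The function name is introduced here for illustration.

```python
from datetime import datetime, time, timedelta, timezone

WIB = timezone(timedelta(hours=7), name="WIB")  # Jakarta, UTC+7, no DST
CST = timezone(timedelta(hours=8), name="CST")  # Beijing, UTC+8, no DST

MARKET_OPEN = time(9, 0)
MARKET_CLOSE = time(16, 0)


def is_within_trading_window(dt: datetime) -> bool:
    """True if an aware datetime falls Mon-Fri, 09:00-16:00 Jakarta time.

    Simplification: the midday break and public holidays are not considered.
    """
    local = dt.astimezone(WIB)
    return local.weekday() < 5 and MARKET_OPEN <= local.time() < MARKET_CLOSE


beijing_morning = datetime(2024, 1, 2, 9, 30, tzinfo=CST)  # a Tuesday
beijing_later = datetime(2024, 1, 2, 10, 0, tzinfo=CST)

print(beijing_morning.astimezone(WIB).strftime("%H:%M"),
      is_within_trading_window(beijing_morning))
print(beijing_later.astimezone(WIB).strftime("%H:%M"),
      is_within_trading_window(beijing_later))
```

A scheduler running on Beijing time therefore needs to start an hour later than the Jakarta opening time on its own clock.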

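6.2 Handling Data Quality

import pandas as pd

def handle_indonesia_market_specialties(df: pd.DataFrame) -> pd.DataFrame:
    """
    Handle data issues specific to the Indonesian market.
    """
    df = df.copy()  # do not modify the caller's DataFrame
    
    # 1. Rescale volume; apply this only if the provider reports volume in thousands
    if 'volume' in df.columns:
        df['volume'] = df['volume'] * 1000
    
    # 2. Public holidays
    indonesia_holidays = pd.to_datetime([
        '2024-01-01', '2024-03-11', '2024-03-12',  # New Year's Day, Nyepi (Day of Silence)
        '2024-04-10', '2024-04-11',  # Eid al-Fitr
        '2024-05-23',  # Vesak
        '2024-06-01',  # Pancasila Day
        '2024-07-07',  # Islamic New Year
        '2024-08-17',  # Independence Day
        '2024-12-25'   # Christmas
    ])
    
    # Drop rows dated on a holiday (assumes a DatetimeIndex)
    df = df[~df.index.normalize().isin(indonesia_holidays)]
    
    # 3. Stock splits and reverse splits
    # Indonesian stocks often have par-value adjustments.
    # Placeholder: the factors are listed but not applied in this example.
    adjustment_factors = {
        'BBCA': 1.0,  # example adjustment factor
        'TLKM': 1.0,
    }
    
    return df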
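
The function above lists adjustment factors without applying them. As an illustration of what applying one could look like, the sketch below back-adjusts closes before a split date so the series stays continuous. The helper name, the split date, the ratio, and the prices are all hypothetical; real corporate-action data would come from the data provider or the exchange.

```python
from datetime import date
from typing import List, Tuple

Bar = Tuple[date, float]  # (trading date, closing price)


def back_adjust(bars: List[Bar], split_date: date, ratio: float) -> List[Bar]:
    """Divide closes dated before split_date by ratio.

    ratio is the number of new shares per old share, e.g. 5.0 for a
    5-for-1 split. Bars on or after split_date are left unchanged.
    """
    if ratio <= 0:
        raise ValueError("ratio must be positive")
    return [
        (day, close / ratio if day < split_date else close)
        for day, close in bars
    ]


# Made-up prices around a hypothetical 5-for-1 split on 2024-01-04.
bars = [
    (date(2024, 1, 2), 5000.0),
    (date(2024, 1, 3), 5100.0),
    (date(2024, 1, 4), 1030.0),
]
adjusted = back_adjust(bars, split_date=date(2024, 1, 4), ratio=5.0)
for day, close in adjusted:
    print(day.isoformat(), close)
```

Without such an adjustment, the drop from 5100 to 1030 would look like an 80% one-day loss and would be flagged as an outlier by a return-based cleaning step.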

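6.3 Performance Recommendations

  1. Caching: use Redis to cache data that changes infrequently, such as the stock list
  2. Batching: use batch endpoints when fetching data for multiple stocks
  3. Async processing: use asyncio to speed up I/O-bound tasks
  4. Connection pooling: manage database and API connections through a pool

# Example: fetching data asynchronously
import aiohttp
import asyncio
from typing import List

BASE_URL = "https://api.financial-data.com"  # same base URL as in config.py

async def fetch_multiple_stocks(symbols: List[str]):
    """Fetch data for several stocks concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for symbol in symbols:
            task = asyncio.create_task(
                fetch_stock_data(session, symbol)
            )
            tasks.append(task)
        
        # return_exceptions=True keeps one failure from cancelling the rest
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

async def fetch_stock_data(session, symbol: str):
    """Fetch data for a single stock."""
    url = f"{BASE_URL}/quotes/{symbol}"
    async with session.get(url) as response:
        return await response.json()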
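
Launching one request per symbol at the same time can run into provider rate limits. A common refinement is to cap the number of requests in flight with asyncio.Semaphore. The sketch below replaces the HTTP call with a stub (fake_fetch, a name introduced here) so it runs without network access; "BAD" is a made-up symbol used to show error handling.

```python
import asyncio
from typing import Dict, List


async def fake_fetch(symbol: str) -> Dict[str, str]:
    """Stand-in for an HTTP request; sleeps briefly instead of using the network."""
    await asyncio.sleep(0.01)
    if symbol == "BAD":
        raise ValueError(f"unknown symbol: {symbol}")
    return {"symbol": symbol}


async def fetch_all(symbols: List[str], max_concurrent: int = 2) -> Dict[str, object]:
    """Fetch every symbol with at most max_concurrent requests in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(symbol: str):
        async with semaphore:  # waits here while the limit is reached
            return await fake_fetch(symbol)

    # gather() preserves input order; exceptions are returned, not raised
    outcomes = await asyncio.gather(
        *(guarded(s) for s in symbols), return_exceptions=True
    )
    return dict(zip(symbols, outcomes))


results = asyncio.run(fetch_all(["BBCA", "TLKM", "BAD", "BBRI"]))
for symbol, outcome in results.items():
    status = "failed" if isinstance(outcome, Exception) else "ok"
    print(symbol, status)
```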

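7. Summary and Suggested Extensions

7.1 Key Points

This article walked through a technical approach to fetching Indonesian stock data from an API:

  1. API integration: data is fetched over two channels, REST and WebSocket
  2. Data processing: cleaning, technical indicator calculation, and outlier handling
  3. Storage: structured storage in PostgreSQL
  4. Visualization: interactive charts built with Plotly and Dash
  5. Monitoring: a real-time monitoring and alerting loop

7.2 Suggested Extensions

  1. Strategy backtesting: backtest MACD, RSI, and similar strategies on historical data
  2. News sentiment analysis: add sentiment analysis of Indonesian financial news
  3. Macroeconomic indicators: combine with Indonesian GDP, CPI, interest rates, and other macro data
  4. Cross-market comparison: add comparisons with Malaysia, Thailand, and other Southeast Asian markets
  5. Mobile support: build a mobile monitoring app

7.3 Things to Watch

  1. Compliance: make sure data usage complies with Indonesian regulations
  2. Data licensing: commercial use requires the appropriate data license
  3. System monitoring: track API call frequency to stay within rate limits
  4. Error handling: implement thorough error handling and retries
  5. Backups: back up historical data regularly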
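
For the rate-limit point above, a client-side limiter can refuse calls before the provider does. The sketch below is one possible approach, a sliding-window limiter; the class name and the limit of 2 calls per second are assumptions for illustration, since the article does not state the provider's actual quota. The clock is injectable so the behavior can be checked without waiting.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most max_calls within any period of `window` seconds."""

    def __init__(self, max_calls: int, window: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_calls = max_calls
        self.window = window
        self.clock = clock
        self._calls: Deque[float] = deque()  # timestamps of accepted calls

    def try_acquire(self) -> bool:
        """Record a call and return True if it fits in the window, else False."""
        now = self.clock()
        # Drop timestamps that have left the window
        while self._calls and now - self._calls[0] >= self.window:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            self._calls.append(now)
            return True
        return False


fake_now = [0.0]  # mutable cell standing in for a real clock
limiter = SlidingWindowLimiter(max_calls=2, window=1.0, clock=lambda: fake_now[0])

first = [limiter.try_acquire() for _ in range(3)]  # third call exceeds the limit
fake_now[0] = 1.0                                  # one second later
later = limiter.try_acquire()
print(first, later)
```

A caller would check try_acquire() before each API request and either wait or skip the request when it returns False.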

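7.4 Learning Resources

  • Indonesia Stock Exchange website: market rules and listed-company information
  • Financial data API documentation: the latest endpoint reference
  • Pandas documentation: data processing and analysis
  • Plotly documentation: building visualizations
  • PostgreSQL manual: database design and tuning

With the approach described here, developers can put together an Indonesian stock data system fairly quickly. In practice the architecture should be adjusted to the specific requirements, with particular attention to what is different about the Indonesian market: trading hours, the currency unit, and local holidays. Test thoroughly before going to production to make sure the system is stable and the data is accurate.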
