Source code for betalens.datafeed.universe

#%%By Janis 260603
"""
指数历史股票池查询模块(函数式)

设计要点
--------
指数成分股池是 point-in-time 的"状态"数据:某指数从某调整日起拥有一组成分股,
直到下次调整。每个生效日存为一行,约定:
    - code   : 指数代码(如 '000906.SH')
    - name   : 指数名称(如 '中证800')
    - metric : 固定为 'universe',标识成分股池
    - value  : 成分股数量(便于校验)
    - remark : JSONB,约定 {"index_code", "index_name", "constituents": [...]}
    - datetime: 该股票池的生效时点(最早可知日)

查询语义 = 取 datetime <= 查询日 的最近一条,与 query.query_nearest_before 同构,
天然避免前视偏差。由于成分股列表存在 remark(JSONB),而 query_nearest_before 只
返回 value/name,故本模块用它先"定位"最近生效日,再补一条小查询取出 remark。

主要接口
--------
    get_index_universe      : 返回某指数某日生效的成分股代码列表
    get_index_universe_date : 返回某指数某日实际生效的快照日期(便于排查)
"""

import logging
from typing import Optional, List

from .query import query_nearest_before


DEFAULT_TABLE = 'index_universe'
DEFAULT_METRIC = 'universe'


def _get_default_logger():
    logger = logging.getLogger('IndexUniverseQuery')
    if not logger.handlers:
        logger.setLevel(logging.INFO)
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        ch.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        logger.addHandler(ch)
    return logger


[docs] def get_index_universe_date( cursor, index_code: str, date: str, table_name: str = DEFAULT_TABLE, metric: str = DEFAULT_METRIC, logger: Optional[logging.Logger] = None, ): """ 返回某指数在某日实际生效的快照日期(point-in-time,取 datetime<=date 的最近一条)。 复用 query.query_nearest_before 定位最近生效日。 Args: cursor: 数据库游标(建议 RealDictCursor) index_code: 指数代码,如 '000906.SH' date: 查询日期,'YYYY-MM-DD' 或 'YYYY-MM-DD HH:MM:SS' table_name: 表名,默认 'index_universe' metric: 指标名,默认 'universe' logger: 日志器 Returns: 生效快照的 datetime(pandas.Timestamp);该日前无可用股票池则返回 None """ if logger is None: logger = _get_default_logger() if not index_code: raise ValueError("index_code不能为空") if not date: raise ValueError("date不能为空") df = query_nearest_before( cursor, table_name=table_name, codes=[index_code], datetimes=[date], metric=metric, logger=logger, ) if df.empty or 'datetime' not in df.columns: return None eff_dt = df.iloc[0]['datetime'] # query_nearest_before 用 LEFT JOIN,无匹配时 datetime 为 NaT/None if eff_dt is None or (hasattr(eff_dt, '__class__') and str(eff_dt) == 'NaT'): return None import pandas as pd if pd.isna(eff_dt): return None return eff_dt
[docs] def get_index_universe( cursor, index_code: str, date: str, table_name: str = DEFAULT_TABLE, metric: str = DEFAULT_METRIC, logger: Optional[logging.Logger] = None, ) -> List[str]: """ 返回 index_code 在 date 当日生效的成分股代码列表(point-in-time)。 步骤:用 query.query_nearest_before 找到 <=date 的最近生效快照日,再取该行 remark 中的 constituents 列表。该日前无可用股票池则返回空列表。 Args: cursor: 数据库游标(建议 RealDictCursor) index_code: 指数代码,如 '000906.SH' date: 查询日期,'YYYY-MM-DD' 或 'YYYY-MM-DD HH:MM:SS' table_name: 表名,默认 'index_universe' metric: 指标名,默认 'universe' logger: 日志器 Returns: 成分股代码列表(如 ['000001.SZ', ...]);无可用股票池则返回 [] """ if logger is None: logger = _get_default_logger() eff_dt = get_index_universe_date( cursor, index_code, date, table_name=table_name, metric=metric, logger=logger) if eff_dt is None: logger.info(f"{index_code} @ {date}: 无可用股票池") return [] # 按精确 datetime 取该行 remark(query_nearest_before 不返回 remark) cursor.execute( f"SELECT remark FROM {table_name} " f"WHERE code = %s AND metric = %s AND datetime = %s", (index_code, metric, eff_dt) ) row = cursor.fetchone() if row is None: logger.warning(f"{index_code} @ {eff_dt}: 定位到生效日但取 remark 为空") return [] # 兼容 RealDictCursor(dict) 与普通 cursor(tuple) remark = row['remark'] if isinstance(row, dict) else row[0] if not isinstance(remark, dict): logger.warning(f"{index_code} @ {eff_dt}: remark 非预期结构") return [] constituents = list(remark.get('constituents', [])) logger.info(f"{index_code} @ {date}: 生效日 {eff_dt}, 成分股 {len(constituents)} 只") return constituents