# Source code for betalens.datafeed.wind_ingest

#%%By Janis 250226
"""
Wind数据抓取模块(函数式)
功能:
- 从WindPy获取日行情数据
- 转换为数据库标准格式
- 支持多种资产类型(股票、指数、基金、债券)
"""

import pandas as pd
import numpy as np
from typing import List, Optional, Dict, Any, Union
from datetime import datetime
import logging

from .excel import apply_time_alignment
from .config import get_wind_config


def _get_default_logger():
    """获取默认logger"""
    logger = logging.getLogger('WindIngest')
    if not logger.handlers:
        logger.setLevel(logging.INFO)
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger


def fetch_daily_market(
    codes: List[str],
    start_date: str,
    end_date: str,
    fields: Optional[List[str]] = None,
    asset_type: str = 'stock',
    apply_time_stamps: bool = True,
    logger: Optional[logging.Logger] = None
) -> pd.DataFrame:
    """
    Fetch daily market data from Wind and convert it to the database long format.

    Args:
        codes: Security codes, e.g. ['000001.SZ', '000002.SZ'].
        start_date: Start date, formatted 'YYYY-MM-DD'.
        end_date: End date, formatted 'YYYY-MM-DD'.
        fields: Wind field names, e.g. ['open', 'high', 'low', 'close', 'volume'].
            When None, the fields (and their display names) configured for
            ``asset_type`` under the 'asset_fields' key of the Wind config are
            used; an unknown asset type falls back to the 'stock' entry.
            When given, the Wind field names themselves become the metric names.
        asset_type: Asset type: 'stock', 'index', 'fund' or 'bond'.
        apply_time_stamps: If True, timestamps are assigned by
            ``apply_time_alignment`` (per the original documentation: opening
            price at 09:30, everything else at 15:00). If False, every row is
            stamped 15:00:01 on its trading date.
        logger: Logger to use; the module default logger is used when None.

    Returns:
        DataFrame with the columns:
        datetime | code | name | metric | value
        Rows whose value is missing are dropped. If no code could be fetched,
        an empty DataFrame with these columns is returned.

    Raises:
        ImportError: If WindPy cannot be imported.

    Example:
        >>> df = fetch_daily_market(
        ...     codes=['000001.SZ'],
        ...     start_date='2024-01-01',
        ...     end_date='2024-01-31'
        ... )
        >>> df.head()
                     datetime       code   name   metric  value
        0 2024-01-02 09:30:01  000001.SZ  平安银行  开盘价(元)  10.50
        1 2024-01-02 15:00:01  000001.SZ  平安银行  收盘价(元)  10.55
    """
    output_columns = ['datetime', 'code', 'name', 'metric', 'value']
    id_columns = ['日期', '代码', '简称']
    fallback_fields = ['open', 'high', 'low', 'close', 'volume']

    if logger is None:
        logger = _get_default_logger()

    try:
        from WindPy import w
    except ImportError as exc:
        logger.error("无法导入WindPy,请确保已安装Wind金融终端")
        # Chain the original failure so the real import problem stays visible.
        raise ImportError("需要安装WindPy: pip install WindPy") from exc

    # Start the Wind session.
    w.start()
    logger.info("Wind连接已启动")

    # Resolve the fields to request and the metric names to store them under.
    if fields is None:
        # The config is only needed when the caller did not choose the fields.
        asset_fields_config = get_wind_config().get('asset_fields', {})
        if asset_type in asset_fields_config:
            type_config = asset_fields_config[asset_type]
        else:
            logger.warning(f"未知资产类型 '{asset_type}',使用默认股票字段")
            type_config = asset_fields_config.get('stock', {})
        fields = type_config.get('fields', fallback_fields)
        field_names = type_config.get('field_names', fields)
    else:
        # Caller-supplied fields: metric names are the Wind field names.
        field_names = fields

    # Guard against a config whose 'field_names' list is shorter than 'fields'.
    # Without this, every code would fail with an IndexError inside the
    # per-code try block below and the function would silently return nothing.
    metric_names = list(field_names)
    if len(metric_names) < len(fields):
        logger.warning(
            f"字段名数量({len(metric_names)})少于字段数量({len(fields)}),"
            f"缺失部分使用Wind字段名"
        )
        metric_names.extend(fields[len(metric_names):])
    metric_names = metric_names[:len(fields)]

    logger.info(f"准备获取数据: 代码数={len(codes)}, 日期范围={start_date}~{end_date}, 字段={fields}")

    # Fetch each code separately; a failure for one code must not abort the rest.
    frames = []
    for code in codes:
        try:
            result = w.wsd(code, fields, start_date, end_date, "")
            if result.ErrorCode != 0:
                logger.error(f"获取 {code} 数据失败: {result.Data}")
                continue

            # Look up the security's display name; fall back to the code itself.
            name_result = w.wss(code, "sec_name")
            if name_result.ErrorCode == 0 and name_result.Data and name_result.Data[0]:
                code_name = name_result.Data[0][0]
            else:
                code_name = code

            # One wide frame per code: id columns plus one column per metric.
            data_dict = {'日期': result.Times, '代码': code, '简称': code_name}
            for idx, metric_name in enumerate(metric_names):
                data_dict[metric_name] = result.Data[idx]

            df_code = pd.DataFrame(data_dict)
            frames.append(df_code)
            logger.info(f"成功获取 {code} ({code_name}) 数据: {len(df_code)} 行")
        except Exception as e:
            # Deliberate best-effort boundary around the third-party API call.
            logger.error(f"获取 {code} 数据时发生异常: {str(e)}")
            continue

    if not frames:
        logger.warning("未获取到任何数据")
        return pd.DataFrame(columns=output_columns)

    df_all = pd.concat(frames, ignore_index=True)
    logger.info(f"合并完成,共 {len(df_all)} 行")

    # Wide -> long: every column that is not an id column is a metric.
    value_columns = [col for col in df_all.columns if col not in id_columns]
    df_melted = pd.melt(
        df_all,
        id_vars=id_columns,
        value_vars=value_columns,
        var_name='metric',
        value_name='value'
    )
    df_melted = df_melted.rename(columns={'代码': 'code', '简称': 'name'})

    if apply_time_stamps:
        # Metric-dependent trading timestamps are assigned by the shared helper.
        df_melted = apply_time_alignment(
            df_melted,
            date_column='日期',
            metric_column='metric',
            inplace=True,
            logger=logger
        )
    else:
        # Uniform 15:00:01 stamp, computed with datetime arithmetic so it does
        # not depend on how the date values happen to render as strings.
        trade_days = pd.to_datetime(df_melted['日期']).dt.normalize()
        df_melted['日期'] = trade_days + pd.Timedelta(hours=15, seconds=1)

    df_melted = df_melted.rename(columns={'日期': 'datetime'})
    df_melted = df_melted[output_columns]

    # Drop rows without a value (reassignment instead of in-place mutation of
    # a column-subset frame).
    df_melted = df_melted.dropna(subset=['value'])

    logger.info(f"数据转换完成: {len(df_melted)} 行,格式: datetime|code|name|metric|value")

    return df_melted
def fetch_daily_index(
    codes: List[str],
    start_date: str,
    end_date: str,
    fields: Optional[List[str]] = None,
    apply_time_stamps: bool = True,
    logger: Optional[logging.Logger] = None
) -> pd.DataFrame:
    """
    Fetch daily market data for indices.

    Convenience wrapper around fetch_daily_market with asset_type='index'.

    Args:
        codes: Index codes, e.g. ['000001.SH', '399001.SZ'].
        start_date: Start date.
        end_date: End date.
        fields: Field list; defaults to None.
        apply_time_stamps: Whether to apply trading timestamps.
        logger: Logger to use.

    Returns:
        DataFrame as returned by fetch_daily_market.
    """
    # Pure delegation: only the asset type is fixed here.
    return fetch_daily_market(
        codes,
        start_date,
        end_date,
        fields=fields,
        asset_type='index',
        apply_time_stamps=apply_time_stamps,
        logger=logger,
    )
def fetch_daily_fund(
    codes: List[str],
    start_date: str,
    end_date: str,
    fields: Optional[List[str]] = None,
    apply_time_stamps: bool = True,
    logger: Optional[logging.Logger] = None
) -> pd.DataFrame:
    """
    Fetch daily market data for funds.

    Convenience wrapper around fetch_daily_market with asset_type='fund'.

    Args:
        codes: Fund codes.
        start_date: Start date.
        end_date: End date.
        fields: Field list; defaults to None.
        apply_time_stamps: Whether to apply trading timestamps.
        logger: Logger to use.

    Returns:
        DataFrame as returned by fetch_daily_market.
    """
    # Pure delegation: only the asset type is fixed here.
    return fetch_daily_market(
        codes,
        start_date,
        end_date,
        fields=fields,
        asset_type='fund',
        apply_time_stamps=apply_time_stamps,
        logger=logger,
    )
def fetch_daily_bond(
    codes: List[str],
    start_date: str,
    end_date: str,
    fields: Optional[List[str]] = None,
    apply_time_stamps: bool = True,
    logger: Optional[logging.Logger] = None
) -> pd.DataFrame:
    """
    Fetch daily market data for bonds.

    Convenience wrapper around fetch_daily_market with asset_type='bond'.

    Args:
        codes: Bond codes.
        start_date: Start date.
        end_date: End date.
        fields: Field list; defaults to None.
        apply_time_stamps: Whether to apply trading timestamps.
        logger: Logger to use.

    Returns:
        DataFrame as returned by fetch_daily_market.
    """
    # Pure delegation: only the asset type is fixed here.
    return fetch_daily_market(
        codes,
        start_date,
        end_date,
        fields=fields,
        asset_type='bond',
        apply_time_stamps=apply_time_stamps,
        logger=logger,
    )