Datafeed 使用指南

betalens.datafeed 是一套薄封装的数据中台,负责 Excel/EDE 文件清洗、验证、数据库读写以及 Wind 数据接入。

模块结构

  • betalens.datafeed.Datafeed:统一入口,维护 PostgreSQL 连接与日志

  • excel:读写 Excel/CSV、创建目录树、批量转换

  • validation:缺失值处理、日期列校验、枚举约束

  • query:时间序列与最近点查询、收益率计算

  • integration:增量写入、目录树批处理、错误回滚

  • wind_ingest / ede_processor:数据源适配器

  • config:配置文件管理

快速连接数据库

from betalens.datafeed import Datafeed

# 使用默认配置(从config.json读取)
df = Datafeed("daily_market_data")
print(df.sheet)  # => daily_market_data

# 自定义数据库配置
df_dev = Datafeed(
    "factor_store",
    db_config={
        'dbname': 'beta_dev',
        'user': 'postgres',
        'password': 'your_password',
        'host': 'localhost',
        'port': '5432'
    }
)

# 使用完毕后关闭连接
df.close()

Datafeed 会在构造时打开连接并创建日志文件。

常用查询

时间范围查询

# 查询时间范围内的数据
history = df.query_time_range(
    codes=["000001.SZ", "000002.SZ"],
    start_date="2024-01-01",
    end_date="2024-03-31",
    metric="收盘价(元)"
)

最近时点查询

# 查询每个时点之后最近的有效值(用于获取调仓成本价)
params = {
    'codes': ["000001.SZ", "000002.SZ"],
    'datetimes': ["2024-01-31 15:00:00", "2024-02-29 15:00:00"],
    'metric': "收盘价(元)",
    'time_tolerance': 24  # 时间容差(小时)
}
cost_price = df.query_nearest_after(params)

# 查询每个时点之前最近的有效值(用于获取历史特征)
params = {
    'codes': ["000001.SZ"],
    'datetimes': ["2024-03-31 10:00:00"],
    'metric': "归母净利润",
    'time_tolerance': 365 * 24  # 1年
}
last_report = df.query_nearest_before(params)

返回的 DataFrame 包含:

  • code: 代码

  • input_ts: 输入时间戳

  • datetime: 匹配到的数据时间戳

  • diff_hours: 时间差(小时)

  • {metric}: 数据值

  • name: 名称

辅助查询函数

# 获取最新数据日期
latest = df.get_latest_date(code="000001.SZ", metric="收盘价(元)")

# 获取可用日期列表
dates = df.get_available_dates(
    code="000001.SZ",
    metric="收盘价(元)",
    start_date="2024-01-01",
    end_date="2024-12-31"
)

Excel & EDE 导入

CSV/Excel 文件导入

insert_result = df.insert_csv_file(
    filepath=r".\exports\margin_trading.csv",
    config={
        "key_columns": ["datetime", "code", "metric"],
        "key_value_mapping": {"价值(元)": "value"},
        "additional_fields": {"source": "WIND"},
        "apply_time_alignment": True,
    },
    mode="incremental"  # 或 "insert"
)
print(insert_result)

EDE 文件导入

# 处理EDE格式的Excel文件
result = df.insert_ede_file(
    filepath='EDE20251103.xlsx',
    date_from='filename',  # 或 'metric'
    mode='incremental'
)
print(f"新增{result['new_rows']}行,跳过{result['skipped_rows']}行")

EDE格式特征:

  • 第一列:证券代码

  • 第二列:证券简称

  • 第三列及之后:指标列,格式为”指标名 [元数据] 值类型 [元数据] 单位”

批量导入

# 批量处理目录下的文件
summary = df.batch_process_excel_files(
    folder_path=r".\exports\macro",
    config={...},
    file_pattern="*.xlsx",
    recursive=True,
    mode='incremental'
)
print(summary)

Wind 数据抓取

# 抓取股票日行情
result = df.ingest_wind_daily_market(
    codes=['000001.SZ', '000002.SZ'],
    start_date='2024-01-01',
    end_date='2024-01-31',
    fields=None,  # 使用默认字段
    asset_type='stock',
    mode='incremental'
)

# 便捷方法
df.ingest_wind_daily_index(codes, start_date, end_date)  # 指数
df.ingest_wind_daily_fund(codes, start_date, end_date)   # 基金
df.ingest_wind_daily_bond(codes, start_date, end_date)   # 债券

数据验证

# 验证并修复DataFrame
cleaned_df, report = df.validate_dataframe(
    df=history,
    validations={
        "required_columns": ["datetime", "code", "value"],
        "datetime_column": "datetime",
        "fill_strategy": "forward_fill"
    }
)

# 检查Excel文件
is_valid, errors = df.check_excel_file(
    filepath=r".\data.xlsx",
    checks={"required_columns": ["datetime", "code"]}
)

增量更新

import pandas as pd

# 准备数据
new_data = pd.DataFrame({
    'datetime': ['2024-01-01 15:00:00'],
    'code': ['000001.SZ'],
    'metric': ['收盘价(元)'],
    'value': [10.5],
    'name': ['平安银行']
})

# 增量更新
new_rows, skipped_rows = df.incremental_update(
    df=new_data,
    date_column='datetime',
    code_column='code',
    metric_column='metric'
)

行业分类查询

行业归属是 point-in-time 状态数据:某股票从某日起属于某行业,直到下次变更。 betalens 不另造存储模型,而是复用长格式时序表(industry 表),约定:

  • metric:分类体系名,如 申万一级行业申万二级行业(2021)

  • value:行业代码数值部分(如 480301),便于索引分组

  • remark``(JSONB):``{"ind_name", "ind_code", "scheme"},存行业中文名等

  • datetime:该归属关系的生效时点(最早可知日)

查询语义 = 取 datetime <= 查询日 的最近一条,天然避免前视偏差。

正查 / 反查

from betalens.datafeed import query_industry, get_industry_members

# 正查:某股票在某日所属行业(注意 table_name 默认 'industry',
# 申万分类数据在 'industry' 表,须显式指定)
df = query_industry(
    cursor,
    codes=["000001.SZ"],
    dates=["2023-06-30"],
    scheme="申万一级行业",          # 不带版本后缀 → 自动选版本,见下
    table_name="industry",
)
# 返回列:code | query_date | effective_dt | sec_name |
#         industry_value | ind_name | ind_code | scheme

# 反查:某日某行业的成分股
members = get_industry_members(
    cursor, industry="银行", date="2023-06-30",
    scheme="申万一级行业", table_name="industry",
)

版本自动选择(申万 2014/2021 多版本)

申万行业分类历经多次改版(2014-02-21、2021-07-30),同一 6 位行业代码在不同 版本含义不同。入库时按「计入日期」落到版本族,metric 带版本后缀区分: 申万一级行业(旧版) / 申万一级行业(2014) / 申万一级行业(2021)

查询时无需关心版本——scheme 不带版本后缀**(``申万一级行业``)即触发版本 自动选择:底层用 ``metric LIKE ‘申万一级行业%’`` 匹配全部版本,配合 ``ORDER BY datetime DESC`` 取最近一条,结果**天然落到查询日生效的那个版本, 无需硬编码任何版本边界日期:

# 同一只股票,按查询日自动落到对应版本
df = query_industry(cursor, codes=["000001.SZ"],
                    dates=["2010-06-30", "2018-06-30", "2023-06-30"],
                    scheme="申万一级行业", table_name="industry")
# 2010 → 旧版(effective_dt=1991-04-03)
# 2018 → 2014 版(effective_dt=2014-02-21,银行)
# 2023 → 2021 版(effective_dt=2021-07-30,银行)

# 带版本后缀 → LIKE 退化为精确匹配,只查该版本
df = query_industry(cursor, codes=["000001.SZ"], dates=["2023-06-30"],
                    scheme="申万一级行业(2021)", table_name="industry")

# exact=True → 强制精确匹配(关闭自动选择),对旧无后缀 metric 向后兼容
df = query_industry(cursor, codes=["000001.SZ"], dates=["2023-06-30"],
                    scheme="申万一级行业", table_name="industry", exact=True)

Note

原理是「版本族记录的生效日天然落在各自版本区间内」,所以匹配全部版本 + 取最近 一条 = 按日期选版本。将来申万出新版本,入库时加新后缀即可,查询代码不用动

入库长表(申万分类示例)

把申万长表(股票代码 / 计入日期 / 行业代码(6位) / 更新日期)写入 industry 表的要点:

  • 6 位行业代码按每两位拆三级:前 2 位 = 一级、前 4 位 = 二级、全 6 位 = 三级, 各自一条 metric,value 存对应数值(如 48 / 4803 / 480301)。

  • 证券代码转 wind 格式:首位 6.SH0/3.SZ4/8/9.BJ``(北交所 ``8xxxxx / 9xxxxx / 689xxx 等)。

  • **中文行业名按版本字典解析**写入 ``remark.ind_name``(同一代码不同版本含义不同, 须用对应版本字典);旧版无字典覆盖时留空。

  • ``name`` 列(证券中文简称)NOT NULL:从含 wind 代码↔简称的现表补;历史/退市股 无简称时用 wind 代码占位。

  • remark 是 dict,execute_values 不自动转 JSONB,入库前注册适配器: psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)

  • incremental_insert()(datetime, code, metric) 去重写入,可重复运行。

指数股票池查询

指数成分股池与行业归属同属 point-in-time 状态数据:某指数从某调整日起拥有一组 成分股,直到下次调整。betalens 复用长格式时序表(index_universe 表),每个生效日 存为一行,成分股列表整体放入 remark

  • code:指数代码(如 000906.SH

  • name:指数名称(如 中证800

  • metric:固定为 universe,标识成分股池

  • value:成分股数量(便于校验)

  • remark``(JSONB):``{"index_code", "index_name", "constituents": [...]},成分股列表存这里

  • datetime:该股票池的生效时点(最早可知日)

查询语义与 query_nearest_before() 同构——取 datetime <= 查询日 的最近一条,天然避免前视偏差。

查询成分股列表

from betalens.datafeed import get_index_universe, get_index_universe_date

# 返回某指数某日生效的成分股代码列表(point-in-time)
codes = get_index_universe(cursor, "000906.SH", "2024-03-01")
# => ['000001.SZ', '000002.SZ', ...],约 800 只
# 实际取的是 <=2024-03-01 的最近生效日(如 2023-12-11)

# 查实际生效的快照日期(便于排查)
eff_dt = get_index_universe_date(cursor, "000906.SH", "2024-03-01")
# => Timestamp('2023-12-11 00:00:00')

# 早于首个生效日 → 返回空列表
get_index_universe(cursor, "000906.SH", "2000-01-01")   # => []

Note

query_nearest_before() 的 SELECT 只返回 value/name不返回 remark。故 get_index_universe() 先用它定位最近 生效日,再按精确 datetime 补一条小查询取出 remark.constituentscursor 兼容 RealDictCursor 与普通 cursor。

入库宽表(成分进出记录)

成分进出记录通常是 宽表快照:第一列为序号,其余每列列名为生效日期,列下方为该日 成分股 WindCode。脚本 makeupdatabase/load_index_universe.py 负责整理入库:

  • 每个日期列整理成一行datetime``=生效日、``code``=指数代码、``metric='universe'``value``=成分股数量、``remark.constituents``=该列非空去重保序的代码列表。

  • remark 是 dict,execute_values 不自动转 JSONB,入库前注册适配器: psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)

  • incremental_insert()(datetime, code, metric) 去重写入,可重复运行。

# 建表(复用通用 DDL 模板)
python betalens/datafeed/makeupdatabase/create_database.py --tables index_universe

# 入库(默认指向中证800;可换其他指数)
python betalens/datafeed/makeupdatabase/load_index_universe.py \
    --index-code 000906.SH --index-name 中证800 \
    --excel <成分进出记录.xlsx> --sheet Sheet2

交易日辅助函数

from betalens.datafeed import get_absolute_trade_days, trade_days_offset

# 获取交易日序列
trade_days = get_absolute_trade_days("2024-01-01", "2024-12-31", "D")  # 日频
trade_days = get_absolute_trade_days("2024-01-01", "2024-12-31", "M")  # 月频
trade_days = get_absolute_trade_days("2024-01-01", "2024-12-31", "Y")  # 年频

# 交易日偏移
from datetime import datetime
next_day = trade_days_offset(datetime(2024, 1, 1, 10, 0), offset=1)
prev_day = trade_days_offset(datetime(2024, 1, 1, 10, 0), offset=-1)

注意事项

  • 在离线测试或生成文档时,可通过 autodoc_mock_imports 忽略 psycopg2WindPy

  • 对于高频查询,优先使用 query_time_range 获取批量数据。

  • 所有插入方法默认开启事务;长批次任务请定期 conn.commit()

  • 配置文件 config.jsonconfig.example.json 复制并修改。

更多 API 细节请参阅 Datafeed API