Datafeed API

核心包:betalens.datafeed

Datafeed工具包 包含Excel处理、数据验证、数据库查询和集成等功能模块

子模块: - core: 核心Datafeed类及辅助函数 - excel: Excel文件处理工具 - validation: 数据验证和异常检查工具 - query: 数据库查询功能 - integration: 数据库与Excel交互功能

class betalens.datafeed.Datafeed(table_name, db_config=None, log_dir=None)[source]

Bases: object

数据管理主类(薄封装层)

职责: - 管理数据库连接 - 提供统一的日志记录 - 组合子模块功能为高层API - 不包含业务逻辑(所有逻辑在子模块中)

使用示例:

# 创建实例(使用默认配置) df = Datafeed(“daily_market_data”)

# 创建实例(自定义数据库配置) df = Datafeed(

table_name=”daily_market_data”, db_config={

‘dbname’: ‘my_database’, ‘user’: ‘my_user’, ‘password’: ‘my_password’, ‘host’: ‘localhost’, ‘port’: ‘5432’

}

)

# 单文件插入 df.insert_csv_file(“data.csv”, config={…})

# Wind数据抓取 df.ingest_wind_daily_market(

codes=[‘000001.SZ’], start_date=’2024-01-01’, end_date=’2024-01-31’

)

# 查询 data = df.query_time_range(

codes=[‘000001.SZ’], start_date=’2024-01-01’, metric=’收盘价(元)’

)

# 关闭 df.close()

__init__(table_name, db_config=None, log_dir=None)[source]

初始化Datafeed实例

Parameters:
  • table_name – 数据库表名

  • db_config – 数据库配置字典,包含以下键: - dbname: 数据库名 - user: 用户名 - password: 密码 - host: 主机地址 - port: 端口 如果为None,使用config.json中的配置

  • log_dir – 日志目录,如果为None,使用config.json中的配置

insert_csv_file(filepath, config, mode='incremental')[source]

单文件CSV插入(薄封装)

组合流程: 1. excel.read_file - 读取文件 2. excel.cross_section_to_db_format - 转换格式(如果需要) 3. validation.validate_and_fix - 验证和修复(如果配置) 4. integration.insert_dataframe/incremental_insert - 插入

Parameters:
  • filepath (str) – 文件路径

  • config (dict) – 配置字典,包含: - key_columns: 键列列表(用于cross_section转换) - value_columns: 值列列表(可选,自动推断) - key_value_mapping: 列名映射 - additional_fields: 额外字段 - validation: 验证配置 - apply_time_alignment: 是否应用时间对齐(开盘09:30,其他15:00)

  • mode (str) – ‘insert’(直接插入)或’incremental’(增量插入,默认)

Return type:

dict

Returns:

统计信息字典

insert_ede_file(filepath, date_from='filename', default_datetime=None, mode='incremental')[source]

处理并插入EDE格式的Excel文件(薄封装)

EDE格式特征: - 第一列:证券代码 -第二列:证券简称 - 第三列及之后:指标列,格式为”指标名 [元数据] 值类型 [元数据] 单位”

示例EDE格式:

证券代码 证券简称 流通A股 [交易日期] 最新 [单位] 股 002460.SZ 赣锋锂业 1,211,379,763.0000 1772.HK 赣锋锂业 1,211,379,763.0000

处理流程: 1. 读取Excel文件 2. 清理数据(去除空值、”数据来源:Wind”等) 3. 识别code、name列 4. 解析metric列,提取指标名称和元数据 5. 构建日期列(从文件名或列名中提取) 6. 转换为数据库格式 7. 插入数据库

Parameters:
  • filepath (str) – EDE格式Excel文件路径

  • date_from (str) – 日期来源,可选值: - ‘filename’: 从文件名提取日期(如EDE20251103.xlsx -> 2025-11-03 15:30:00) - ‘metric’: 从列名中的[日期]部分提取

  • default_datetime (Optional[str]) – 默认日期时间(当无法从文件名或列名提取时使用) 格式:’YYYY-MM-DD HH:MM:SS’,如’2025-11-03 15:30:00’

  • mode (str) – 插入模式 - ‘incremental’: 增量插入(默认),只插入新数据 - ‘insert’: 直接插入,会检查重复并跳过

Return type:

dict

Returns:

统计信息字典,包含:
  • success: 是否成功

  • new_rows: 新增行数

  • skipped_rows: 跳过行数

  • errors: 错误列表(如果有)

Example

>>> df = Datafeed('daily_market_data')
>>> result = df.insert_ede_file(
...     'EDE20251103.xlsx',
...     date_from='filename',
...     mode='incremental'
... )
>>> print(f"新增{result['new_rows']}行,跳过{result['skipped_rows']}行")
batch_process_excel_files(folder_path, config, file_pattern='*.csv', recursive=True, mode='insert')[source]

批量处理Excel文件并插入数据库(薄封装)

直接调用 integration.process_directory_tree

Parameters:
  • folder_path (str) – 文件夹路径

  • config (dict) – 处理配置

  • file_pattern (str) – 文件匹配模式

  • recursive (bool) – 是否递归搜索

  • mode (str) – 插入模式,’insert’或’incremental’

Returns:

处理统计字典

incremental_update(df, date_column='datetime', code_column='code', metric_column='metric')[source]

增量更新数据到数据库(薄封装)

直接调用 integration.incremental_insert

Parameters:
  • df (DataFrame) – 待更新的DataFrame

  • date_column (str) – 日期列名

  • code_column (str) – 代码列名

  • metric_column (str) – 指标列名

Returns:

(新增行数, 重复行数)

insert_with_conflict_check(df, date_column='datetime', code_column='code', metric_column='metric')[source]

插入数据时检测重复和冲突(批量查询优化版本)

  • 如果key(datetime, code, metric)相同但value不同:记录冲突,不插入

  • 如果key和value完全相同:跳过,不插入

  • 如果key不存在:插入新记录

Parameters:
  • df (DataFrame) – 待插入的DataFrame,需包含columns: datetime, code, name, metric, value

  • date_column (str) – 日期列名

  • code_column (str) – 代码列名

  • metric_column (str) – 指标列名

Returns:

(新增行数, 跳过行数, 冲突列表) 冲突列表格式:[{datetime, code, name, metric, db_value, new_value}, …]

update_data(df, date_column='datetime', code_column='code', metric_column='metric')[source]

使用SQL UPDATE批量更新数据

对于存在的记录(相同datetime, code, metric),更新value和name 对于不存在的记录,可选择插入(upsert模式)

Parameters:
  • df (DataFrame) – 待更新的DataFrame,需包含columns: datetime, code, name, metric, value

  • date_column (str) – 日期列名

  • code_column (str) – 代码列名

  • metric_column (str) – 指标列名

Returns:

更新行数

ingest_wind_daily_market(codes, start_date, end_date, fields=None, asset_type='stock', mode='incremental')[source]

从Wind获取日行情并插入数据库(薄封装)

组合流程: 1. wind_ingest.fetch_daily_market - 获取Wind数据 2. integration.incremental_insert/insert_dataframe - 插入

Parameters:
  • codes (list) – 代码列表

  • start_date (str) – 开始日期,格式’YYYY-MM-DD’

  • end_date (str) – 结束日期

  • fields (Optional[list]) – 字段列表,None使用默认字段

  • asset_type (str) – 资产类型,’stock’, ‘index’, ‘fund’, ‘bond’

  • mode (str) – 插入模式,’incremental’(默认)或’insert’

Return type:

dict

Returns:

统计信息字典

ingest_wind_daily_index(codes, start_date, end_date, fields=None, mode='incremental')[source]

Wind指数数据抓取(便捷封装)

Return type:

dict

Parameters:
ingest_wind_daily_fund(codes, start_date, end_date, fields=None, mode='incremental')[source]

Wind基金数据抓取(便捷封装)

Return type:

dict

Parameters:
ingest_wind_daily_bond(codes, start_date, end_date, fields=None, mode='incremental')[source]

Wind债券数据抓取(便捷封装)

Return type:

dict

Parameters:
run_query(conditions=None, params=None, select_columns='*')[source]

执行自定义SQL查询(薄封装)

替代原 query_data 方法,使用 query.build_query

Parameters:
  • conditions (Optional[list]) – SQL条件列表,如[‘datetime >= %s’, ‘code = %s’]

  • params (Optional[list]) – 参数列表

  • select_columns (str) – 要选择的列

Returns:

DataFrame

query_time_range(codes=None, start_date=None, end_date=None, metric=None, limit=None)[source]

查询指定时间范围的数据(薄封装)

直接调用 query.query_time_range

Parameters:
Returns:

DataFrame

query_nearest_after(params=None)[source]

根据输入时间戳序列查找每个时点之后最近的有效值(薄封装)

主要用于回测时提取价格 直接调用 query.query_nearest_after

Parameters:

params (dict) – 必须包含以下键: - codes: 代码列表 - datetimes: 目标时间戳列表(格式:’YYYY-MM-DD HH:MM’) - metric: 查询的指标名称 - time_tolerance: 允许的最大时间间隔(单位:小时,默认不限制)

Returns:

包含以下列:

code | input_ts | datetime | diff_hours | value | name

Return type:

DataFrame

query_nearest_in_range_after(params=None)[source]

在 (start, end) 区间内查找距 start 最近的有效值(薄封装)

Parameters:

params (dict) – 必须包含以下键: - codes: 代码列表 - ranges: [(start, end), …] 区间列表 - metric: 指标名 - time_tolerance: 锚点容差(小时,可选)

Returns:

code | input_ts(=start) | datetime | diff_hours | value | name

Return type:

DataFrame

query_nearest_in_range_before(params=None)[source]

在 (start, end) 区间内查找距 end 最近的有效值(薄封装)

Parameters:

params (dict) – 必须包含以下键: - codes: 代码列表 - ranges: [(start, end), …] 区间列表 - metric: 指标名 - time_tolerance: 锚点容差(小时,可选)

Returns:

code | input_ts(=end) | datetime | diff_hours | value | name

Return type:

DataFrame

query_nearest_before(params=None)[source]

根据输入时间戳序列查找每个时点之前最近的有效值(薄封装)

主要用于回测时提取历史价格特征 直接调用 query.query_nearest_before

Parameters:

params (dict) – 必须包含以下键: - codes: 代码列表 - datetimes: 目标时间戳列表(格式:’YYYY-MM-DD HH:MM’) - metric: 查询的指标名称 - time_tolerance: 允许的最大时间间隔(单位:小时,默认不限制)

Returns:

包含以下列:

code | input_ts | datetime | diff_hours | value | name

Return type:

DataFrame

get_latest_date(code=None, metric=None)[source]

获取数据库中的最新日期(薄封装)

直接调用 query.get_latest_date

Parameters:
  • code (Optional[str]) – 代码,None表示所有代码

  • metric (Optional[str]) – 指标,None表示所有指标

Returns:

最新日期

get_available_dates(code, metric, start_date=None, end_date=None)[source]

获取指定代码和指标的可用日期列表(薄封装)

直接调用 query.get_available_dates

Parameters:
Returns:

日期列表

validate_dataframe(df, validations)[source]

验证和修复DataFrame(薄封装)

直接调用 validation.validate_and_fix

Parameters:
  • df (DataFrame) – 待验证的DataFrame

  • validations (dict) – 验证配置

Returns:

(修复后的DataFrame, 验证报告)

check_excel_file(filepath, checks=None)[source]

检查Excel文件中的错误(薄封装)

直接调用 excel.check_excel_errors

Parameters:
Returns:

(是否通过, 错误列表)

truncate_table()[source]

清空表中所有数据

WARNING: 此操作不可逆,会删除表中所有记录

Returns:

删除的行数

close()[source]

关闭数据库连接

betalens.datafeed.read_file(filepath, logger=None, **kwargs)[source]

读取CSV或XLSX文件为DataFrame 支持多种编码格式(UTF-8, GB2312, GBK, GB18030等)

Parameters:
  • filepath (Union[str, Path]) – 文件路径

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

  • **kwargs – 传递给pd.read_csv或pd.read_excel的额外参数

Return type:

DataFrame

Returns:

DataFrame

Raises:
betalens.datafeed.read_csv_with_encoding(filepath, logger=None, **kwargs)[source]

尝试使用多种编码读取CSV文件

Parameters:
  • filepath (Path) – CSV文件路径

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

  • **kwargs – 传递给pd.read_csv的额外参数

Return type:

DataFrame

Returns:

DataFrame

Raises:

Exception – 所有编码尝试失败

betalens.datafeed.cross_section_to_db_format(df, key_columns, value_columns, key_value_mapping, additional_fields=None, logger=None)[source]

将cross-section格式数据转换为数据库三列表格式

Parameters:
  • df (DataFrame) – 输入DataFrame

  • key_columns (List[str]) – 键列(如code, name等,保持不变的列)

  • value_columns (List[str]) – 值列(需要转换的列,如各个日期或指标)

  • key_value_mapping (Dict[str, str]) – 列名映射,如{‘variable’: ‘metric’, ‘value’: ‘value’}

  • additional_fields (Optional[Dict[str, any]]) – 额外添加的字段,如{‘datetime’: ‘2024-01-01’}

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

转换后的DataFrame,格式为:[key_columns, metric, value, …]

Example

输入:

code | name | 2024-01-01 | 2024-01-02 A001 | 股票A | 100 | 101

输出:

code | name | date | value A001 | 股票A | 2024-01-01 | 100 A001 | 股票A | 2024-01-02 | 101

betalens.datafeed.batch_read_files(folder_path, file_pattern='*.csv', recursive=False, logger=None, **read_kwargs)[source]

批量读取文件夹中的文件

Parameters:
  • folder_path (Union[str, Path]) – 文件夹路径

  • file_pattern (str) – 文件匹配模式,如”.csv”, “.xlsx”

  • recursive (bool) – 是否递归搜索子文件夹

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

  • **read_kwargs – 传递给read_file的参数

Return type:

Dict[str, DataFrame]

Returns:

字典,键为文件路径,值为DataFrame

betalens.datafeed.batch_write_files(data_dict, output_dir, file_format='csv', create_subdirs=True, logger=None, **write_kwargs)[source]

批量写入文件 CSV默认使用utf-8-sig编码,确保Excel能正确显示中文

Parameters:
  • data_dict (Dict[str, DataFrame]) – 字典,键为相对路径/文件名,值为DataFrame

  • output_dir (Union[str, Path]) – 输出根目录

  • file_format (str) – 输出格式,’csv’或’xlsx’

  • create_subdirs (bool) – 是否创建子目录

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

  • **write_kwargs – 传递给to_csv或to_excel的参数

Return type:

List[str]

Returns:

成功写入的文件路径列表

betalens.datafeed.create_directory_tree(data_dict, output_dir, categorize_by=None, logger=None)[source]

创建目录树并分类保存文件

Parameters:
  • data_dict (Dict[str, DataFrame]) – 数据字典

  • output_dir (Union[str, Path]) – 输出目录

  • categorize_by (Optional[str]) – 分类依据,如’date’, ‘metric’等(DataFrame中的列名)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Dict[str, List[str]]

Returns:

目录树字典,键为类别,值为文件路径列表

betalens.datafeed.check_excel_errors(df, checks=None, logger=None)[source]

检查Excel数据中的错误

Parameters:
  • df (DataFrame) – 待检查的DataFrame

  • checks (Optional[Dict[str, any]]) –

    检查配置字典,如: {

    ’check_empty_rows’: True, # 检查空行 ‘check_null_values’: True, # 检查空值 ‘check_duplicates’: [‘col1’, ‘col2’], # 检查重复(指定列) ‘check_data_types’: {‘col1’: ‘int’, ‘col2’: ‘float’}, # 检查数据类型 ‘check_value_range’: {‘col1’: (0, 100)}, # 检查值范围

    }

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[bool, List[Dict]]

Returns:

(是否通过检查, 错误列表)

class betalens.datafeed.FillStrategy(value)[source]

Bases: Enum

填充策略枚举

RAISE_ERROR = 'raise_error'
DROP = 'drop'
FILL_FORWARD = 'ffill'
FILL_BACKWARD = 'bfill'
FILL_VALUE = 'fill_value'
FILL_MEAN = 'mean'
FILL_MEDIAN = 'median'
FILL_MODE = 'mode'
INTERPOLATE = 'interpolate'
betalens.datafeed.check_null_values(df, columns=None, check_types=None, logger=None)[source]

检查空值、NaN、None

Parameters:
  • df (DataFrame) – 待检查的DataFrame

  • columns (Optional[List[str]]) – 要检查的列名列表,None表示检查所有列

  • check_types (Optional[List[str]]) – 检查类型列表,可选[‘null’, ‘nan’, ‘none’, ‘empty_string’]

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Dict[str, Any]

Returns:

检查结果字典

betalens.datafeed.check_datetime_column(df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True, logger=None)[source]

检查日期列的各种问题

Parameters:
  • df (DataFrame) – DataFrame

  • date_column (str) – 日期列名

  • expected_freq (Optional[str]) – 期望的频率,如’D’(日), ‘W’(周), ‘M’(月), ‘Q’(季度), ‘Y’(年)

  • check_sorted (bool) – 是否检查排序

  • check_duplicates (bool) – 是否检查重复

  • check_format (bool) – 是否检查格式

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Dict[str, Any]

Returns:

检查结果字典

betalens.datafeed.fix_null_values(df, strategy, columns=None, fill_value=None, inplace=False, logger=None)[source]

修复空值

Parameters:
  • df (DataFrame) – DataFrame

  • strategy (Union[FillStrategy, str]) – 填充策略

  • columns (Optional[List[str]]) – 要处理的列,None表示所有列

  • fill_value (Optional[Any]) – 当strategy为FILL_VALUE时使用的填充值

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

修复后的DataFrame

betalens.datafeed.drop_duplicates_strict(df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False, logger=None)[source]

严格去重:确保只有完全相同的行才会被删除

Parameters:
  • df (DataFrame) – DataFrame

  • subset (Optional[List[str]]) – 用于判断重复的列,None表示所有列

  • keep (str) – ‘first’, ‘last’, False(删除所有重复)

  • verify_all_fields (bool) – 是否验证subset外的其他字段也相同

  • ignore_cols (Optional[List[str]]) – 验证时忽略的列(如索引、时间戳等)

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[DataFrame, Dict[str, Any]]

Returns:

(修复后的DataFrame, 去重报告)

betalens.datafeed.fix_datetime_column(df, date_column, fix_format=True, fix_duplicates='keep_first', fix_sort=True, sort_order='ascending', dedupe_subset=None, verify_all_fields=True, inplace=False, logger=None)[source]

修复日期列的问题

Parameters:
  • df (DataFrame) – DataFrame

  • date_column (str) – 日期列名

  • fix_format (bool) – 是否修复格式(转换为datetime)

  • fix_duplicates (Optional[str]) – 如何处理重复,None表示不处理

  • fix_sort (bool) – 是否排序

  • sort_order (str) – 排序顺序,’ascending’或’descending’

  • dedupe_subset (Optional[List[str]]) – 去重时使用的列组合,None则使用[date_column] 推荐: [‘code’, ‘metric’, date_column] 避免误删不同metric的数据

  • verify_all_fields (bool) – 是否验证subset外的其他字段也相同(严格模式)

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

修复后的DataFrame

betalens.datafeed.validate_and_fix(df, validations, inplace=False, logger=None)[source]

综合验证和修复

Parameters:
  • df (DataFrame) – DataFrame

  • validations (Dict[str, Dict]) –

    验证配置字典,格式如: {

    ’null_check’: {

    ‘columns’: [‘col1’, ‘col2’], ‘fix_strategy’: ‘ffill’

    }, ‘datetime_check’: {

    ’column’: ‘date’, ‘expected_freq’: ‘D’, ‘fix_format’: True, ‘fix_duplicates’: ‘keep_first’, ‘fix_sort’: True

    }

    }

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[DataFrame, Dict[str, Any]]

Returns:

(修复后的DataFrame, 验证报告)

class betalens.datafeed.DataValidator(logger=None)[source]

Bases: object

DataValidator - 已弃用,请直接使用模块级函数

此类已弃用,方法不再实现。请使用模块级函数: - check_null_values() - check_datetime_column() - fix_null_values() - drop_duplicates_strict() - fix_datetime_column() - validate_and_fix()

Parameters:

logger (Logger | None)

__init__(logger=None)[source]
Parameters:

logger (Logger | None)

check_null_values(df, columns=None, check_types=None)[source]
check_datetime_column(df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True)[source]
fix_null_values(df, strategy, columns=None, fill_value=None, inplace=False)[source]
drop_duplicates_strict(df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False)[source]
fix_datetime_column(df, date_column, fix_format=True, fix_duplicates='keep_first', fix_sort=True, sort_order='ascending', dedupe_subset=None, verify_all_fields=True, inplace=False)[source]
validate_and_fix(df, validations, inplace=False)[source]
betalens.datafeed.build_query(table_name, conditions=None, params=None, select_columns='*', order_by=None, limit=None)[source]

构建SQL查询

Parameters:
  • table_name (str) – 数据库表名

  • conditions (Optional[List[str]]) – 条件列表

  • params (Optional[List]) – 参数列表

  • select_columns (str) – 要选择的列

  • order_by (Optional[str]) – ORDER BY 子句(如 “datetime DESC”)

  • limit (Optional[int]) – 最大返回行数

Return type:

Tuple[str, List]

Returns:

(SQL语句, 参数列表)

betalens.datafeed.generate_input_pairs(codes, datetimes)[source]

生成(code, datetime)笛卡尔积

Parameters:
  • codes (List[str]) – 代码列表

  • datetimes (List[str]) – 时间戳列表

Return type:

List[Tuple[str, str]]

Returns:

(code, datetime)元组列表

betalens.datafeed.generate_input_range_pairs(codes, ranges)[source]

生成 (code, start_ts, end_ts) 笛卡尔积

Parameters:
Return type:

List[Tuple[str, str, str]]

Returns:

(code, start_ts, end_ts) 元组列表

betalens.datafeed.build_nearest_query(table_name, input_tuples, metric, direction='after', time_tolerance=None)[source]

构建最近时点匹配查询

Parameters:
  • table_name (str) – 表名

  • input_tuples (List[Tuple[str, str]]) – (code, datetime)元组列表

  • metric (str) – 指标名

  • direction (str) – 查询方向,’after’(之后)或’before’(之前)

  • time_tolerance (Optional[float]) – 时间容差(小时)

Return type:

Tuple[str, List]

Returns:

(SQL语句, 参数列表)

betalens.datafeed.build_nearest_in_range_query(table_name, input_tuples, metric, direction='after', time_tolerance=None)[source]

构建区间内最近时点匹配查询

在每个 (code, start_ts, end_ts) 区间内,按方向查找距锚点最近的数据: - direction=’after’:锚点为 start_ts,区间过滤 t.datetime > start AND t.datetime < end - direction=’before’:锚点为 end_ts,区间过滤 t.datetime <= end AND t.datetime >= start

Parameters:
  • table_name (str) – 表名

  • input_tuples (List[Tuple[str, str, str]]) – (code, start_ts, end_ts) 元组列表

  • metric (str) – 指标名

  • direction (str) – 查询方向,’after’ 或 ‘before’

  • time_tolerance (Optional[float]) – 锚点容差(小时),与区间共同生效(取交集)

Return type:

Tuple[str, List]

Returns:

(SQL语句, 参数列表)

betalens.datafeed.query_nearest_after(cursor, table_name, codes, datetimes, metric, time_tolerance=None, logger=None)[source]

查询每个时点之后最近的有效值

用途:主要用于回测时提取价格 时间结构:最新特征 <= 提数时点 < 调仓时点

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (List[str]) – 代码列表

  • datetimes (List[str]) – 时间戳列表,格式’YYYY-MM-DD HH:MM:SS’

  • metric (str) – 查询的指标名称

  • time_tolerance (Optional[float]) – 允许的最大时间间隔(单位:小时)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

DataFrame,包含列:
  • code: 代码

  • input_ts: 输入时间戳(提数时点)

  • datetime: 匹配到的数据时间戳

  • diff_hours: 时间差(小时)

  • value: 数据值

  • name: 名称

betalens.datafeed.query_nearest_before(cursor, table_name, codes, datetimes, metric, time_tolerance=None, logger=None)[source]

查询每个时点之前最近的有效值

用途:主要用于回测时提取历史价格特征 时间结构:调仓时点 <= 提数时点 < 最新特征时点

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (List[str]) – 代码列表

  • datetimes (List[str]) – 时间戳列表,格式’YYYY-MM-DD HH:MM:SS’

  • metric (str) – 查询的指标名称

  • time_tolerance (Optional[float]) – 允许的最大时间间隔(单位:小时)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

DataFrame,包含列:
  • code: 代码

  • input_ts: 输入时间戳(提数时点)

  • datetime: 匹配到的数据时间戳

  • diff_hours: 时间差(小时)

  • value: 数据值

  • name: 名称

betalens.datafeed.query_nearest_in_range_after(cursor, table_name, codes, ranges, metric, time_tolerance=None, logger=None)[source]

在每个 (start, end) 区间内查询距 start 最近的有效值(向后查)

时间结构:start <= t.datetime - epsilon, t.datetime < end,锚点 = start

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (List[str]) – 代码列表

  • ranges (List[Tuple[str, str]]) – (start, end) 区间列表,时间格式 ‘YYYY-MM-DD HH:MM:SS’

  • metric (str) – 指标名

  • time_tolerance (Optional[float]) – 锚点容差(小时),与区间共同生效

  • logger (Optional[Logger]) – 日志记录器

Returns:

code, input_ts(=start), datetime, diff_hours, value, name

Return type:

DataFrame

betalens.datafeed.query_nearest_in_range_before(cursor, table_name, codes, ranges, metric, time_tolerance=None, logger=None)[source]

在每个 (start, end) 区间内查询距 end 最近的有效值(向前查)

时间结构:start <= t.datetime <= end,锚点 = end

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (List[str]) – 代码列表

  • ranges (List[Tuple[str, str]]) – (start, end) 区间列表,时间格式 ‘YYYY-MM-DD HH:MM:SS’

  • metric (str) – 指标名

  • time_tolerance (Optional[float]) – 锚点容差(小时),与区间共同生效

  • logger (Optional[Logger]) – 日志记录器

Returns:

code, input_ts(=end), datetime, diff_hours, value, name

Return type:

DataFrame

betalens.datafeed.query_time_range(cursor, table_name, codes=None, start_date=None, end_date=None, metric=None, limit=None, logger=None)[source]

查询指定时间范围的数据

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (Optional[List[str]]) – 代码列表,None表示所有代码

  • start_date (Optional[str]) – 开始日期

  • end_date (Optional[str]) – 结束日期

  • metric (Optional[str]) – 指标名称

  • limit (Optional[int]) – 最大返回行数,None表示不限制(按 datetime DESC 返回最新的 N 行)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

DataFrame

betalens.datafeed.get_available_dates(cursor, table_name, code, metric, start_date=None, end_date=None, logger=None)[source]

获取指定代码和指标的可用日期列表

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • code (str) – 代码

  • metric (str) – 指标

  • start_date (Optional[str]) – 开始日期

  • end_date (Optional[str]) – 结束日期

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

List[datetime]

Returns:

日期列表

betalens.datafeed.get_latest_date(cursor, table_name, code=None, metric=None, logger=None)[source]

获取最新的数据日期

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • code (Optional[str]) – 代码,None表示所有代码

  • metric (Optional[str]) – 指标,None表示所有指标

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Optional[datetime]

Returns:

最新日期

betalens.datafeed.pivot_to_wide(df, index_cols, pivot_col, value_col)[source]

将长格式数据转换为宽格式

Parameters:
  • df (DataFrame) – 长格式DataFrame

  • index_cols (List[str]) – 索引列

  • pivot_col (str) – 用于pivot的列(将变为新列名)

  • value_col (str) – 值列

Return type:

DataFrame

Returns:

宽格式DataFrame

betalens.datafeed.align_to_dates(df, target_dates, date_column='datetime', method='ffill')[source]

将数据对齐到目标日期序列

Parameters:
  • df (DataFrame) – 输入DataFrame

  • target_dates (List[datetime]) – 目标日期列表

  • date_column (str) – 日期列名

  • method (str) – 填充方法,’ffill’或’bfill’

Return type:

DataFrame

Returns:

对齐后的DataFrame

betalens.datafeed.calculate_returns(df, price_column, periods=[1], group_by=None)[source]

计算收益率

Parameters:
  • df (DataFrame) – 包含价格数据的DataFrame

  • price_column (str) – 价格列名

  • periods (List[int]) – 计算周期列表

  • group_by (Optional[str]) – 分组列(如code)

Return type:

DataFrame

Returns:

添加了收益率列的DataFrame

betalens.datafeed.query_industry(cursor, codes, dates, scheme='申万一级行业', table_name='industry', exact=False, logger=None)[source]

正查:每个 (code, date) 在该日所属的行业(point-in-time,取 datetime<=date 的最近一条)

Parameters:
  • cursor – 数据库游标(建议 RealDictCursor)

  • codes (List[str]) – 证券代码列表

  • dates (Union[str, List[str]]) – 查询日期,单个或列表,格式 ‘YYYY-MM-DD’ 或 ‘YYYY-MM-DD HH:MM:SS’

  • scheme (str) – 分类体系(即 metric)。不带版本后缀(如 ‘申万一级行业’)时自动匹配全部 版本,最近一条天然落到查询日生效的版本;带后缀(如 ‘申万一级行业(2021)’) 则只查该版本。

  • table_name (str) – 表名,默认 ‘industry’

  • exact (bool) – 强制精确匹配 metric(关闭版本自动选择),默认 False

  • logger (Optional[Logger]) – 日志器

Returns:

code | query_date | effective_dt | sec_name |

industry_value | ind_name | ind_code | scheme

无归属记录的 (code,date) 行业字段为 NaN/None

Return type:

DataFrame

betalens.datafeed.get_industry_members(cursor, industry, date, scheme='申万一级行业', table_name='industry', by='name', exact=False, logger=None)[source]

反查:某日某行业的成分股(每只股票取 datetime<=date 的最近归属,再筛目标行业)

Parameters:
  • cursor – 数据库游标

  • industry (Union[str, int, float]) – 目标行业,可为行业名(str,匹配 remark->>’ind_name’) 或行业代码数值(int/float,匹配 value)

  • date (str) – 查询日期

  • scheme (str) – 分类体系(metric)。不带版本后缀时自动匹配全部版本(最近一条天然落到 查询日生效的版本);带后缀只查该版本。

  • table_name (str) – 表名

  • by (str) – ‘name’ 用行业名匹配,’value’ 用行业代码数值匹配; industry 类型也会自动推断

  • exact (bool) – 强制精确匹配 metric(关闭版本自动选择),默认 False

  • logger (Optional[Logger]) – 日志器

Returns:

code | sec_name | industry_value | ind_name | ind_code | scheme

Return type:

DataFrame

betalens.datafeed.build_industry_records(df, scheme='申万一级行业', code_col='code', name_col='name', date_col='effective_dt', ind_name_col='ind_name', ind_code_col='ind_code')[source]

入库辅助:把行业归属明细整理成可直接 incremental_insert 的长格式

输入每行 = 一条归属事件 (证券, 生效日, 行业)。输出列:

datetime, code, name, metric(=scheme), value(=行业代码数值), remark(dict)

Parameters:
  • df (DataFrame) – 明细 DataFrame

  • scheme (str) – 分类体系,写入 metric

  • code_col/name_col/date_col – 证券代码/名称/生效日 列名

  • ind_name_col (str) – 行业名列名

  • ind_code_col (Optional[str]) – 行业代码列名(如 ‘801780.SI’);为 None 则不填 value

  • code_col (str)

  • name_col (str)

  • date_col (str)

Return type:

DataFrame

Returns:

长格式 DataFrame(datetime, code, name, metric, value, remark)

betalens.datafeed.get_index_universe(cursor, index_code, date, table_name='index_universe', metric='universe', logger=None)[source]

返回 index_code 在 date 当日生效的成分股代码列表(point-in-time)。

步骤:用 query.query_nearest_before 找到 <=date 的最近生效快照日,再取该行 remark 中的 constituents 列表。该日前无可用股票池则返回空列表。

Parameters:
  • cursor – 数据库游标(建议 RealDictCursor)

  • index_code (str) – 指数代码,如 ‘000906.SH’

  • date (str) – 查询日期,’YYYY-MM-DD’ 或 ‘YYYY-MM-DD HH:MM:SS’

  • table_name (str) – 表名,默认 ‘index_universe’

  • metric (str) – 指标名,默认 ‘universe’

  • logger (Optional[Logger]) – 日志器

Return type:

List[str]

Returns:

成分股代码列表(如 [‘000001.SZ’, …]);无可用股票池则返回 []

betalens.datafeed.get_index_universe_date(cursor, index_code, date, table_name='index_universe', metric='universe', logger=None)[source]

返回某指数在某日实际生效的快照日期(point-in-time,取 datetime<=date 的最近一条)。

复用 query.query_nearest_before 定位最近生效日。

Parameters:
  • cursor – 数据库游标(建议 RealDictCursor)

  • index_code (str) – 指数代码,如 ‘000906.SH’

  • date (str) – 查询日期,’YYYY-MM-DD’ 或 ‘YYYY-MM-DD HH:MM:SS’

  • table_name (str) – 表名,默认 ‘index_universe’

  • metric (str) – 指标名,默认 ‘universe’

  • logger (Optional[Logger]) – 日志器

Returns:

生效快照的 datetime(pandas.Timestamp);该日前无可用股票池则返回 None

betalens.datafeed.process_excel_to_db_format(filepath, config, logger=None)[source]

处理Excel文件为数据库格式

Parameters:
  • filepath (Union[str, Path]) – Excel文件路径

  • config (Dict[str, Any]) – 处理配置,包含: - key_columns: 键列列表 - value_columns: 值列列表(如果为None,则自动推断) - key_value_mapping: 列名映射字典 - additional_fields: 额外字段字典 - validation: 验证配置 - read_kwargs: 读取参数

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[Optional[DataFrame], List[Dict]]

Returns:

(处理后的DataFrame, 错误列表)

betalens.datafeed.check_existing_rows(cursor, df, table, key_columns=None, logger=None)[source]

检查数据库中是否已存在相关数据行

Parameters:
  • cursor – 数据库游标

  • df (DataFrame) – 待检查的DataFrame

  • table (str) – 目标表名

  • key_columns (Optional[List[str]]) – 用于判断重复的关键列,默认使用[‘code’, ‘metric’, ‘datetime’]

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

包含已存在行的DataFrame(只包含key_columns列),如果没有则返回空DataFrame

betalens.datafeed.insert_dataframe(cursor, conn, df, table, batch_size=1000, check_duplicates=True, key_columns=None, skip_duplicates=True, logger=None)[source]

将DataFrame插入数据库

Parameters:
  • cursor – 数据库游标

  • conn – 数据库连接

  • df (DataFrame) – 待插入的DataFrame

  • table (str) – 目标表名

  • batch_size (int) – 批量插入大小

  • check_duplicates (bool) – 是否检查数据库中已存在的数据(默认True)

  • key_columns (Optional[List[str]]) – 用于判断重复的关键列,默认使用[‘code’, ‘metric’, ‘datetime’]

  • skip_duplicates (bool) – 是否跳过重复数据(默认True),如果为False,重复数据会导致插入失败

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[bool, str, Dict[str, Any]]

Returns:

(是否成功, 消息, 统计信息字典)

betalens.datafeed.get_existing_dates(cursor, table, code, metric, logger=None)[source]

获取数据库中已存在的日期

Parameters:
  • cursor – 数据库游标

  • table (str) – 表名

  • code (str) – 代码

  • metric (str) – 指标

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

List[datetime]

Returns:

日期列表

betalens.datafeed.incremental_insert(cursor, conn, df, table, date_column='datetime', code_column='code', metric_column='metric', logger=None)[source]

增量插入:只插入数据库中不存在的数据

Parameters:
  • cursor – 数据库游标

  • conn – 数据库连接

  • df (DataFrame) – 待插入的DataFrame

  • table (str) – 目标表名

  • date_column (str) – 日期列名

  • code_column (str) – 代码列名

  • metric_column (str) – 指标列名

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[int, int]

Returns:

(新增行数, 重复行数)

betalens.datafeed.save_error_file(filepath, df, errors, error_dir='./errors', error_subdir='failed_files', logger=None)[source]

保存错误文件和日志

Parameters:
  • filepath (Union[str, Path]) – 原始文件路径

  • df (Optional[DataFrame]) – 处理后的DataFrame(可能为None)

  • errors (List[Dict]) – 错误列表

  • error_dir (Union[str, Path]) – 错误文件保存目录

  • error_subdir (str) – 错误子目录

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

str

Returns:

错误记录ID

betalens.datafeed.process_directory_tree(cursor, conn, root_dir, table, config, file_pattern='*.csv', recursive=True, mode='insert', error_dir='./errors', logger=None)[source]

按照目录树结构处理和插入Excel文件

Parameters:
  • cursor – 数据库游标

  • conn – 数据库连接

  • root_dir (Union[str, Path]) – 根目录

  • table (str) – 目标表名

  • config (Dict[str, Any]) – 处理配置

  • file_pattern (str) – 文件匹配模式

  • recursive (bool) – 是否递归搜索

  • mode (str) – 插入模式,’insert’(直接插入)或’incremental’(增量插入)

  • error_dir (Union[str, Path]) – 错误文件保存目录

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Dict[str, Any]

Returns:

处理统计字典

class betalens.datafeed.ConfigManager(config_file=None)[source]

Bases: object

配置管理器

Parameters:

config_file (str | None)

__init__(config_file=None)[source]

初始化配置管理器

Parameters:

config_file (Optional[str]) – 配置文件路径,默认为当前模块目录下的config.json

load()[source]

从文件加载配置

如果文件不存在或加载失败,使用默认配置

Return type:

None

save(config_file=None)[source]

保存当前配置到文件

Parameters:

config_file (Optional[str]) – 配置文件路径,默认使用初始化时的路径

Return type:

None

get(key_path, default=None)[source]

获取配置值

Parameters:
  • key_path (str) – 配置键路径,使用点号分隔,如 ‘database.dbname’

  • default (Optional[Any]) – 默认值,如果键不存在则返回此值

Return type:

Any

Returns:

配置值

Example

>>> config = ConfigManager()
>>> config.get('database.dbname')
'datafeed'
>>> config.get('database.port')
'5432'
set(key_path, value)[source]

设置配置值

Parameters:
  • key_path (str) – 配置键路径,使用点号分隔,如 ‘database.dbname’

  • value (Any) – 配置值

Return type:

None

Example

>>> config = ConfigManager()
>>> config.set('database.dbname', 'my_database')
>>> config.get('database.dbname')
'my_database'
get_section(section)[source]

获取配置节

Parameters:

section (str) – 配置节名称,如 ‘database’, ‘excel’

Return type:

Dict[str, Any]

Returns:

配置节字典

property config: Dict[str, Any]

获取完整配置字典

__getitem__(key)[source]

支持字典式访问

Return type:

Any

Parameters:

key (str)

__setitem__(key, value)[source]

支持字典式设置

Return type:

None

Parameters:
betalens.datafeed.get_config(config_file=None)[source]

获取全局配置实例

Parameters:

config_file (Optional[str]) – 配置文件路径,仅在首次调用时有效

Return type:

ConfigManager

Returns:

ConfigManager实例

betalens.datafeed.reset_config()[source]

重置全局配置实例

Return type:

None

betalens.datafeed.get_database_config()[source]

获取数据库配置

Return type:

Dict[str, str]

betalens.datafeed.get_logging_config()[source]

获取日志配置

Return type:

Dict[str, str]

betalens.datafeed.get_excel_config()[source]

获取Excel配置

Return type:

Dict[str, Any]

betalens.datafeed.get_wind_config()[source]

获取Wind配置

Return type:

Dict[str, Any]

betalens.datafeed.get_ede_config()[source]

获取EDE配置

Return type:

Dict[str, Any]

betalens.datafeed.func_timer(function)[source]

用装饰器实现函数计时 :type function: :param function: 需要计时的函数 :return: None

betalens.datafeed.get_absolute_trade_days(begin_date, end_date, period, use_pmc=True)[source]

获取交易日序列

Parameters:
  • begin_date – 开始日期,字符串格式

  • end_date – 结束日期,字符串格式

  • period – 周期,如’D’(日), ‘W’(周), ‘M’(月), ‘Q’(季), ‘S’(半年), ‘Y’(年)

  • use_pmc – 默认True,使用pandas_market_calendars(中国A股/北京时区);False时使用akshare

Returns:

交易日列表(datetime.datetime对象)

betalens.datafeed.trade_days_offset(begin_datetime, offset, period='D')[source]

交易日偏移计算

Parameters:
  • begin_datetime – 起始datetime对象

  • offset – 偏移量(整数)

  • period – 周期,默认’D’

Returns:

偏移后的datetime对象

子模块

core

## 研究数据库结构(摘要)

### 表1 个券行情(日频) - 列:入库实际时间(=最早可交易时间)、windcode、中文名、数据性质(收盘价/成交量等)、数值、备注(json) - 规则:开盘价最早 09:30;其余价量最早 15:00 可确定 - 处理:计算日频收益率(如 close-to-close)用于回测与挖掘

### 表2 个券基本面(日频入库,事件驱动) - 列:入库实际时间(=最早可交易时间)、数据理论发生时间(报告期,如 0331/0630/0930/1231)、windcode、中文名、数据性质(如归母净利润)、数值、备注(json) - 规则:按公告时点入库,关注盘前/盘中/盘后;理论发生时间仅用于展示,不做因果外推 - 年报次序不一:优先一致预期或线性外推作占位,尽量避免非原生数据入库

### 表3 宏观经济(事件驱动) - 列:入库实际时间(=最早可交易时间)、数据理论发生时间(如“1月GDP”)、windcode、中文名、数据性质(可含均线算子)、数值、备注(json) - 规则:与表2一致(事件驱动、区分公告时点与发生时点)

### 表4 因子库 - 列:入库实际时间(=最早可交易时间)、数据编制方式、数值、备注(json)

## 投资数据库结构(摘要) - 不保留完整历史 - 从研究数据库按需拉取最近滚动窗口数据;其余数据在线拉取 - 在线数据仅记录入库时间(可交易可用时点)

## 投资数据库结构: - 出于投资目的,实际不需要历史数据 - 从研究数据库拉取所需最近一个滚动窗口内的数据,其余全部在线拉取,并直接记录入库时间

更新日志: 2025-10-31: 重构datafeed.py,新增工具模块 - excel.py: Excel文件处理模块 - validation.py: 数据验证和异常检查工具 - query.py: 数据库查询功能重构 - integration.py: 数据库与Excel交互功能 - wind_ingest.py: Wind数据抓取模块

2025-11-03: 简化core.py为薄封装层 - 移除所有业务逻辑到子模块 - core.py仅保留连接管理和函数组合 - 所有数据转换、插入逻辑位于子模块

betalens.datafeed.core.func_timer(function)[source]

用装饰器实现函数计时 :type function: :param function: 需要计时的函数 :return: None

class betalens.datafeed.core.Datafeed(table_name, db_config=None, log_dir=None)[source]

Bases: object

数据管理主类(薄封装层)

职责: - 管理数据库连接 - 提供统一的日志记录 - 组合子模块功能为高层API - 不包含业务逻辑(所有逻辑在子模块中)

使用示例:

# 创建实例(使用默认配置) df = Datafeed(“daily_market_data”)

# 创建实例(自定义数据库配置) df = Datafeed(

table_name=”daily_market_data”, db_config={

‘dbname’: ‘my_database’, ‘user’: ‘my_user’, ‘password’: ‘my_password’, ‘host’: ‘localhost’, ‘port’: ‘5432’

}

)

# 单文件插入 df.insert_csv_file(“data.csv”, config={…})

# Wind数据抓取 df.ingest_wind_daily_market(

codes=[‘000001.SZ’], start_date=’2024-01-01’, end_date=’2024-01-31’

)

# 查询 data = df.query_time_range(

codes=[‘000001.SZ’], start_date=’2024-01-01’, metric=’收盘价(元)’

)

# 关闭 df.close()

__init__(table_name, db_config=None, log_dir=None)[source]

初始化Datafeed实例

Parameters:
  • table_name – 数据库表名

  • db_config – 数据库配置字典,包含以下键: - dbname: 数据库名 - user: 用户名 - password: 密码 - host: 主机地址 - port: 端口 如果为None,使用config.json中的配置

  • log_dir – 日志目录,如果为None,使用config.json中的配置

insert_csv_file(filepath, config, mode='incremental')[source]

单文件CSV插入(薄封装)

组合流程: 1. excel.read_file - 读取文件 2. excel.cross_section_to_db_format - 转换格式(如果需要) 3. validation.validate_and_fix - 验证和修复(如果配置) 4. integration.insert_dataframe/incremental_insert - 插入

Parameters:
  • filepath (str) – 文件路径

  • config (dict) – 配置字典,包含: - key_columns: 键列列表(用于cross_section转换) - value_columns: 值列列表(可选,自动推断) - key_value_mapping: 列名映射 - additional_fields: 额外字段 - validation: 验证配置 - apply_time_alignment: 是否应用时间对齐(开盘09:30,其他15:00)

  • mode (str) – ‘insert’(直接插入)或’incremental’(增量插入,默认)

Return type:

dict

Returns:

统计信息字典

insert_ede_file(filepath, date_from='filename', default_datetime=None, mode='incremental')[source]

处理并插入EDE格式的Excel文件(薄封装)

EDE格式特征: - 第一列:证券代码 -第二列:证券简称 - 第三列及之后:指标列,格式为”指标名 [元数据] 值类型 [元数据] 单位”

示例EDE格式:

证券代码 证券简称 流通A股 [交易日期] 最新 [单位] 股 002460.SZ 赣锋锂业 1,211,379,763.0000 1772.HK 赣锋锂业 1,211,379,763.0000

处理流程: 1. 读取Excel文件 2. 清理数据(去除空值、”数据来源:Wind”等) 3. 识别code、name列 4. 解析metric列,提取指标名称和元数据 5. 构建日期列(从文件名或列名中提取) 6. 转换为数据库格式 7. 插入数据库

Parameters:
  • filepath (str) – EDE格式Excel文件路径

  • date_from (str) – 日期来源,可选值: - ‘filename’: 从文件名提取日期(如EDE20251103.xlsx -> 2025-11-03 15:30:00) - ‘metric’: 从列名中的[日期]部分提取

  • default_datetime (Optional[str]) – 默认日期时间(当无法从文件名或列名提取时使用) 格式:’YYYY-MM-DD HH:MM:SS’,如’2025-11-03 15:30:00’

  • mode (str) – 插入模式 - ‘incremental’: 增量插入(默认),只插入新数据 - ‘insert’: 直接插入,会检查重复并跳过

Return type:

dict

Returns:

统计信息字典,包含:
  • success: 是否成功

  • new_rows: 新增行数

  • skipped_rows: 跳过行数

  • errors: 错误列表(如果有)

Example

>>> df = Datafeed('daily_market_data')
>>> result = df.insert_ede_file(
...     'EDE20251103.xlsx',
...     date_from='filename',
...     mode='incremental'
... )
>>> print(f"新增{result['new_rows']}行,跳过{result['skipped_rows']}行")
batch_process_excel_files(folder_path, config, file_pattern='*.csv', recursive=True, mode='insert')[source]

批量处理Excel文件并插入数据库(薄封装)

直接调用 integration.process_directory_tree

Parameters:
  • folder_path (str) – 文件夹路径

  • config (dict) – 处理配置

  • file_pattern (str) – 文件匹配模式

  • recursive (bool) – 是否递归搜索

  • mode (str) – 插入模式,’insert’或’incremental’

Returns:

处理统计字典

incremental_update(df, date_column='datetime', code_column='code', metric_column='metric')[source]

增量更新数据到数据库(薄封装)

直接调用 integration.incremental_insert

Parameters:
  • df (DataFrame) – 待更新的DataFrame

  • date_column (str) – 日期列名

  • code_column (str) – 代码列名

  • metric_column (str) – 指标列名

Returns:

(新增行数, 重复行数)

insert_with_conflict_check(df, date_column='datetime', code_column='code', metric_column='metric')[source]

插入数据时检测重复和冲突(批量查询优化版本)

  • 如果key(datetime, code, metric)相同但value不同:记录冲突,不插入

  • 如果key和value完全相同:跳过,不插入

  • 如果key不存在:插入新记录

Parameters:
  • df (DataFrame) – 待插入的DataFrame,需包含columns: datetime, code, name, metric, value

  • date_column (str) – 日期列名

  • code_column (str) – 代码列名

  • metric_column (str) – 指标列名

Returns:

(新增行数, 跳过行数, 冲突列表) 冲突列表格式:[{datetime, code, name, metric, db_value, new_value}, …]

update_data(df, date_column='datetime', code_column='code', metric_column='metric')[source]

使用SQL UPDATE批量更新数据

对于存在的记录(相同datetime, code, metric),更新value和name 对于不存在的记录,可选择插入(upsert模式)

Parameters:
  • df (DataFrame) – 待更新的DataFrame,需包含columns: datetime, code, name, metric, value

  • date_column (str) – 日期列名

  • code_column (str) – 代码列名

  • metric_column (str) – 指标列名

Returns:

更新行数

ingest_wind_daily_market(codes, start_date, end_date, fields=None, asset_type='stock', mode='incremental')[source]

从Wind获取日行情并插入数据库(薄封装)

组合流程: 1. wind_ingest.fetch_daily_market - 获取Wind数据 2. integration.incremental_insert/insert_dataframe - 插入

Parameters:
  • codes (list) – 代码列表

  • start_date (str) – 开始日期,格式’YYYY-MM-DD’

  • end_date (str) – 结束日期

  • fields (Optional[list]) – 字段列表,None使用默认字段

  • asset_type (str) – 资产类型,’stock’, ‘index’, ‘fund’, ‘bond’

  • mode (str) – 插入模式,’incremental’(默认)或’insert’

Return type:

dict

Returns:

统计信息字典

ingest_wind_daily_index(codes, start_date, end_date, fields=None, mode='incremental')[source]

Wind指数数据抓取(便捷封装)

Return type:

dict

Parameters:
ingest_wind_daily_fund(codes, start_date, end_date, fields=None, mode='incremental')[source]

Wind基金数据抓取(便捷封装)

Return type:

dict

Parameters:
ingest_wind_daily_bond(codes, start_date, end_date, fields=None, mode='incremental')[source]

Wind债券数据抓取(便捷封装)

Return type:

dict

Parameters:
run_query(conditions=None, params=None, select_columns='*')[source]

执行自定义SQL查询(薄封装)

替代原 query_data 方法,使用 query.build_query

Parameters:
  • conditions (Optional[list]) – SQL条件列表,如[‘datetime >= %s’, ‘code = %s’]

  • params (Optional[list]) – 参数列表

  • select_columns (str) – 要选择的列

Returns:

DataFrame

query_time_range(codes=None, start_date=None, end_date=None, metric=None, limit=None)[source]

查询指定时间范围的数据(薄封装)

直接调用 query.query_time_range

Parameters:
Returns:

DataFrame

query_nearest_after(params=None)[source]

根据输入时间戳序列查找每个时点之后最近的有效值(薄封装)

主要用于回测时提取价格 直接调用 query.query_nearest_after

Parameters:

params (dict) – 必须包含以下键: - codes: 代码列表 - datetimes: 目标时间戳列表(格式:’YYYY-MM-DD HH:MM’) - metric: 查询的指标名称 - time_tolerance: 允许的最大时间间隔(单位:小时,默认不限制)

Returns:

包含以下列:

code | input_ts | datetime | diff_hours | value | name

Return type:

DataFrame

query_nearest_in_range_after(params=None)[source]

在 (start, end) 区间内查找距 start 最近的有效值(薄封装)

Parameters:

params (dict) – 必须包含以下键: - codes: 代码列表 - ranges: [(start, end), …] 区间列表 - metric: 指标名 - time_tolerance: 锚点容差(小时,可选)

Returns:

code | input_ts(=start) | datetime | diff_hours | value | name

Return type:

DataFrame

query_nearest_in_range_before(params=None)[source]

在 (start, end) 区间内查找距 end 最近的有效值(薄封装)

Parameters:

params (dict) – 必须包含以下键: - codes: 代码列表 - ranges: [(start, end), …] 区间列表 - metric: 指标名 - time_tolerance: 锚点容差(小时,可选)

Returns:

code | input_ts(=end) | datetime | diff_hours | value | name

Return type:

DataFrame

query_nearest_before(params=None)[source]

根据输入时间戳序列查找每个时点之前最近的有效值(薄封装)

主要用于回测时提取历史价格特征 直接调用 query.query_nearest_before

Parameters:

params (dict) – 必须包含以下键: - codes: 代码列表 - datetimes: 目标时间戳列表(格式:’YYYY-MM-DD HH:MM’) - metric: 查询的指标名称 - time_tolerance: 允许的最大时间间隔(单位:小时,默认不限制)

Returns:

包含以下列:

code | input_ts | datetime | diff_hours | value | name

Return type:

DataFrame

get_latest_date(code=None, metric=None)[source]

获取数据库中的最新日期(薄封装)

直接调用 query.get_latest_date

Parameters:
  • code (Optional[str]) – 代码,None表示所有代码

  • metric (Optional[str]) – 指标,None表示所有指标

Returns:

最新日期

get_available_dates(code, metric, start_date=None, end_date=None)[source]

获取指定代码和指标的可用日期列表(薄封装)

直接调用 query.get_available_dates

Parameters:
Returns:

日期列表

validate_dataframe(df, validations)[source]

验证和修复DataFrame(薄封装)

直接调用 validation.validate_and_fix

Parameters:
  • df (DataFrame) – 待验证的DataFrame

  • validations (dict) – 验证配置

Returns:

(修复后的DataFrame, 验证报告)

check_excel_file(filepath, checks=None)[source]

检查Excel文件中的错误(薄封装)

直接调用 excel.check_excel_errors

Parameters:
Returns:

(是否通过, 错误列表)

truncate_table()[source]

清空表中所有数据

WARNING: 此操作不可逆,会删除表中所有记录

Returns:

删除的行数

close()[source]

关闭数据库连接

betalens.datafeed.core.get_absolute_trade_days(begin_date, end_date, period, use_pmc=True)[source]

获取交易日序列

Parameters:
  • begin_date – 开始日期,字符串格式

  • end_date – 结束日期,字符串格式

  • period – 周期,如’D’(日), ‘W’(周), ‘M’(月), ‘Q’(季), ‘S’(半年), ‘Y’(年)

  • use_pmc – 默认True,使用pandas_market_calendars(中国A股/北京时区);False时使用akshare

Returns:

交易日列表(datetime.datetime对象)

betalens.datafeed.core.trade_days_offset(begin_datetime, offset, period='D')[source]

交易日偏移计算

Parameters:
  • begin_datetime – 起始datetime对象

  • offset – 偏移量(整数)

  • period – 周期,默认’D’

Returns:

偏移后的datetime对象

excel

Excel文件处理工具模块(函数式) 功能: - 读取CSV/XLSX文件转为DataFrame - 将cross-section数据转换为数据库三列表格式 - 批量文件操作 - 文件夹分类和目录树生成 - Excel错误检查(错行、空值、异常值)

betalens.datafeed.excel.read_csv_with_encoding(filepath, logger=None, **kwargs)[source]

尝试使用多种编码读取CSV文件

Parameters:
  • filepath (Path) – CSV文件路径

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

  • **kwargs – 传递给pd.read_csv的额外参数

Return type:

DataFrame

Returns:

DataFrame

Raises:

Exception – 所有编码尝试失败

betalens.datafeed.excel.read_file(filepath, logger=None, **kwargs)[source]

读取CSV或XLSX文件为DataFrame 支持多种编码格式(UTF-8, GB2312, GBK, GB18030等)

Parameters:
  • filepath (Union[str, Path]) – 文件路径

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

  • **kwargs – 传递给pd.read_csv或pd.read_excel的额外参数

Return type:

DataFrame

Returns:

DataFrame

Raises:
betalens.datafeed.excel.cross_section_to_db_format(df, key_columns, value_columns, key_value_mapping, additional_fields=None, logger=None)[source]

将cross-section格式数据转换为数据库三列表格式

Parameters:
  • df (DataFrame) – 输入DataFrame

  • key_columns (List[str]) – 键列(如code, name等,保持不变的列)

  • value_columns (List[str]) – 值列(需要转换的列,如各个日期或指标)

  • key_value_mapping (Dict[str, str]) – 列名映射,如{‘variable’: ‘metric’, ‘value’: ‘value’}

  • additional_fields (Optional[Dict[str, any]]) – 额外添加的字段,如{‘datetime’: ‘2024-01-01’}

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

转换后的DataFrame,格式为:[key_columns, metric, value, …]

Example

输入:

code | name | 2024-01-01 | 2024-01-02 A001 | 股票A | 100 | 101

输出:

code | name | date | value A001 | 股票A | 2024-01-01 | 100 A001 | 股票A | 2024-01-02 | 101

betalens.datafeed.excel.batch_read_files(folder_path, file_pattern='*.csv', recursive=False, logger=None, **read_kwargs)[source]

批量读取文件夹中的文件

Parameters:
  • folder_path (Union[str, Path]) – 文件夹路径

  • file_pattern (str) – 文件匹配模式,如”.csv”, “.xlsx”

  • recursive (bool) – 是否递归搜索子文件夹

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

  • **read_kwargs – 传递给read_file的参数

Return type:

Dict[str, DataFrame]

Returns:

字典,键为文件路径,值为DataFrame

betalens.datafeed.excel.batch_write_files(data_dict, output_dir, file_format='csv', create_subdirs=True, logger=None, **write_kwargs)[source]

批量写入文件 CSV默认使用utf-8-sig编码,确保Excel能正确显示中文

Parameters:
  • data_dict (Dict[str, DataFrame]) – 字典,键为相对路径/文件名,值为DataFrame

  • output_dir (Union[str, Path]) – 输出根目录

  • file_format (str) – 输出格式,’csv’或’xlsx’

  • create_subdirs (bool) – 是否创建子目录

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

  • **write_kwargs – 传递给to_csv或to_excel的参数

Return type:

List[str]

Returns:

成功写入的文件路径列表

betalens.datafeed.excel.create_directory_tree(data_dict, output_dir, categorize_by=None, logger=None)[source]

创建目录树并分类保存文件

Parameters:
  • data_dict (Dict[str, DataFrame]) – 数据字典

  • output_dir (Union[str, Path]) – 输出目录

  • categorize_by (Optional[str]) – 分类依据,如’date’, ‘metric’等(DataFrame中的列名)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Dict[str, List[str]]

Returns:

目录树字典,键为类别,值为文件路径列表

betalens.datafeed.excel.apply_time_alignment(df, date_column='日期', metric_column='variable', open_metric_names=None, open_time=None, other_time=None, inplace=False, logger=None)[source]

根据指标类型为日期列添加时间戳

开盘价在交易日09:30可用,其他价格/成交量在15:00收盘后可用。

Parameters:
  • df (DataFrame) – DataFrame

  • date_column (str) – 日期列名

  • metric_column (str) – 指标列名(用于判断是否为开盘价)

  • open_metric_names (Optional[set]) – 开盘价指标名称集合,默认从config.json读取

  • open_time (Optional[str]) – 开盘价对应的时间,默认从config.json读取

  • other_time (Optional[str]) – 其他指标对应的时间,默认从config.json读取

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

添加时间戳后的DataFrame

Example

>>> df = pd.DataFrame({
...     '日期': ['2024-01-01', '2024-01-01'],
...     'variable': ['开盘价(元)', '收盘价(元)'],
...     'value': [100, 101]
... })
>>> df = apply_time_alignment(df)
>>> df['日期']
0    2024-01-01 09:30:01
1    2024-01-01 15:00:01
betalens.datafeed.excel.check_excel_errors(df, checks=None, logger=None)[source]

检查Excel数据中的错误

Parameters:
  • df (DataFrame) – 待检查的DataFrame

  • checks (Optional[Dict[str, any]]) –

    检查配置字典,如: {

    ’check_empty_rows’: True, # 检查空行 ‘check_null_values’: True, # 检查空值 ‘check_duplicates’: [‘col1’, ‘col2’], # 检查重复(指定列) ‘check_data_types’: {‘col1’: ‘int’, ‘col2’: ‘float’}, # 检查数据类型 ‘check_value_range’: {‘col1’: (0, 100)}, # 检查值范围

    }

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[bool, List[Dict]]

Returns:

(是否通过检查, 错误列表)

validation

数据验证和异常检查工具模块(函数式) 功能: - 检查空值、NaN、None - 检查日期列的格式、重复、排序、频率 - 提供多种修复策略(替换、填充、删除、抛出错误)

class betalens.datafeed.validation.FillStrategy(value)[source]

Bases: Enum

填充策略枚举

RAISE_ERROR = 'raise_error'
DROP = 'drop'
FILL_FORWARD = 'ffill'
FILL_BACKWARD = 'bfill'
FILL_VALUE = 'fill_value'
FILL_MEAN = 'mean'
FILL_MEDIAN = 'median'
FILL_MODE = 'mode'
INTERPOLATE = 'interpolate'
betalens.datafeed.validation.check_null_values(df, columns=None, check_types=None, logger=None)[source]

检查空值、NaN、None

Parameters:
  • df (DataFrame) – 待检查的DataFrame

  • columns (Optional[List[str]]) – 要检查的列名列表,None表示检查所有列

  • check_types (Optional[List[str]]) – 检查类型列表,可选[‘null’, ‘nan’, ‘none’, ‘empty_string’]

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Dict[str, Any]

Returns:

检查结果字典

betalens.datafeed.validation.check_datetime_column(df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True, logger=None)[source]

检查日期列的各种问题

Parameters:
  • df (DataFrame) – DataFrame

  • date_column (str) – 日期列名

  • expected_freq (Optional[str]) – 期望的频率,如’D’(日), ‘W’(周), ‘M’(月), ‘Q’(季度), ‘Y’(年)

  • check_sorted (bool) – 是否检查排序

  • check_duplicates (bool) – 是否检查重复

  • check_format (bool) – 是否检查格式

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Dict[str, Any]

Returns:

检查结果字典

betalens.datafeed.validation.fix_null_values(df, strategy, columns=None, fill_value=None, inplace=False, logger=None)[source]

修复空值

Parameters:
  • df (DataFrame) – DataFrame

  • strategy (Union[FillStrategy, str]) – 填充策略

  • columns (Optional[List[str]]) – 要处理的列,None表示所有列

  • fill_value (Optional[Any]) – 当strategy为FILL_VALUE时使用的填充值

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

修复后的DataFrame

betalens.datafeed.validation.drop_duplicates_strict(df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False, logger=None)[source]

严格去重:确保只有完全相同的行才会被删除

Parameters:
  • df (DataFrame) – DataFrame

  • subset (Optional[List[str]]) – 用于判断重复的列,None表示所有列

  • keep (str) – ‘first’, ‘last’, False(删除所有重复)

  • verify_all_fields (bool) – 是否验证subset外的其他字段也相同

  • ignore_cols (Optional[List[str]]) – 验证时忽略的列(如索引、时间戳等)

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[DataFrame, Dict[str, Any]]

Returns:

(修复后的DataFrame, 去重报告)

betalens.datafeed.validation.fix_datetime_column(df, date_column, fix_format=True, fix_duplicates='keep_first', fix_sort=True, sort_order='ascending', dedupe_subset=None, verify_all_fields=True, inplace=False, logger=None)[source]

修复日期列的问题

Parameters:
  • df (DataFrame) – DataFrame

  • date_column (str) – 日期列名

  • fix_format (bool) – 是否修复格式(转换为datetime)

  • fix_duplicates (Optional[str]) – 如何处理重复,None表示不处理

  • fix_sort (bool) – 是否排序

  • sort_order (str) – 排序顺序,’ascending’或’descending’

  • dedupe_subset (Optional[List[str]]) – 去重时使用的列组合,None则使用[date_column] 推荐: [‘code’, ‘metric’, date_column] 避免误删不同metric的数据

  • verify_all_fields (bool) – 是否验证subset外的其他字段也相同(严格模式)

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

修复后的DataFrame

betalens.datafeed.validation.validate_and_fix(df, validations, inplace=False, logger=None)[source]

综合验证和修复

Parameters:
  • df (DataFrame) – DataFrame

  • validations (Dict[str, Dict]) –

    验证配置字典,格式如: {

    ’null_check’: {

    ‘columns’: [‘col1’, ‘col2’], ‘fix_strategy’: ‘ffill’

    }, ‘datetime_check’: {

    ’column’: ‘date’, ‘expected_freq’: ‘D’, ‘fix_format’: True, ‘fix_duplicates’: ‘keep_first’, ‘fix_sort’: True

    }

    }

  • inplace (bool) – 是否原地修改

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[DataFrame, Dict[str, Any]]

Returns:

(修复后的DataFrame, 验证报告)

class betalens.datafeed.validation.DataValidator(logger=None)[source]

Bases: object

DataValidator - 已弃用,请直接使用模块级函数

此类已弃用,方法不再实现。请使用模块级函数: - check_null_values() - check_datetime_column() - fix_null_values() - drop_duplicates_strict() - fix_datetime_column() - validate_and_fix()

Parameters:

logger (Logger | None)

__init__(logger=None)[source]
Parameters:

logger (Logger | None)

check_null_values(df, columns=None, check_types=None)[source]
check_datetime_column(df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True)[source]
fix_null_values(df, strategy, columns=None, fill_value=None, inplace=False)[source]
drop_duplicates_strict(df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False)[source]
fix_datetime_column(df, date_column, fix_format=True, fix_duplicates='keep_first', fix_sort=True, sort_order='ascending', dedupe_subset=None, verify_all_fields=True, inplace=False)[source]
validate_and_fix(df, validations, inplace=False)[source]

query

数据库查询工具模块(函数式) 功能: - 重构query_nearest_after和query_nearest_before - 解耦数据库查询逻辑 - 提供灵活的查询参数构建 - 支持时间点匹配和数据提取

betalens.datafeed.query.build_query(table_name, conditions=None, params=None, select_columns='*', order_by=None, limit=None)[source]

构建SQL查询

Parameters:
  • table_name (str) – 数据库表名

  • conditions (Optional[List[str]]) – 条件列表

  • params (Optional[List]) – 参数列表

  • select_columns (str) – 要选择的列

  • order_by (Optional[str]) – ORDER BY 子句(如 “datetime DESC”)

  • limit (Optional[int]) – 最大返回行数

Return type:

Tuple[str, List]

Returns:

(SQL语句, 参数列表)

betalens.datafeed.query.generate_input_pairs(codes, datetimes)[source]

生成(code, datetime)笛卡尔积

Parameters:
  • codes (List[str]) – 代码列表

  • datetimes (List[str]) – 时间戳列表

Return type:

List[Tuple[str, str]]

Returns:

(code, datetime)元组列表

betalens.datafeed.query.generate_input_range_pairs(codes, ranges)[source]

生成 (code, start_ts, end_ts) 笛卡尔积

Parameters:
Return type:

List[Tuple[str, str, str]]

Returns:

(code, start_ts, end_ts) 元组列表

betalens.datafeed.query.build_nearest_in_range_query(table_name, input_tuples, metric, direction='after', time_tolerance=None)[source]

构建区间内最近时点匹配查询

在每个 (code, start_ts, end_ts) 区间内,按方向查找距锚点最近的数据: - direction=’after’:锚点为 start_ts,区间过滤 t.datetime > start AND t.datetime < end - direction=’before’:锚点为 end_ts,区间过滤 t.datetime <= end AND t.datetime >= start

Parameters:
  • table_name (str) – 表名

  • input_tuples (List[Tuple[str, str, str]]) – (code, start_ts, end_ts) 元组列表

  • metric (str) – 指标名

  • direction (str) – 查询方向,’after’ 或 ‘before’

  • time_tolerance (Optional[float]) – 锚点容差(小时),与区间共同生效(取交集)

Return type:

Tuple[str, List]

Returns:

(SQL语句, 参数列表)

betalens.datafeed.query.build_nearest_query(table_name, input_tuples, metric, direction='after', time_tolerance=None)[source]

构建最近时点匹配查询

Parameters:
  • table_name (str) – 表名

  • input_tuples (List[Tuple[str, str]]) – (code, datetime)元组列表

  • metric (str) – 指标名

  • direction (str) – 查询方向,’after’(之后)或’before’(之前)

  • time_tolerance (Optional[float]) – 时间容差(小时)

Return type:

Tuple[str, List]

Returns:

(SQL语句, 参数列表)

betalens.datafeed.query.query_nearest_after(cursor, table_name, codes, datetimes, metric, time_tolerance=None, logger=None)[source]

查询每个时点之后最近的有效值

用途:主要用于回测时提取价格 时间结构:最新特征 <= 提数时点 < 调仓时点

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (List[str]) – 代码列表

  • datetimes (List[str]) – 时间戳列表,格式’YYYY-MM-DD HH:MM:SS’

  • metric (str) – 查询的指标名称

  • time_tolerance (Optional[float]) – 允许的最大时间间隔(单位:小时)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

DataFrame,包含列:
  • code: 代码

  • input_ts: 输入时间戳(提数时点)

  • datetime: 匹配到的数据时间戳

  • diff_hours: 时间差(小时)

  • value: 数据值

  • name: 名称

betalens.datafeed.query.query_nearest_before(cursor, table_name, codes, datetimes, metric, time_tolerance=None, logger=None)[source]

查询每个时点之前最近的有效值

用途:主要用于回测时提取历史价格特征 时间结构:调仓时点 <= 提数时点 < 最新特征时点

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (List[str]) – 代码列表

  • datetimes (List[str]) – 时间戳列表,格式’YYYY-MM-DD HH:MM:SS’

  • metric (str) – 查询的指标名称

  • time_tolerance (Optional[float]) – 允许的最大时间间隔(单位:小时)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

DataFrame,包含列:
  • code: 代码

  • input_ts: 输入时间戳(提数时点)

  • datetime: 匹配到的数据时间戳

  • diff_hours: 时间差(小时)

  • value: 数据值

  • name: 名称

betalens.datafeed.query.query_nearest_in_range_after(cursor, table_name, codes, ranges, metric, time_tolerance=None, logger=None)[source]

在每个 (start, end) 区间内查询距 start 最近的有效值(向后查)

时间结构:start <= t.datetime - epsilon, t.datetime < end,锚点 = start

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (List[str]) – 代码列表

  • ranges (List[Tuple[str, str]]) – (start, end) 区间列表,时间格式 ‘YYYY-MM-DD HH:MM:SS’

  • metric (str) – 指标名

  • time_tolerance (Optional[float]) – 锚点容差(小时),与区间共同生效

  • logger (Optional[Logger]) – 日志记录器

Returns:

code, input_ts(=start), datetime, diff_hours, value, name

Return type:

DataFrame

betalens.datafeed.query.query_nearest_in_range_before(cursor, table_name, codes, ranges, metric, time_tolerance=None, logger=None)[source]

在每个 (start, end) 区间内查询距 end 最近的有效值(向前查)

时间结构:start <= t.datetime <= end,锚点 = end

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (List[str]) – 代码列表

  • ranges (List[Tuple[str, str]]) – (start, end) 区间列表,时间格式 ‘YYYY-MM-DD HH:MM:SS’

  • metric (str) – 指标名

  • time_tolerance (Optional[float]) – 锚点容差(小时),与区间共同生效

  • logger (Optional[Logger]) – 日志记录器

Returns:

code, input_ts(=end), datetime, diff_hours, value, name

Return type:

DataFrame

betalens.datafeed.query.query_time_range(cursor, table_name, codes=None, start_date=None, end_date=None, metric=None, limit=None, logger=None)[source]

查询指定时间范围的数据

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • codes (Optional[List[str]]) – 代码列表,None表示所有代码

  • start_date (Optional[str]) – 开始日期

  • end_date (Optional[str]) – 结束日期

  • metric (Optional[str]) – 指标名称

  • limit (Optional[int]) – 最大返回行数,None表示不限制(按 datetime DESC 返回最新的 N 行)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

DataFrame

betalens.datafeed.query.get_available_dates(cursor, table_name, code, metric, start_date=None, end_date=None, logger=None)[source]

获取指定代码和指标的可用日期列表

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • code (str) – 代码

  • metric (str) – 指标

  • start_date (Optional[str]) – 开始日期

  • end_date (Optional[str]) – 结束日期

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

List[datetime]

Returns:

日期列表

betalens.datafeed.query.get_latest_date(cursor, table_name, code=None, metric=None, logger=None)[source]

获取最新的数据日期

Parameters:
  • cursor – 数据库游标

  • table_name (str) – 表名

  • code (Optional[str]) – 代码,None表示所有代码

  • metric (Optional[str]) – 指标,None表示所有指标

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Optional[datetime]

Returns:

最新日期

betalens.datafeed.query.pivot_to_wide(df, index_cols, pivot_col, value_col)[source]

将长格式数据转换为宽格式

Parameters:
  • df (DataFrame) – 长格式DataFrame

  • index_cols (List[str]) – 索引列

  • pivot_col (str) – 用于pivot的列(将变为新列名)

  • value_col (str) – 值列

Return type:

DataFrame

Returns:

宽格式DataFrame

betalens.datafeed.query.align_to_dates(df, target_dates, date_column='datetime', method='ffill')[source]

将数据对齐到目标日期序列

Parameters:
  • df (DataFrame) – 输入DataFrame

  • target_dates (List[datetime]) – 目标日期列表

  • date_column (str) – 日期列名

  • method (str) – 填充方法,’ffill’或’bfill’

Return type:

DataFrame

Returns:

对齐后的DataFrame

betalens.datafeed.query.calculate_returns(df, price_column, periods=[1], group_by=None)[source]

计算收益率

Parameters:
  • df (DataFrame) – 包含价格数据的DataFrame

  • price_column (str) – 价格列名

  • periods (List[int]) – 计算周期列表

  • group_by (Optional[str]) – 分组列(如code)

Return type:

DataFrame

Returns:

添加了收益率列的DataFrame

integration

数据库-Excel交互功能模块(函数式) 功能: - 按照目录树结构读取和处理Excel文件 - 将处理后的数据插入数据库 - 增量更新功能(只插入新数据) - 错误检查和日志记录 - 保存错误数据和源文件

betalens.datafeed.integration.process_excel_to_db_format(filepath, config, logger=None)[source]

处理Excel文件为数据库格式

Parameters:
  • filepath (Union[str, Path]) – Excel文件路径

  • config (Dict[str, Any]) – 处理配置,包含: - key_columns: 键列列表 - value_columns: 值列列表(如果为None,则自动推断) - key_value_mapping: 列名映射字典 - additional_fields: 额外字段字典 - validation: 验证配置 - read_kwargs: 读取参数

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[Optional[DataFrame], List[Dict]]

Returns:

(处理后的DataFrame, 错误列表)

betalens.datafeed.integration.check_existing_rows(cursor, df, table, key_columns=None, logger=None)[source]

检查数据库中是否已存在相关数据行

Parameters:
  • cursor – 数据库游标

  • df (DataFrame) – 待检查的DataFrame

  • table (str) – 目标表名

  • key_columns (Optional[List[str]]) – 用于判断重复的关键列,默认使用[‘code’, ‘metric’, ‘datetime’]

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

包含已存在行的DataFrame(只包含key_columns列),如果没有则返回空DataFrame

betalens.datafeed.integration.insert_dataframe(cursor, conn, df, table, batch_size=1000, check_duplicates=True, key_columns=None, skip_duplicates=True, logger=None)[source]

将DataFrame插入数据库

Parameters:
  • cursor – 数据库游标

  • conn – 数据库连接

  • df (DataFrame) – 待插入的DataFrame

  • table (str) – 目标表名

  • batch_size (int) – 批量插入大小

  • check_duplicates (bool) – 是否检查数据库中已存在的数据(默认True)

  • key_columns (Optional[List[str]]) – 用于判断重复的关键列,默认使用[‘code’, ‘metric’, ‘datetime’]

  • skip_duplicates (bool) – 是否跳过重复数据(默认True),如果为False,重复数据会导致插入失败

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[bool, str, Dict[str, Any]]

Returns:

(是否成功, 消息, 统计信息字典)

betalens.datafeed.integration.get_existing_dates(cursor, table, code, metric, logger=None)[source]

获取数据库中已存在的日期

Parameters:
  • cursor – 数据库游标

  • table (str) – 表名

  • code (str) – 代码

  • metric (str) – 指标

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

List[datetime]

Returns:

日期列表

betalens.datafeed.integration.incremental_insert(cursor, conn, df, table, date_column='datetime', code_column='code', metric_column='metric', logger=None)[source]

增量插入:只插入数据库中不存在的数据

Parameters:
  • cursor – 数据库游标

  • conn – 数据库连接

  • df (DataFrame) – 待插入的DataFrame

  • table (str) – 目标表名

  • date_column (str) – 日期列名

  • code_column (str) – 代码列名

  • metric_column (str) – 指标列名

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[int, int]

Returns:

(新增行数, 重复行数)

betalens.datafeed.integration.save_error_file(filepath, df, errors, error_dir='./errors', error_subdir='failed_files', logger=None)[source]

保存错误文件和日志

Parameters:
  • filepath (Union[str, Path]) – 原始文件路径

  • df (Optional[DataFrame]) – 处理后的DataFrame(可能为None)

  • errors (List[Dict]) – 错误列表

  • error_dir (Union[str, Path]) – 错误文件保存目录

  • error_subdir (str) – 错误子目录

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

str

Returns:

错误记录ID

betalens.datafeed.integration.process_directory_tree(cursor, conn, root_dir, table, config, file_pattern='*.csv', recursive=True, mode='insert', error_dir='./errors', logger=None)[source]

按照目录树结构处理和插入Excel文件

Parameters:
  • cursor – 数据库游标

  • conn – 数据库连接

  • root_dir (Union[str, Path]) – 根目录

  • table (str) – 目标表名

  • config (Dict[str, Any]) – 处理配置

  • file_pattern (str) – 文件匹配模式

  • recursive (bool) – 是否递归搜索

  • mode (str) – 插入模式,’insert’(直接插入)或’incremental’(增量插入)

  • error_dir (Union[str, Path]) – 错误文件保存目录

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Dict[str, Any]

Returns:

处理统计字典

universe

指数历史股票池查询模块(函数式)

设计要点

指数成分股池是 point-in-time 的”状态”数据:某指数从某调整日起拥有一组成分股, 直到下次调整。每个生效日存为一行,约定:

  • code : 指数代码(如 ‘000906.SH’)

  • name : 指数名称(如 ‘中证800’)

  • metric : 固定为 ‘universe’,标识成分股池

  • value : 成分股数量(便于校验)

  • remark : JSONB,约定 {“index_code”, “index_name”, “constituents”: […]}

  • datetime: 该股票池的生效时点(最早可知日)

查询语义 = 取 datetime <= 查询日 的最近一条,与 query.query_nearest_before 同构, 天然避免前视偏差。由于成分股列表存在 remark(JSONB),而 query_nearest_before 只 返回 value/name,故本模块用它先”定位”最近生效日,再补一条小查询取出 remark。

主要接口

get_index_universe : 返回某指数某日生效的成分股代码列表 get_index_universe_date : 返回某指数某日实际生效的快照日期(便于排查)

betalens.datafeed.universe.get_index_universe_date(cursor, index_code, date, table_name='index_universe', metric='universe', logger=None)[source]

返回某指数在某日实际生效的快照日期(point-in-time,取 datetime<=date 的最近一条)。

复用 query.query_nearest_before 定位最近生效日。

Parameters:
  • cursor – 数据库游标(建议 RealDictCursor)

  • index_code (str) – 指数代码,如 ‘000906.SH’

  • date (str) – 查询日期,’YYYY-MM-DD’ 或 ‘YYYY-MM-DD HH:MM:SS’

  • table_name (str) – 表名,默认 ‘index_universe’

  • metric (str) – 指标名,默认 ‘universe’

  • logger (Optional[Logger]) – 日志器

Returns:

生效快照的 datetime(pandas.Timestamp);该日前无可用股票池则返回 None

betalens.datafeed.universe.get_index_universe(cursor, index_code, date, table_name='index_universe', metric='universe', logger=None)[source]

返回 index_code 在 date 当日生效的成分股代码列表(point-in-time)。

步骤:用 query.query_nearest_before 找到 <=date 的最近生效快照日,再取该行 remark 中的 constituents 列表。该日前无可用股票池则返回空列表。

Parameters:
  • cursor – 数据库游标(建议 RealDictCursor)

  • index_code (str) – 指数代码,如 ‘000906.SH’

  • date (str) – 查询日期,’YYYY-MM-DD’ 或 ‘YYYY-MM-DD HH:MM:SS’

  • table_name (str) – 表名,默认 ‘index_universe’

  • metric (str) – 指标名,默认 ‘universe’

  • logger (Optional[Logger]) – 日志器

Return type:

List[str]

Returns:

成分股代码列表(如 [‘000001.SZ’, …]);无可用股票池则返回 []

config

配置管理模块 功能: - 加载和管理datafeed模块的配置参数 - 支持从config.json文件读取配置 - 支持运行时动态修改配置 - 提供配置验证和默认值

class betalens.datafeed.config.ConfigManager(config_file=None)[source]

Bases: object

配置管理器

Parameters:

config_file (str | None)

__init__(config_file=None)[source]

初始化配置管理器

Parameters:

config_file (Optional[str]) – 配置文件路径,默认为当前模块目录下的config.json

load()[source]

从文件加载配置

如果文件不存在或加载失败,使用默认配置

Return type:

None

save(config_file=None)[source]

保存当前配置到文件

Parameters:

config_file (Optional[str]) – 配置文件路径,默认使用初始化时的路径

Return type:

None

get(key_path, default=None)[source]

获取配置值

Parameters:
  • key_path (str) – 配置键路径,使用点号分隔,如 ‘database.dbname’

  • default (Optional[Any]) – 默认值,如果键不存在则返回此值

Return type:

Any

Returns:

配置值

Example

>>> config = ConfigManager()
>>> config.get('database.dbname')
'datafeed'
>>> config.get('database.port')
'5432'
set(key_path, value)[source]

设置配置值

Parameters:
  • key_path (str) – 配置键路径,使用点号分隔,如 ‘database.dbname’

  • value (Any) – 配置值

Return type:

None

Example

>>> config = ConfigManager()
>>> config.set('database.dbname', 'my_database')
>>> config.get('database.dbname')
'my_database'
get_section(section)[source]

获取配置节

Parameters:

section (str) – 配置节名称,如 ‘database’, ‘excel’

Return type:

Dict[str, Any]

Returns:

配置节字典

property config: Dict[str, Any]

获取完整配置字典

__getitem__(key)[source]

支持字典式访问

Return type:

Any

Parameters:

key (str)

__setitem__(key, value)[source]

支持字典式设置

Return type:

None

Parameters:
betalens.datafeed.config.get_config(config_file=None)[source]

获取全局配置实例

Parameters:

config_file (Optional[str]) – 配置文件路径,仅在首次调用时有效

Return type:

ConfigManager

Returns:

ConfigManager实例

betalens.datafeed.config.reset_config()[source]

重置全局配置实例

Return type:

None

betalens.datafeed.config.get_database_config()[source]

获取数据库配置

Return type:

Dict[str, str]

betalens.datafeed.config.get_logging_config()[source]

获取日志配置

Return type:

Dict[str, str]

betalens.datafeed.config.get_excel_config()[source]

获取Excel配置

Return type:

Dict[str, Any]

betalens.datafeed.config.get_wind_config()[source]

获取Wind配置

Return type:

Dict[str, Any]

betalens.datafeed.config.get_ede_config()[source]

获取EDE配置

Return type:

Dict[str, Any]

wind_ingest

Wind数据抓取模块(函数式) 功能: - 从WindPy获取日行情数据 - 转换为数据库标准格式 - 支持多种资产类型(股票、指数、基金、债券)

betalens.datafeed.wind_ingest.fetch_daily_market(codes, start_date, end_date, fields=None, asset_type='stock', apply_time_stamps=True, logger=None)[source]

从Wind获取日行情数据并转换为数据库格式

Parameters:
  • codes (List[str]) – 代码列表,如[‘000001.SZ’, ‘000002.SZ’]

  • start_date (str) – 开始日期,格式’YYYY-MM-DD’

  • end_date (str) – 结束日期,格式’YYYY-MM-DD’

  • fields (Optional[List[str]]) – 字段列表,如[‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] 默认为None,使用预设字段

  • asset_type (str) – 资产类型,’stock’(股票), ‘index’(指数), ‘fund’(基金), ‘bond’(债券)

  • apply_time_stamps (bool) – 是否应用交易时间戳(开盘价09:30,其他15:00)

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

DataFrame,格式为:

datetime | code | name | metric | value

Example

>>> df = fetch_daily_market(
...     codes=['000001.SZ'],
...     start_date='2024-01-01',
...     end_date='2024-01-31'
... )
>>> df.head()
   datetime            code   name      metric  value
0  2024-01-02 09:30:01  000001.SZ  平安银行  开盘价(元)   10.50
1  2024-01-02 15:00:01  000001.SZ  平安银行  收盘价(元)   10.55
betalens.datafeed.wind_ingest.fetch_daily_index(codes, start_date, end_date, fields=None, apply_time_stamps=True, logger=None)[source]

获取指数日行情数据

这是fetch_daily_market的便捷封装,asset_type=’index’

Parameters:
  • codes (List[str]) – 指数代码列表,如[‘000001.SH’, ‘399001.SZ’]

  • start_date (str) – 开始日期

  • end_date (str) – 结束日期

  • fields (Optional[List[str]]) – 字段列表,默认为None

  • apply_time_stamps (bool) – 是否应用交易时间戳

  • logger (Optional[Logger]) – 日志记录器

Return type:

DataFrame

Returns:

DataFrame

betalens.datafeed.wind_ingest.fetch_daily_fund(codes, start_date, end_date, fields=None, apply_time_stamps=True, logger=None)[source]

获取基金日行情数据

这是fetch_daily_market的便捷封装,asset_type=’fund’

Parameters:
  • codes (List[str]) – 基金代码列表

  • start_date (str) – 开始日期

  • end_date (str) – 结束日期

  • fields (Optional[List[str]]) – 字段列表,默认为None

  • apply_time_stamps (bool) – 是否应用交易时间戳

  • logger (Optional[Logger]) – 日志记录器

Return type:

DataFrame

Returns:

DataFrame

betalens.datafeed.wind_ingest.fetch_daily_bond(codes, start_date, end_date, fields=None, apply_time_stamps=True, logger=None)[source]

获取债券日行情数据

这是fetch_daily_market的便捷封装,asset_type=’bond’

Parameters:
  • codes (List[str]) – 债券代码列表

  • start_date (str) – 开始日期

  • end_date (str) – 结束日期

  • fields (Optional[List[str]]) – 字段列表,默认为None

  • apply_time_stamps (bool) – 是否应用交易时间戳

  • logger (Optional[Logger]) – 日志记录器

Return type:

DataFrame

Returns:

DataFrame

ede_processor

EDE格式Excel文件处理工具模块(函数式) 功能: - 处理特定的EDE格式Excel文件(来自Wind等数据源) - 识别证券代码、名称和指标列 - 从列名中提取metric和元数据(日期、单位等) - 转换为数据库标准格式并插入

betalens.datafeed.ede_processor.extract_date_from_filename(filepath, pattern=None, default_time=None, logger=None)[source]

从文件名中提取日期

Parameters:
  • filepath (Union[str, Path]) – 文件路径

  • pattern (Optional[str]) – 日期匹配正则表达式,默认从config.json读取

  • default_time (Optional[str]) – 默认时间,默认从config.json读取

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Returns:

MM:SS),如果未找到则返回None

Return type:

日期时间字符串(格式:YYYY-MM-DD HH

betalens.datafeed.ede_processor.parse_metric_column(column_name, logger=None)[source]

解析EDE格式的指标列名,提取metric名称和元数据

EDE格式示例:

“流通A股 [交易日期] 最新 [单位] 股” “流通市值 [交易日期] 最新 [单位] 万元”

Parameters:
  • column_name (str) – 列名

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[str, Dict[str, str]]

Returns:

(metric名称, 元数据字典)

Example

>>> parse_metric_column("流通A股 [交易日期] 最新 [单位] 股")
('流通A股', {'日期说明': '交易日期', '值类型': '最新', '单位说明': '单位', '单位': '股'})
betalens.datafeed.ede_processor.extract_date_from_metric_metadata(metadata, column_name, default_time=None, logger=None)[source]

从metric元数据中提取日期

在某些情况下,列名中可能包含具体日期,如:

“流通A股 [20251103] 最新 [单位] 股”

Parameters:
  • metadata (Dict[str, str]) – metric元数据字典

  • column_name (str) – 原始列名

  • default_time (Optional[str]) – 默认时间,默认从config.json读取

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Optional[str]

Returns:

日期时间字符串,如果未找到则返回None

betalens.datafeed.ede_processor.clean_ede_dataframe(df, logger=None)[source]

清理EDE格式的DataFrame

操作: 1. 删除完全空白的行 2. 删除包含”数据来源”等无关字符串的行 3. 删除完全空白的列

Parameters:
  • df (DataFrame) – 原始DataFrame

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

DataFrame

Returns:

清理后的DataFrame

betalens.datafeed.ede_processor.identify_code_name_columns(df, code_column_names=None, name_column_names=None, logger=None)[source]

识别DataFrame中的代码列和名称列

Parameters:
  • df (DataFrame) – DataFrame

  • code_column_names (Optional[List[str]]) – 可能的代码列名列表,默认从config.json读取

  • name_column_names (Optional[List[str]]) – 可能的名称列名列表,默认从config.json读取

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[Optional[str], Optional[str]]

Returns:

(代码列名, 名称列名),如果未找到则返回None

betalens.datafeed.ede_processor.process_ede_file(filepath, date_from='filename', default_datetime=None, code_column_names=None, name_column_names=None, logger=None)[source]

处理EDE格式的Excel文件并转换为数据库格式

EDE格式特征:
  • 第一列:证券代码

  • 第二列:证券简称

  • 第三列及之后:指标列,格式为”指标名 [元数据] 值类型 [元数据] 单位”

Parameters:
  • filepath (Union[str, Path]) – 文件路径

  • date_from (str) – 日期来源,’filename’(从文件名提取)或’metric’(从列名提取)

  • default_datetime (Optional[str]) – 默认日期时间(当无法从文件名或列名提取时使用)

  • code_column_names (Optional[List[str]]) – 可能的代码列名列表

  • name_column_names (Optional[List[str]]) – 可能的名称列名列表

  • logger (Optional[Logger]) – 日志记录器,如果为None则使用默认logger

Return type:

Tuple[Optional[DataFrame], List[Dict]]

Returns:

(处理后的DataFrame, 错误列表)

处理后的DataFrame格式:
  • code: 证券代码

  • name: 证券简称

  • metric: 指标名称

  • value: 数值

  • datetime: 日期时间

  • note: 元数据(JSON格式)