Datafeed API
Datafeed工具包 包含Excel处理、数据验证、数据库查询和集成等功能模块
子模块: - core: 核心Datafeed类及辅助函数 - excel: Excel文件处理工具 - validation: 数据验证和异常检查工具 - query: 数据库查询功能 - integration: 数据库与Excel交互功能
- class betalens.datafeed.Datafeed(table_name, db_config=None, log_dir=None)[source]
Bases:
object数据管理主类(薄封装层)
职责: - 管理数据库连接 - 提供统一的日志记录 - 组合子模块功能为高层API - 不包含业务逻辑(所有逻辑在子模块中)
- 使用示例:
# 创建实例(使用默认配置) df = Datafeed(“daily_market_data”)
# 创建实例(自定义数据库配置) df = Datafeed(
table_name=”daily_market_data”, db_config={
‘dbname’: ‘my_database’, ‘user’: ‘my_user’, ‘password’: ‘my_password’, ‘host’: ‘localhost’, ‘port’: ‘5432’
}
)
# 单文件插入 df.insert_csv_file(“data.csv”, config={…})
# Wind数据抓取 df.ingest_wind_daily_market(
codes=[‘000001.SZ’], start_date=’2024-01-01’, end_date=’2024-01-31’
)
# 查询 data = df.query_time_range(
codes=[‘000001.SZ’], start_date=’2024-01-01’, metric=’收盘价(元)’
)
# 关闭 df.close()
- __init__(table_name, db_config=None, log_dir=None)[source]
初始化Datafeed实例
- Parameters:
table_name – 数据库表名
db_config – 数据库配置字典,包含以下键: - dbname: 数据库名 - user: 用户名 - password: 密码 - host: 主机地址 - port: 端口 如果为None,使用config.json中的配置
log_dir – 日志目录,如果为None,使用config.json中的配置
- insert_csv_file(filepath, config, mode='incremental')[source]
单文件CSV插入(薄封装)
组合流程: 1. excel.read_file - 读取文件 2. excel.cross_section_to_db_format - 转换格式(如果需要) 3. validation.validate_and_fix - 验证和修复(如果配置) 4. integration.insert_dataframe/incremental_insert - 插入
- Parameters:
- Return type:
- Returns:
统计信息字典
- insert_ede_file(filepath, date_from='filename', default_datetime=None, mode='incremental')[source]
处理并插入EDE格式的Excel文件(薄封装)
EDE格式特征: - 第一列:证券代码 -第二列:证券简称 - 第三列及之后:指标列,格式为”指标名 [元数据] 值类型 [元数据] 单位”
- 示例EDE格式:
证券代码 证券简称 流通A股 [交易日期] 最新 [单位] 股 002460.SZ 赣锋锂业 1,211,379,763.0000 1772.HK 赣锋锂业 1,211,379,763.0000
处理流程: 1. 读取Excel文件 2. 清理数据(去除空值、”数据来源:Wind”等) 3. 识别code、name列 4. 解析metric列,提取指标名称和元数据 5. 构建日期列(从文件名或列名中提取) 6. 转换为数据库格式 7. 插入数据库
- Parameters:
filepath (
str) – EDE格式Excel文件路径date_from (
str) – 日期来源,可选值: - ‘filename’: 从文件名提取日期(如EDE20251103.xlsx -> 2025-11-03 15:30:00) - ‘metric’: 从列名中的[日期]部分提取default_datetime (
Optional[str]) – 默认日期时间(当无法从文件名或列名提取时使用) 格式:’YYYY-MM-DD HH:MM:SS’,如’2025-11-03 15:30:00’mode (
str) – 插入模式 - ‘incremental’: 增量插入(默认),只插入新数据 - ‘insert’: 直接插入,会检查重复并跳过
- Return type:
- Returns:
- 统计信息字典,包含:
success: 是否成功
new_rows: 新增行数
skipped_rows: 跳过行数
errors: 错误列表(如果有)
Example
>>> df = Datafeed('daily_market_data') >>> result = df.insert_ede_file( ... 'EDE20251103.xlsx', ... date_from='filename', ... mode='incremental' ... ) >>> print(f"新增{result['new_rows']}行,跳过{result['skipped_rows']}行")
- batch_process_excel_files(folder_path, config, file_pattern='*.csv', recursive=True, mode='insert')[source]
批量处理Excel文件并插入数据库(薄封装)
直接调用 integration.process_directory_tree
- incremental_update(df, date_column='datetime', code_column='code', metric_column='metric')[source]
增量更新数据到数据库(薄封装)
直接调用 integration.incremental_insert
- insert_with_conflict_check(df, date_column='datetime', code_column='code', metric_column='metric')[source]
插入数据时检测重复和冲突(批量查询优化版本)
如果key(datetime, code, metric)相同但value不同:记录冲突,不插入
如果key和value完全相同:跳过,不插入
如果key不存在:插入新记录
- update_data(df, date_column='datetime', code_column='code', metric_column='metric')[source]
使用SQL UPDATE批量更新数据
对于存在的记录(相同datetime, code, metric),更新value和name 对于不存在的记录,可选择插入(upsert模式)
- ingest_wind_daily_market(codes, start_date, end_date, fields=None, asset_type='stock', mode='incremental')[source]
从Wind获取日行情并插入数据库(薄封装)
组合流程: 1. wind_ingest.fetch_daily_market - 获取Wind数据 2. integration.incremental_insert/insert_dataframe - 插入
- ingest_wind_daily_index(codes, start_date, end_date, fields=None, mode='incremental')[source]
Wind指数数据抓取(便捷封装)
- ingest_wind_daily_fund(codes, start_date, end_date, fields=None, mode='incremental')[source]
Wind基金数据抓取(便捷封装)
- ingest_wind_daily_bond(codes, start_date, end_date, fields=None, mode='incremental')[source]
Wind债券数据抓取(便捷封装)
- run_query(conditions=None, params=None, select_columns='*')[source]
执行自定义SQL查询(薄封装)
替代原 query_data 方法,使用 query.build_query
- query_time_range(codes=None, start_date=None, end_date=None, metric=None, limit=None)[source]
查询指定时间范围的数据(薄封装)
直接调用 query.query_time_range
- query_nearest_after(params=None)[source]
根据输入时间戳序列查找每个时点之后最近的有效值(薄封装)
主要用于回测时提取价格 直接调用 query.query_nearest_after
- Parameters:
params (dict) – 必须包含以下键: - codes: 代码列表 - datetimes: 目标时间戳列表(格式:’YYYY-MM-DD HH:MM’) - metric: 查询的指标名称 - time_tolerance: 允许的最大时间间隔(单位:小时,默认不限制)
- Returns:
- 包含以下列:
code | input_ts | datetime | diff_hours | value | name
- Return type:
DataFrame
- query_nearest_in_range_after(params=None)[source]
在 (start, end) 区间内查找距 start 最近的有效值(薄封装)
- Parameters:
params (dict) – 必须包含以下键: - codes: 代码列表 - ranges: [(start, end), …] 区间列表 - metric: 指标名 - time_tolerance: 锚点容差(小时,可选)
- Returns:
code | input_ts(=start) | datetime | diff_hours | value | name
- Return type:
DataFrame
- query_nearest_in_range_before(params=None)[source]
在 (start, end) 区间内查找距 end 最近的有效值(薄封装)
- Parameters:
params (dict) – 必须包含以下键: - codes: 代码列表 - ranges: [(start, end), …] 区间列表 - metric: 指标名 - time_tolerance: 锚点容差(小时,可选)
- Returns:
code | input_ts(=end) | datetime | diff_hours | value | name
- Return type:
DataFrame
- query_nearest_before(params=None)[source]
根据输入时间戳序列查找每个时点之前最近的有效值(薄封装)
主要用于回测时提取历史价格特征 直接调用 query.query_nearest_before
- Parameters:
params (dict) – 必须包含以下键: - codes: 代码列表 - datetimes: 目标时间戳列表(格式:’YYYY-MM-DD HH:MM’) - metric: 查询的指标名称 - time_tolerance: 允许的最大时间间隔(单位:小时,默认不限制)
- Returns:
- 包含以下列:
code | input_ts | datetime | diff_hours | value | name
- Return type:
DataFrame
- get_available_dates(code, metric, start_date=None, end_date=None)[source]
获取指定代码和指标的可用日期列表(薄封装)
直接调用 query.get_available_dates
- validate_dataframe(df, validations)[source]
验证和修复DataFrame(薄封装)
直接调用 validation.validate_and_fix
- Parameters:
df (
DataFrame) – 待验证的DataFramevalidations (
dict) – 验证配置
- Returns:
(修复后的DataFrame, 验证报告)
- betalens.datafeed.read_file(filepath, logger=None, **kwargs)[source]
读取CSV或XLSX文件为DataFrame 支持多种编码格式(UTF-8, GB2312, GBK, GB18030等)
- Parameters:
- Return type:
DataFrame- Returns:
DataFrame
- Raises:
ValueError – 不支持的文件格式
FileNotFoundError – 文件不存在
- betalens.datafeed.cross_section_to_db_format(df, key_columns, value_columns, key_value_mapping, additional_fields=None, logger=None)[source]
将cross-section格式数据转换为数据库三列表格式
- Parameters:
- Return type:
DataFrame- Returns:
转换后的DataFrame,格式为:[key_columns, metric, value, …]
Example
- 输入:
code | name | 2024-01-01 | 2024-01-02 A001 | 股票A | 100 | 101
- 输出:
code | name | date | value A001 | 股票A | 2024-01-01 | 100 A001 | 股票A | 2024-01-02 | 101
- betalens.datafeed.batch_read_files(folder_path, file_pattern='*.csv', recursive=False, logger=None, **read_kwargs)[source]
批量读取文件夹中的文件
- betalens.datafeed.batch_write_files(data_dict, output_dir, file_format='csv', create_subdirs=True, logger=None, **write_kwargs)[source]
批量写入文件 CSV默认使用utf-8-sig编码,确保Excel能正确显示中文
- Parameters:
- Return type:
- Returns:
成功写入的文件路径列表
- betalens.datafeed.create_directory_tree(data_dict, output_dir, categorize_by=None, logger=None)[source]
创建目录树并分类保存文件
- betalens.datafeed.check_excel_errors(df, checks=None, logger=None)[source]
检查Excel数据中的错误
- Parameters:
df (
DataFrame) – 待检查的DataFramechecks (
Optional[Dict[str,any]]) –检查配置字典,如: {
’check_empty_rows’: True, # 检查空行 ‘check_null_values’: True, # 检查空值 ‘check_duplicates’: [‘col1’, ‘col2’], # 检查重复(指定列) ‘check_data_types’: {‘col1’: ‘int’, ‘col2’: ‘float’}, # 检查数据类型 ‘check_value_range’: {‘col1’: (0, 100)}, # 检查值范围
}
- Return type:
- Returns:
(是否通过检查, 错误列表)
- class betalens.datafeed.FillStrategy(value)[source]
Bases:
Enum填充策略枚举
- RAISE_ERROR = 'raise_error'
- DROP = 'drop'
- FILL_FORWARD = 'ffill'
- FILL_BACKWARD = 'bfill'
- FILL_VALUE = 'fill_value'
- FILL_MEAN = 'mean'
- FILL_MEDIAN = 'median'
- FILL_MODE = 'mode'
- INTERPOLATE = 'interpolate'
- betalens.datafeed.check_null_values(df, columns=None, check_types=None, logger=None)[source]
检查空值、NaN、None
- betalens.datafeed.check_datetime_column(df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True, logger=None)[source]
检查日期列的各种问题
- Parameters:
- Return type:
- Returns:
检查结果字典
- betalens.datafeed.fix_null_values(df, strategy, columns=None, fill_value=None, inplace=False, logger=None)[source]
修复空值
- Parameters:
- Return type:
DataFrame- Returns:
修复后的DataFrame
- betalens.datafeed.drop_duplicates_strict(df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False, logger=None)[source]
严格去重:确保只有完全相同的行才会被删除
- Parameters:
- Return type:
- Returns:
(修复后的DataFrame, 去重报告)
- betalens.datafeed.fix_datetime_column(df, date_column, fix_format=True, fix_duplicates='keep_first', fix_sort=True, sort_order='ascending', dedupe_subset=None, verify_all_fields=True, inplace=False, logger=None)[source]
修复日期列的问题
- Parameters:
df (
DataFrame) – DataFramedate_column (
str) – 日期列名fix_format (
bool) – 是否修复格式(转换为datetime)fix_sort (
bool) – 是否排序sort_order (
str) – 排序顺序,’ascending’或’descending’dedupe_subset (
Optional[List[str]]) – 去重时使用的列组合,None则使用[date_column] 推荐: [‘code’, ‘metric’, date_column] 避免误删不同metric的数据verify_all_fields (
bool) – 是否验证subset外的其他字段也相同(严格模式)inplace (
bool) – 是否原地修改
- Return type:
DataFrame- Returns:
修复后的DataFrame
- betalens.datafeed.validate_and_fix(df, validations, inplace=False, logger=None)[source]
综合验证和修复
- Parameters:
df (
DataFrame) – DataFramevalidations (
Dict[str,Dict]) –验证配置字典,格式如: {
- ’null_check’: {
‘columns’: [‘col1’, ‘col2’], ‘fix_strategy’: ‘ffill’
}, ‘datetime_check’: {
’column’: ‘date’, ‘expected_freq’: ‘D’, ‘fix_format’: True, ‘fix_duplicates’: ‘keep_first’, ‘fix_sort’: True
}
}
inplace (
bool) – 是否原地修改
- Return type:
- Returns:
(修复后的DataFrame, 验证报告)
- class betalens.datafeed.DataValidator(logger=None)[source]
Bases:
objectDataValidator - 已弃用,请直接使用模块级函数
此类已弃用,方法不再实现。请使用模块级函数: - check_null_values() - check_datetime_column() - fix_null_values() - drop_duplicates_strict() - fix_datetime_column() - validate_and_fix()
- Parameters:
logger (Logger | None)
- check_datetime_column(df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True)[source]
- drop_duplicates_strict(df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False)[source]
- betalens.datafeed.build_query(table_name, conditions=None, params=None, select_columns='*', order_by=None, limit=None)[source]
构建SQL查询
- betalens.datafeed.generate_input_range_pairs(codes, ranges)[source]
生成 (code, start_ts, end_ts) 笛卡尔积
- betalens.datafeed.build_nearest_query(table_name, input_tuples, metric, direction='after', time_tolerance=None)[source]
构建最近时点匹配查询
- betalens.datafeed.build_nearest_in_range_query(table_name, input_tuples, metric, direction='after', time_tolerance=None)[source]
构建区间内最近时点匹配查询
在每个 (code, start_ts, end_ts) 区间内,按方向查找距锚点最近的数据: - direction=’after’:锚点为 start_ts,区间过滤 t.datetime > start AND t.datetime < end - direction=’before’:锚点为 end_ts,区间过滤 t.datetime <= end AND t.datetime >= start
- betalens.datafeed.query_nearest_after(cursor, table_name, codes, datetimes, metric, time_tolerance=None, logger=None)[source]
查询每个时点之后最近的有效值
用途:主要用于回测时提取价格 时间结构:最新特征 <= 提数时点 < 调仓时点
- Parameters:
- Return type:
DataFrame- Returns:
- DataFrame,包含列:
code: 代码
input_ts: 输入时间戳(提数时点)
datetime: 匹配到的数据时间戳
diff_hours: 时间差(小时)
value: 数据值
name: 名称
- betalens.datafeed.query_nearest_before(cursor, table_name, codes, datetimes, metric, time_tolerance=None, logger=None)[source]
查询每个时点之前最近的有效值
用途:主要用于回测时提取历史价格特征 时间结构:调仓时点 <= 提数时点 < 最新特征时点
- Parameters:
- Return type:
DataFrame- Returns:
- DataFrame,包含列:
code: 代码
input_ts: 输入时间戳(提数时点)
datetime: 匹配到的数据时间戳
diff_hours: 时间差(小时)
value: 数据值
name: 名称
- betalens.datafeed.query_nearest_in_range_after(cursor, table_name, codes, ranges, metric, time_tolerance=None, logger=None)[source]
在每个 (start, end) 区间内查询距 start 最近的有效值(向后查)
时间结构:start <= t.datetime - epsilon, t.datetime < end,锚点 = start
- Parameters:
- Returns:
code, input_ts(=start), datetime, diff_hours, value, name
- Return type:
DataFrame
- betalens.datafeed.query_nearest_in_range_before(cursor, table_name, codes, ranges, metric, time_tolerance=None, logger=None)[source]
在每个 (start, end) 区间内查询距 end 最近的有效值(向前查)
时间结构:start <= t.datetime <= end,锚点 = end
- Parameters:
- Returns:
code, input_ts(=end), datetime, diff_hours, value, name
- Return type:
DataFrame
- betalens.datafeed.query_time_range(cursor, table_name, codes=None, start_date=None, end_date=None, metric=None, limit=None, logger=None)[source]
查询指定时间范围的数据
- Parameters:
- Return type:
DataFrame- Returns:
DataFrame
- betalens.datafeed.get_available_dates(cursor, table_name, code, metric, start_date=None, end_date=None, logger=None)[source]
获取指定代码和指标的可用日期列表
- betalens.datafeed.get_latest_date(cursor, table_name, code=None, metric=None, logger=None)[source]
获取最新的数据日期
- betalens.datafeed.align_to_dates(df, target_dates, date_column='datetime', method='ffill')[source]
将数据对齐到目标日期序列
- betalens.datafeed.query_industry(cursor, codes, dates, scheme='申万一级行业', table_name='industry', exact=False, logger=None)[source]
正查:每个 (code, date) 在该日所属的行业(point-in-time,取 datetime<=date 的最近一条)
- Parameters:
cursor – 数据库游标(建议 RealDictCursor)
dates (
Union[str,List[str]]) – 查询日期,单个或列表,格式 ‘YYYY-MM-DD’ 或 ‘YYYY-MM-DD HH:MM:SS’scheme (
str) – 分类体系(即 metric)。不带版本后缀(如 ‘申万一级行业’)时自动匹配全部 版本,最近一条天然落到查询日生效的版本;带后缀(如 ‘申万一级行业(2021)’) 则只查该版本。table_name (
str) – 表名,默认 ‘industry’exact (
bool) – 强制精确匹配 metric(关闭版本自动选择),默认 False
- Returns:
- code | query_date | effective_dt | sec_name |
industry_value | ind_name | ind_code | scheme
无归属记录的 (code,date) 行业字段为 NaN/None
- Return type:
DataFrame
- betalens.datafeed.get_industry_members(cursor, industry, date, scheme='申万一级行业', table_name='industry', by='name', exact=False, logger=None)[source]
反查:某日某行业的成分股(每只股票取 datetime<=date 的最近归属,再筛目标行业)
- Parameters:
cursor – 数据库游标
industry (
Union[str,int,float]) – 目标行业,可为行业名(str,匹配 remark->>’ind_name’) 或行业代码数值(int/float,匹配 value)date (
str) – 查询日期scheme (
str) – 分类体系(metric)。不带版本后缀时自动匹配全部版本(最近一条天然落到 查询日生效的版本);带后缀只查该版本。table_name (
str) – 表名by (
str) – ‘name’ 用行业名匹配,’value’ 用行业代码数值匹配; industry 类型也会自动推断exact (
bool) – 强制精确匹配 metric(关闭版本自动选择),默认 False
- Returns:
code | sec_name | industry_value | ind_name | ind_code | scheme
- Return type:
DataFrame
- betalens.datafeed.build_industry_records(df, scheme='申万一级行业', code_col='code', name_col='name', date_col='effective_dt', ind_name_col='ind_name', ind_code_col='ind_code')[source]
入库辅助:把行业归属明细整理成可直接 incremental_insert 的长格式
- 输入每行 = 一条归属事件 (证券, 生效日, 行业)。输出列:
datetime, code, name, metric(=scheme), value(=行业代码数值), remark(dict)
- Parameters:
- Return type:
DataFrame- Returns:
长格式 DataFrame(datetime, code, name, metric, value, remark)
- betalens.datafeed.get_index_universe(cursor, index_code, date, table_name='index_universe', metric='universe', logger=None)[source]
返回 index_code 在 date 当日生效的成分股代码列表(point-in-time)。
步骤:用 query.query_nearest_before 找到 <=date 的最近生效快照日,再取该行 remark 中的 constituents 列表。该日前无可用股票池则返回空列表。
- Parameters:
- Return type:
- Returns:
成分股代码列表(如 [‘000001.SZ’, …]);无可用股票池则返回 []
- betalens.datafeed.get_index_universe_date(cursor, index_code, date, table_name='index_universe', metric='universe', logger=None)[source]
返回某指数在某日实际生效的快照日期(point-in-time,取 datetime<=date 的最近一条)。
复用 query.query_nearest_before 定位最近生效日。
- Parameters:
- Returns:
生效快照的 datetime(pandas.Timestamp);该日前无可用股票池则返回 None
- betalens.datafeed.process_excel_to_db_format(filepath, config, logger=None)[source]
处理Excel文件为数据库格式
- Parameters:
- Return type:
- Returns:
(处理后的DataFrame, 错误列表)
- betalens.datafeed.check_existing_rows(cursor, df, table, key_columns=None, logger=None)[source]
检查数据库中是否已存在相关数据行
- betalens.datafeed.insert_dataframe(cursor, conn, df, table, batch_size=1000, check_duplicates=True, key_columns=None, skip_duplicates=True, logger=None)[source]
将DataFrame插入数据库
- Parameters:
- Return type:
- Returns:
(是否成功, 消息, 统计信息字典)
- betalens.datafeed.get_existing_dates(cursor, table, code, metric, logger=None)[source]
获取数据库中已存在的日期
- betalens.datafeed.incremental_insert(cursor, conn, df, table, date_column='datetime', code_column='code', metric_column='metric', logger=None)[source]
增量插入:只插入数据库中不存在的数据
- betalens.datafeed.save_error_file(filepath, df, errors, error_dir='./errors', error_subdir='failed_files', logger=None)[source]
保存错误文件和日志
- betalens.datafeed.process_directory_tree(cursor, conn, root_dir, table, config, file_pattern='*.csv', recursive=True, mode='insert', error_dir='./errors', logger=None)[source]
按照目录树结构处理和插入Excel文件
- class betalens.datafeed.ConfigManager(config_file=None)[source]
Bases:
object配置管理器
- Parameters:
config_file (str | None)
- get(key_path, default=None)[source]
获取配置值
- Parameters:
- Return type:
- Returns:
配置值
Example
>>> config = ConfigManager() >>> config.get('database.dbname') 'datafeed' >>> config.get('database.port') '5432'
- set(key_path, value)[source]
设置配置值
Example
>>> config = ConfigManager() >>> config.set('database.dbname', 'my_database') >>> config.get('database.dbname') 'my_database'
- betalens.datafeed.get_config(config_file=None)[source]
获取全局配置实例
- Parameters:
- Return type:
- Returns:
ConfigManager实例
- betalens.datafeed.func_timer(function)[source]
用装饰器实现函数计时 :type function: :param function: 需要计时的函数 :return: None
- betalens.datafeed.get_absolute_trade_days(begin_date, end_date, period, use_pmc=True)[source]
获取交易日序列
- Parameters:
begin_date – 开始日期,字符串格式
end_date – 结束日期,字符串格式
period – 周期,如’D’(日), ‘W’(周), ‘M’(月), ‘Q’(季), ‘S’(半年), ‘Y’(年)
use_pmc – 默认True,使用pandas_market_calendars(中国A股/北京时区);False时使用akshare
- Returns:
交易日列表(datetime.datetime对象)
- betalens.datafeed.trade_days_offset(begin_datetime, offset, period='D')[source]
交易日偏移计算
- Parameters:
begin_datetime – 起始datetime对象
offset – 偏移量(整数)
period – 周期,默认’D’
- Returns:
偏移后的datetime对象
子模块
core
## 研究数据库结构(摘要)
### 表1 个券行情(日频) - 列:入库实际时间(=最早可交易时间)、windcode、中文名、数据性质(收盘价/成交量等)、数值、备注(json) - 规则:开盘价最早 09:30;其余价量最早 15:00 可确定 - 处理:计算日频收益率(如 close-to-close)用于回测与挖掘
### 表2 个券基本面(日频入库,事件驱动) - 列:入库实际时间(=最早可交易时间)、数据理论发生时间(报告期,如 0331/0630/0930/1231)、windcode、中文名、数据性质(如归母净利润)、数值、备注(json) - 规则:按公告时点入库,关注盘前/盘中/盘后;理论发生时间仅用于展示,不做因果外推 - 年报次序不一:优先一致预期或线性外推作占位,尽量避免非原生数据入库
### 表3 宏观经济(事件驱动) - 列:入库实际时间(=最早可交易时间)、数据理论发生时间(如“1月GDP”)、windcode、中文名、数据性质(可含均线算子)、数值、备注(json) - 规则:与表2一致(事件驱动、区分公告时点与发生时点)
### 表4 因子库 - 列:入库实际时间(=最早可交易时间)、数据编制方式、数值、备注(json)
## 投资数据库结构(摘要) - 不保留完整历史 - 从研究数据库按需拉取最近滚动窗口数据;其余数据在线拉取 - 在线数据仅记录入库时间(可交易可用时点)
## 投资数据库结构: - 出于投资目的,实际不需要历史数据 - 从研究数据库拉取所需最近一个滚动窗口内的数据,其余全部在线拉取,并直接记录入库时间
更新日志: 2025-10-31: 重构datafeed.py,新增工具模块 - excel.py: Excel文件处理模块 - validation.py: 数据验证和异常检查工具 - query.py: 数据库查询功能重构 - integration.py: 数据库与Excel交互功能 - wind_ingest.py: Wind数据抓取模块
2025-11-03: 简化core.py为薄封装层 - 移除所有业务逻辑到子模块 - core.py仅保留连接管理和函数组合 - 所有数据转换、插入逻辑位于子模块
- betalens.datafeed.core.func_timer(function)[source]
用装饰器实现函数计时 :type function: :param function: 需要计时的函数 :return: None
- class betalens.datafeed.core.Datafeed(table_name, db_config=None, log_dir=None)[source]
Bases:
object数据管理主类(薄封装层)
职责: - 管理数据库连接 - 提供统一的日志记录 - 组合子模块功能为高层API - 不包含业务逻辑(所有逻辑在子模块中)
- 使用示例:
# 创建实例(使用默认配置) df = Datafeed(“daily_market_data”)
# 创建实例(自定义数据库配置) df = Datafeed(
table_name=”daily_market_data”, db_config={
‘dbname’: ‘my_database’, ‘user’: ‘my_user’, ‘password’: ‘my_password’, ‘host’: ‘localhost’, ‘port’: ‘5432’
}
)
# 单文件插入 df.insert_csv_file(“data.csv”, config={…})
# Wind数据抓取 df.ingest_wind_daily_market(
codes=[‘000001.SZ’], start_date=’2024-01-01’, end_date=’2024-01-31’
)
# 查询 data = df.query_time_range(
codes=[‘000001.SZ’], start_date=’2024-01-01’, metric=’收盘价(元)’
)
# 关闭 df.close()
- __init__(table_name, db_config=None, log_dir=None)[source]
初始化Datafeed实例
- Parameters:
table_name – 数据库表名
db_config – 数据库配置字典,包含以下键: - dbname: 数据库名 - user: 用户名 - password: 密码 - host: 主机地址 - port: 端口 如果为None,使用config.json中的配置
log_dir – 日志目录,如果为None,使用config.json中的配置
- insert_csv_file(filepath, config, mode='incremental')[source]
单文件CSV插入(薄封装)
组合流程: 1. excel.read_file - 读取文件 2. excel.cross_section_to_db_format - 转换格式(如果需要) 3. validation.validate_and_fix - 验证和修复(如果配置) 4. integration.insert_dataframe/incremental_insert - 插入
- Parameters:
- Return type:
- Returns:
统计信息字典
- insert_ede_file(filepath, date_from='filename', default_datetime=None, mode='incremental')[source]
处理并插入EDE格式的Excel文件(薄封装)
EDE格式特征: - 第一列:证券代码 -第二列:证券简称 - 第三列及之后:指标列,格式为”指标名 [元数据] 值类型 [元数据] 单位”
- 示例EDE格式:
证券代码 证券简称 流通A股 [交易日期] 最新 [单位] 股 002460.SZ 赣锋锂业 1,211,379,763.0000 1772.HK 赣锋锂业 1,211,379,763.0000
处理流程: 1. 读取Excel文件 2. 清理数据(去除空值、”数据来源:Wind”等) 3. 识别code、name列 4. 解析metric列,提取指标名称和元数据 5. 构建日期列(从文件名或列名中提取) 6. 转换为数据库格式 7. 插入数据库
- Parameters:
filepath (
str) – EDE格式Excel文件路径date_from (
str) – 日期来源,可选值: - ‘filename’: 从文件名提取日期(如EDE20251103.xlsx -> 2025-11-03 15:30:00) - ‘metric’: 从列名中的[日期]部分提取default_datetime (
Optional[str]) – 默认日期时间(当无法从文件名或列名提取时使用) 格式:’YYYY-MM-DD HH:MM:SS’,如’2025-11-03 15:30:00’mode (
str) – 插入模式 - ‘incremental’: 增量插入(默认),只插入新数据 - ‘insert’: 直接插入,会检查重复并跳过
- Return type:
- Returns:
- 统计信息字典,包含:
success: 是否成功
new_rows: 新增行数
skipped_rows: 跳过行数
errors: 错误列表(如果有)
Example
>>> df = Datafeed('daily_market_data') >>> result = df.insert_ede_file( ... 'EDE20251103.xlsx', ... date_from='filename', ... mode='incremental' ... ) >>> print(f"新增{result['new_rows']}行,跳过{result['skipped_rows']}行")
- batch_process_excel_files(folder_path, config, file_pattern='*.csv', recursive=True, mode='insert')[source]
批量处理Excel文件并插入数据库(薄封装)
直接调用 integration.process_directory_tree
- incremental_update(df, date_column='datetime', code_column='code', metric_column='metric')[source]
增量更新数据到数据库(薄封装)
直接调用 integration.incremental_insert
- insert_with_conflict_check(df, date_column='datetime', code_column='code', metric_column='metric')[source]
插入数据时检测重复和冲突(批量查询优化版本)
如果key(datetime, code, metric)相同但value不同:记录冲突,不插入
如果key和value完全相同:跳过,不插入
如果key不存在:插入新记录
- update_data(df, date_column='datetime', code_column='code', metric_column='metric')[source]
使用SQL UPDATE批量更新数据
对于存在的记录(相同datetime, code, metric),更新value和name 对于不存在的记录,可选择插入(upsert模式)
- ingest_wind_daily_market(codes, start_date, end_date, fields=None, asset_type='stock', mode='incremental')[source]
从Wind获取日行情并插入数据库(薄封装)
组合流程: 1. wind_ingest.fetch_daily_market - 获取Wind数据 2. integration.incremental_insert/insert_dataframe - 插入
- ingest_wind_daily_index(codes, start_date, end_date, fields=None, mode='incremental')[source]
Wind指数数据抓取(便捷封装)
- ingest_wind_daily_fund(codes, start_date, end_date, fields=None, mode='incremental')[source]
Wind基金数据抓取(便捷封装)
- ingest_wind_daily_bond(codes, start_date, end_date, fields=None, mode='incremental')[source]
Wind债券数据抓取(便捷封装)
- run_query(conditions=None, params=None, select_columns='*')[source]
执行自定义SQL查询(薄封装)
替代原 query_data 方法,使用 query.build_query
- query_time_range(codes=None, start_date=None, end_date=None, metric=None, limit=None)[source]
查询指定时间范围的数据(薄封装)
直接调用 query.query_time_range
- query_nearest_after(params=None)[source]
根据输入时间戳序列查找每个时点之后最近的有效值(薄封装)
主要用于回测时提取价格 直接调用 query.query_nearest_after
- Parameters:
params (dict) – 必须包含以下键: - codes: 代码列表 - datetimes: 目标时间戳列表(格式:’YYYY-MM-DD HH:MM’) - metric: 查询的指标名称 - time_tolerance: 允许的最大时间间隔(单位:小时,默认不限制)
- Returns:
- 包含以下列:
code | input_ts | datetime | diff_hours | value | name
- Return type:
DataFrame
- query_nearest_in_range_after(params=None)[source]
在 (start, end) 区间内查找距 start 最近的有效值(薄封装)
- Parameters:
params (dict) – 必须包含以下键: - codes: 代码列表 - ranges: [(start, end), …] 区间列表 - metric: 指标名 - time_tolerance: 锚点容差(小时,可选)
- Returns:
code | input_ts(=start) | datetime | diff_hours | value | name
- Return type:
DataFrame
- query_nearest_in_range_before(params=None)[source]
在 (start, end) 区间内查找距 end 最近的有效值(薄封装)
- Parameters:
params (dict) – 必须包含以下键: - codes: 代码列表 - ranges: [(start, end), …] 区间列表 - metric: 指标名 - time_tolerance: 锚点容差(小时,可选)
- Returns:
code | input_ts(=end) | datetime | diff_hours | value | name
- Return type:
DataFrame
- query_nearest_before(params=None)[source]
根据输入时间戳序列查找每个时点之前最近的有效值(薄封装)
主要用于回测时提取历史价格特征 直接调用 query.query_nearest_before
- Parameters:
params (dict) – 必须包含以下键: - codes: 代码列表 - datetimes: 目标时间戳列表(格式:’YYYY-MM-DD HH:MM’) - metric: 查询的指标名称 - time_tolerance: 允许的最大时间间隔(单位:小时,默认不限制)
- Returns:
- 包含以下列:
code | input_ts | datetime | diff_hours | value | name
- Return type:
DataFrame
- get_available_dates(code, metric, start_date=None, end_date=None)[source]
获取指定代码和指标的可用日期列表(薄封装)
直接调用 query.get_available_dates
- validate_dataframe(df, validations)[source]
验证和修复DataFrame(薄封装)
直接调用 validation.validate_and_fix
- Parameters:
df (
DataFrame) – 待验证的DataFramevalidations (
dict) – 验证配置
- Returns:
(修复后的DataFrame, 验证报告)
- betalens.datafeed.core.get_absolute_trade_days(begin_date, end_date, period, use_pmc=True)[source]
获取交易日序列
- Parameters:
begin_date – 开始日期,字符串格式
end_date – 结束日期,字符串格式
period – 周期,如’D’(日), ‘W’(周), ‘M’(月), ‘Q’(季), ‘S’(半年), ‘Y’(年)
use_pmc – 默认True,使用pandas_market_calendars(中国A股/北京时区);False时使用akshare
- Returns:
交易日列表(datetime.datetime对象)
excel
Excel文件处理工具模块(函数式) 功能: - 读取CSV/XLSX文件转为DataFrame - 将cross-section数据转换为数据库三列表格式 - 批量文件操作 - 文件夹分类和目录树生成 - Excel错误检查(错行、空值、异常值)
- betalens.datafeed.excel.read_csv_with_encoding(filepath, logger=None, **kwargs)[source]
尝试使用多种编码读取CSV文件
- betalens.datafeed.excel.read_file(filepath, logger=None, **kwargs)[source]
读取CSV或XLSX文件为DataFrame 支持多种编码格式(UTF-8, GB2312, GBK, GB18030等)
- Parameters:
- Return type:
DataFrame- Returns:
DataFrame
- Raises:
ValueError – 不支持的文件格式
FileNotFoundError – 文件不存在
- betalens.datafeed.excel.cross_section_to_db_format(df, key_columns, value_columns, key_value_mapping, additional_fields=None, logger=None)[source]
将cross-section格式数据转换为数据库三列表格式
- Parameters:
- Return type:
DataFrame- Returns:
转换后的DataFrame,格式为:[key_columns, metric, value, …]
Example
- 输入:
code | name | 2024-01-01 | 2024-01-02 A001 | 股票A | 100 | 101
- 输出:
code | name | date | value A001 | 股票A | 2024-01-01 | 100 A001 | 股票A | 2024-01-02 | 101
- betalens.datafeed.excel.batch_read_files(folder_path, file_pattern='*.csv', recursive=False, logger=None, **read_kwargs)[source]
批量读取文件夹中的文件
- betalens.datafeed.excel.batch_write_files(data_dict, output_dir, file_format='csv', create_subdirs=True, logger=None, **write_kwargs)[source]
批量写入文件 CSV默认使用utf-8-sig编码,确保Excel能正确显示中文
- Parameters:
- Return type:
- Returns:
成功写入的文件路径列表
- betalens.datafeed.excel.create_directory_tree(data_dict, output_dir, categorize_by=None, logger=None)[source]
创建目录树并分类保存文件
- betalens.datafeed.excel.apply_time_alignment(df, date_column='日期', metric_column='variable', open_metric_names=None, open_time=None, other_time=None, inplace=False, logger=None)[source]
根据指标类型为日期列添加时间戳
开盘价在交易日09:30可用,其他价格/成交量在15:00收盘后可用。
- Parameters:
- Return type:
DataFrame- Returns:
添加时间戳后的DataFrame
Example
>>> df = pd.DataFrame({ ... '日期': ['2024-01-01', '2024-01-01'], ... 'variable': ['开盘价(元)', '收盘价(元)'], ... 'value': [100, 101] ... }) >>> df = apply_time_alignment(df) >>> df['日期'] 0 2024-01-01 09:30:01 1 2024-01-01 15:00:01
- betalens.datafeed.excel.check_excel_errors(df, checks=None, logger=None)[source]
检查Excel数据中的错误
- Parameters:
df (
DataFrame) – 待检查的DataFramechecks (
Optional[Dict[str,any]]) –检查配置字典,如: {
’check_empty_rows’: True, # 检查空行 ‘check_null_values’: True, # 检查空值 ‘check_duplicates’: [‘col1’, ‘col2’], # 检查重复(指定列) ‘check_data_types’: {‘col1’: ‘int’, ‘col2’: ‘float’}, # 检查数据类型 ‘check_value_range’: {‘col1’: (0, 100)}, # 检查值范围
}
- Return type:
- Returns:
(是否通过检查, 错误列表)
validation
数据验证和异常检查工具模块(函数式) 功能: - 检查空值、NaN、None - 检查日期列的格式、重复、排序、频率 - 提供多种修复策略(替换、填充、删除、抛出错误)
- class betalens.datafeed.validation.FillStrategy(value)[source]
Bases:
Enum填充策略枚举
- RAISE_ERROR = 'raise_error'
- DROP = 'drop'
- FILL_FORWARD = 'ffill'
- FILL_BACKWARD = 'bfill'
- FILL_VALUE = 'fill_value'
- FILL_MEAN = 'mean'
- FILL_MEDIAN = 'median'
- FILL_MODE = 'mode'
- INTERPOLATE = 'interpolate'
- betalens.datafeed.validation.check_null_values(df, columns=None, check_types=None, logger=None)[source]
检查空值、NaN、None
- betalens.datafeed.validation.check_datetime_column(df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True, logger=None)[source]
检查日期列的各种问题
- Parameters:
- Return type:
- Returns:
检查结果字典
- betalens.datafeed.validation.fix_null_values(df, strategy, columns=None, fill_value=None, inplace=False, logger=None)[source]
修复空值
- Parameters:
- Return type:
DataFrame- Returns:
修复后的DataFrame
- betalens.datafeed.validation.drop_duplicates_strict(df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False, logger=None)[source]
严格去重:确保只有完全相同的行才会被删除
- Parameters:
- Return type:
- Returns:
(修复后的DataFrame, 去重报告)
- betalens.datafeed.validation.fix_datetime_column(df, date_column, fix_format=True, fix_duplicates='keep_first', fix_sort=True, sort_order='ascending', dedupe_subset=None, verify_all_fields=True, inplace=False, logger=None)[source]
修复日期列的问题
- Parameters:
df (
DataFrame) – DataFramedate_column (
str) – 日期列名fix_format (
bool) – 是否修复格式(转换为datetime)fix_sort (
bool) – 是否排序sort_order (
str) – 排序顺序,’ascending’或’descending’dedupe_subset (
Optional[List[str]]) – 去重时使用的列组合,None则使用[date_column] 推荐: [‘code’, ‘metric’, date_column] 避免误删不同metric的数据verify_all_fields (
bool) – 是否验证subset外的其他字段也相同(严格模式)inplace (
bool) – 是否原地修改
- Return type:
DataFrame- Returns:
修复后的DataFrame
- betalens.datafeed.validation.validate_and_fix(df, validations, inplace=False, logger=None)[source]
综合验证和修复
- Parameters:
df (
DataFrame) – DataFramevalidations (
Dict[str,Dict]) –验证配置字典,格式如: {
- ’null_check’: {
‘columns’: [‘col1’, ‘col2’], ‘fix_strategy’: ‘ffill’
}, ‘datetime_check’: {
’column’: ‘date’, ‘expected_freq’: ‘D’, ‘fix_format’: True, ‘fix_duplicates’: ‘keep_first’, ‘fix_sort’: True
}
}
inplace (
bool) – 是否原地修改
- Return type:
- Returns:
(修复后的DataFrame, 验证报告)
- class betalens.datafeed.validation.DataValidator(logger=None)[source]
Bases:
objectDataValidator - 已弃用,请直接使用模块级函数
此类已弃用,方法不再实现。请使用模块级函数: - check_null_values() - check_datetime_column() - fix_null_values() - drop_duplicates_strict() - fix_datetime_column() - validate_and_fix()
- Parameters:
logger (Logger | None)
- check_datetime_column(df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True)[source]
- drop_duplicates_strict(df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False)[source]
query
数据库查询工具模块(函数式) 功能: - 重构query_nearest_after和query_nearest_before - 解耦数据库查询逻辑 - 提供灵活的查询参数构建 - 支持时间点匹配和数据提取
- betalens.datafeed.query.build_query(table_name, conditions=None, params=None, select_columns='*', order_by=None, limit=None)[source]
构建SQL查询
- betalens.datafeed.query.generate_input_range_pairs(codes, ranges)[source]
生成 (code, start_ts, end_ts) 笛卡尔积
- betalens.datafeed.query.build_nearest_in_range_query(table_name, input_tuples, metric, direction='after', time_tolerance=None)[source]
构建区间内最近时点匹配查询
在每个 (code, start_ts, end_ts) 区间内,按方向查找距锚点最近的数据: - direction=’after’:锚点为 start_ts,区间过滤 t.datetime > start AND t.datetime < end - direction=’before’:锚点为 end_ts,区间过滤 t.datetime <= end AND t.datetime >= start
- betalens.datafeed.query.build_nearest_query(table_name, input_tuples, metric, direction='after', time_tolerance=None)[source]
构建最近时点匹配查询
- betalens.datafeed.query.query_nearest_after(cursor, table_name, codes, datetimes, metric, time_tolerance=None, logger=None)[source]
查询每个时点之后最近的有效值
用途:主要用于回测时提取价格 时间结构:最新特征 <= 提数时点 < 调仓时点
- Parameters:
- Return type:
DataFrame- Returns:
- DataFrame,包含列:
code: 代码
input_ts: 输入时间戳(提数时点)
datetime: 匹配到的数据时间戳
diff_hours: 时间差(小时)
value: 数据值
name: 名称
- betalens.datafeed.query.query_nearest_before(cursor, table_name, codes, datetimes, metric, time_tolerance=None, logger=None)[source]
查询每个时点之前最近的有效值
用途:主要用于回测时提取历史价格特征 时间结构:调仓时点 <= 提数时点 < 最新特征时点
- Parameters:
- Return type:
DataFrame- Returns:
- DataFrame,包含列:
code: 代码
input_ts: 输入时间戳(提数时点)
datetime: 匹配到的数据时间戳
diff_hours: 时间差(小时)
value: 数据值
name: 名称
- betalens.datafeed.query.query_nearest_in_range_after(cursor, table_name, codes, ranges, metric, time_tolerance=None, logger=None)[source]
在每个 (start, end) 区间内查询距 start 最近的有效值(向后查)
时间结构:start <= t.datetime - epsilon, t.datetime < end,锚点 = start
- Parameters:
- Returns:
code, input_ts(=start), datetime, diff_hours, value, name
- Return type:
DataFrame
- betalens.datafeed.query.query_nearest_in_range_before(cursor, table_name, codes, ranges, metric, time_tolerance=None, logger=None)[source]
在每个 (start, end) 区间内查询距 end 最近的有效值(向前查)
时间结构:start <= t.datetime <= end,锚点 = end
- Parameters:
- Returns:
code, input_ts(=end), datetime, diff_hours, value, name
- Return type:
DataFrame
- betalens.datafeed.query.query_time_range(cursor, table_name, codes=None, start_date=None, end_date=None, metric=None, limit=None, logger=None)[source]
查询指定时间范围的数据
- Parameters:
- Return type:
DataFrame- Returns:
DataFrame
- betalens.datafeed.query.get_available_dates(cursor, table_name, code, metric, start_date=None, end_date=None, logger=None)[source]
获取指定代码和指标的可用日期列表
- betalens.datafeed.query.get_latest_date(cursor, table_name, code=None, metric=None, logger=None)[source]
获取最新的数据日期
integration
数据库-Excel交互功能模块(函数式) 功能: - 按照目录树结构读取和处理Excel文件 - 将处理后的数据插入数据库 - 增量更新功能(只插入新数据) - 错误检查和日志记录 - 保存错误数据和源文件
- betalens.datafeed.integration.process_excel_to_db_format(filepath, config, logger=None)[source]
处理Excel文件为数据库格式
- Parameters:
- Return type:
- Returns:
(处理后的DataFrame, 错误列表)
- betalens.datafeed.integration.check_existing_rows(cursor, df, table, key_columns=None, logger=None)[source]
检查数据库中是否已存在相关数据行
- betalens.datafeed.integration.insert_dataframe(cursor, conn, df, table, batch_size=1000, check_duplicates=True, key_columns=None, skip_duplicates=True, logger=None)[source]
将DataFrame插入数据库
- Parameters:
- Return type:
- Returns:
(是否成功, 消息, 统计信息字典)
- betalens.datafeed.integration.get_existing_dates(cursor, table, code, metric, logger=None)[source]
获取数据库中已存在的日期
- betalens.datafeed.integration.incremental_insert(cursor, conn, df, table, date_column='datetime', code_column='code', metric_column='metric', logger=None)[source]
增量插入:只插入数据库中不存在的数据
- betalens.datafeed.integration.save_error_file(filepath, df, errors, error_dir='./errors', error_subdir='failed_files', logger=None)[source]
保存错误文件和日志
- betalens.datafeed.integration.process_directory_tree(cursor, conn, root_dir, table, config, file_pattern='*.csv', recursive=True, mode='insert', error_dir='./errors', logger=None)[source]
按照目录树结构处理和插入Excel文件
universe
指数历史股票池查询模块(函数式)
设计要点
指数成分股池是 point-in-time 的”状态”数据:某指数从某调整日起拥有一组成分股, 直到下次调整。每个生效日存为一行,约定:
code : 指数代码(如 ‘000906.SH’)
name : 指数名称(如 ‘中证800’)
metric : 固定为 ‘universe’,标识成分股池
value : 成分股数量(便于校验)
remark : JSONB,约定 {“index_code”, “index_name”, “constituents”: […]}
datetime: 该股票池的生效时点(最早可知日)
查询语义 = 取 datetime <= 查询日 的最近一条,与 query.query_nearest_before 同构, 天然避免前视偏差。由于成分股列表存在 remark(JSONB),而 query_nearest_before 只 返回 value/name,故本模块用它先”定位”最近生效日,再补一条小查询取出 remark。
主要接口
get_index_universe : 返回某指数某日生效的成分股代码列表 get_index_universe_date : 返回某指数某日实际生效的快照日期(便于排查)
- betalens.datafeed.universe.get_index_universe_date(cursor, index_code, date, table_name='index_universe', metric='universe', logger=None)[source]
返回某指数在某日实际生效的快照日期(point-in-time,取 datetime<=date 的最近一条)。
复用 query.query_nearest_before 定位最近生效日。
- Parameters:
- Returns:
生效快照的 datetime(pandas.Timestamp);该日前无可用股票池则返回 None
- betalens.datafeed.universe.get_index_universe(cursor, index_code, date, table_name='index_universe', metric='universe', logger=None)[source]
返回 index_code 在 date 当日生效的成分股代码列表(point-in-time)。
步骤:用 query.query_nearest_before 找到 <=date 的最近生效快照日,再取该行 remark 中的 constituents 列表。该日前无可用股票池则返回空列表。
- Parameters:
- Return type:
- Returns:
成分股代码列表(如 [‘000001.SZ’, …]);无可用股票池则返回 []
config
配置管理模块 功能: - 加载和管理datafeed模块的配置参数 - 支持从config.json文件读取配置 - 支持运行时动态修改配置 - 提供配置验证和默认值
- class betalens.datafeed.config.ConfigManager(config_file=None)[source]
Bases:
object配置管理器
- Parameters:
config_file (str | None)
- get(key_path, default=None)[source]
获取配置值
- Parameters:
- Return type:
- Returns:
配置值
Example
>>> config = ConfigManager() >>> config.get('database.dbname') 'datafeed' >>> config.get('database.port') '5432'
- set(key_path, value)[source]
设置配置值
Example
>>> config = ConfigManager() >>> config.set('database.dbname', 'my_database') >>> config.get('database.dbname') 'my_database'
wind_ingest
Wind数据抓取模块(函数式) 功能: - 从WindPy获取日行情数据 - 转换为数据库标准格式 - 支持多种资产类型(股票、指数、基金、债券)
- betalens.datafeed.wind_ingest.fetch_daily_market(codes, start_date, end_date, fields=None, asset_type='stock', apply_time_stamps=True, logger=None)[source]
从Wind获取日行情数据并转换为数据库格式
- Parameters:
- Return type:
DataFrame- Returns:
- DataFrame,格式为:
datetime | code | name | metric | value
Example
>>> df = fetch_daily_market( ... codes=['000001.SZ'], ... start_date='2024-01-01', ... end_date='2024-01-31' ... ) >>> df.head() datetime code name metric value 0 2024-01-02 09:30:01 000001.SZ 平安银行 开盘价(元) 10.50 1 2024-01-02 15:00:01 000001.SZ 平安银行 收盘价(元) 10.55
- betalens.datafeed.wind_ingest.fetch_daily_index(codes, start_date, end_date, fields=None, apply_time_stamps=True, logger=None)[source]
获取指数日行情数据
这是fetch_daily_market的便捷封装,asset_type=’index’
- betalens.datafeed.wind_ingest.fetch_daily_fund(codes, start_date, end_date, fields=None, apply_time_stamps=True, logger=None)[source]
获取基金日行情数据
这是fetch_daily_market的便捷封装,asset_type=’fund’
ede_processor
EDE格式Excel文件处理工具模块(函数式) 功能: - 处理特定的EDE格式Excel文件(来自Wind等数据源) - 识别证券代码、名称和指标列 - 从列名中提取metric和元数据(日期、单位等) - 转换为数据库标准格式并插入
- betalens.datafeed.ede_processor.extract_date_from_filename(filepath, pattern=None, default_time=None, logger=None)[source]
从文件名中提取日期
- betalens.datafeed.ede_processor.parse_metric_column(column_name, logger=None)[source]
解析EDE格式的指标列名,提取metric名称和元数据
- EDE格式示例:
“流通A股 [交易日期] 最新 [单位] 股” “流通市值 [交易日期] 最新 [单位] 万元”
- Parameters:
- Return type:
- Returns:
(metric名称, 元数据字典)
Example
>>> parse_metric_column("流通A股 [交易日期] 最新 [单位] 股") ('流通A股', {'日期说明': '交易日期', '值类型': '最新', '单位说明': '单位', '单位': '股'})
- betalens.datafeed.ede_processor.extract_date_from_metric_metadata(metadata, column_name, default_time=None, logger=None)[source]
从metric元数据中提取日期
- 在某些情况下,列名中可能包含具体日期,如:
“流通A股 [20251103] 最新 [单位] 股”
- betalens.datafeed.ede_processor.clean_ede_dataframe(df, logger=None)[source]
清理EDE格式的DataFrame
操作: 1. 删除完全空白的行 2. 删除包含”数据来源”等无关字符串的行 3. 删除完全空白的列
- betalens.datafeed.ede_processor.identify_code_name_columns(df, code_column_names=None, name_column_names=None, logger=None)[source]
识别DataFrame中的代码列和名称列
- Parameters:
- Return type:
- Returns:
(代码列名, 名称列名),如果未找到则返回None
- betalens.datafeed.ede_processor.process_ede_file(filepath, date_from='filename', default_datetime=None, code_column_names=None, name_column_names=None, logger=None)[source]
处理EDE格式的Excel文件并转换为数据库格式
- EDE格式特征:
第一列:证券代码
第二列:证券简称
第三列及之后:指标列,格式为”指标名 [元数据] 值类型 [元数据] 单位”
- Parameters:
date_from (
str) – 日期来源,’filename’(从文件名提取)或’metric’(从列名提取)
- Return type:
- Returns:
(处理后的DataFrame, 错误列表)
- 处理后的DataFrame格式:
code: 证券代码
name: 证券简称
metric: 指标名称
value: 数值
datetime: 日期时间
note: 元数据(JSON格式)