#%%By Janis 250226
"""
数据验证和异常检查工具模块(函数式)
功能:
- 检查空值、NaN、None
- 检查日期列的格式、重复、排序、频率
- 提供多种修复策略(替换、填充、删除、抛出错误)
"""
import pandas as pd
import numpy as np
from typing import Optional, Union, List, Dict, Callable, Tuple, Any
from datetime import datetime
import logging
from enum import Enum
[docs]
class FillStrategy(Enum):
    """Strategies for handling missing values; ``fix_null_values`` accepts a member or its string value."""
    RAISE_ERROR = "raise_error"  # raise an error and stop
    DROP = "drop"  # drop the rows that contain missing values
    FILL_FORWARD = "ffill"  # forward fill
    FILL_BACKWARD = "bfill"  # backward fill
    FILL_VALUE = "fill_value"  # fill with a caller-supplied value
    FILL_MEAN = "mean"  # fill with the column mean
    FILL_MEDIAN = "median"  # fill with the column median
    FILL_MODE = "mode"  # fill with the column mode (most frequent value)
    INTERPOLATE = "interpolate"  # interpolate between neighbouring values
def _get_default_logger():
"""获取默认logger"""
logger = logging.getLogger('DataValidator')
if not logger.handlers:
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
[docs]
def _mask_summary(mask: np.ndarray, index: pd.Index, total: int) -> Dict[str, Any]:
    """Summarise a boolean row mask: hit count, percentage of ``total`` and the first 10 index labels."""
    count = int(mask.sum())
    return {
        'count': count,
        'percentage': float(count / total * 100),
        'indices': index[mask][:10].tolist(),  # keep only the first 10 labels
    }


def check_null_values(
    df: pd.DataFrame,
    columns: Optional[List[str]] = None,
    check_types: Optional[List[str]] = None,
    logger: Optional[logging.Logger] = None
) -> Dict[str, Any]:
    """
    Check columns for missing values: NaN/null, literal ``None`` objects and blank strings.

    Args:
        df: DataFrame to inspect.
        columns: columns to check; None means every column. Names that are not in
            ``df`` are logged as warnings and skipped.
        check_types: subset of ['null', 'nan', 'none', 'empty_string']; None means all.
            'null' and 'nan' are the same check (``Series.isnull``), reported under
            the 'null/nan' key. 'none' counts cells that are the ``None`` object
            itself. 'empty_string' applies only to object/string-dtype columns and
            counts cells that are empty after ``strip()``.
        logger: logger to use; None means the module default logger.

    Returns:
        ``{column: {'total_rows': int, 'issues': {kind: {'count', 'percentage', 'indices'}}}}``
        containing only the columns that have at least one issue. 'indices' holds at
        most the first 10 index labels of the offending rows.
    """
    if logger is None:
        logger = _get_default_logger()
    if columns is None:
        columns = df.columns.tolist()
    if check_types is None:
        check_types = ['null', 'nan', 'none', 'empty_string']

    results = {}
    for col in columns:
        if col not in df.columns:
            logger.warning(f"列 '{col}' 不存在于DataFrame中")
            continue

        series = df[col]
        issues = {}

        if 'null' in check_types or 'nan' in check_types:
            null_mask = series.isnull().to_numpy(dtype=bool)
            if null_mask.any():
                issues['null/nan'] = _mask_summary(null_mask, df.index, len(df))

        if 'none' in check_types:
            # ``series == None`` is always all-False in pandas, so test object identity per cell.
            none_mask = np.fromiter((v is None for v in series), dtype=bool, count=len(series))
            if none_mask.any():
                issues['none'] = _mask_summary(none_mask, df.index, len(df))

        if 'empty_string' in check_types and (
            pd.api.types.is_object_dtype(series) or pd.api.types.is_string_dtype(series)
        ):
            stripped = series.astype(str).str.strip()
            # na_value=False: nullable string dtypes yield <NA> instead of False for missing cells
            empty_mask = stripped.eq('').to_numpy(dtype=bool, na_value=False)
            if empty_mask.any():
                issues['empty_string'] = _mask_summary(empty_mask, df.index, len(df))

        if issues:
            results[col] = {'total_rows': len(df), 'issues': issues}
            logger.warning(f"列 '{col}' 发现空值问题: {issues}")

    return results
[docs]
def _freq_matches(expected: str, inferred: str) -> bool:
    """
    Return True when the frequency inferred by ``pd.infer_freq`` satisfies ``expected``.

    ``pd.infer_freq`` returns fully anchored aliases such as 'W-SUN', so a plain string
    comparison against short aliases like 'W', 'Q' or 'Y' would always report a mismatch.
    Both aliases are normalised to offsets instead; an expected alias without an explicit
    anchor (no '-') accepts any anchor of the same offset type and multiple.
    """
    from pandas.tseries.frequencies import to_offset

    if expected == inferred:
        return True
    try:
        expected_offset = to_offset(expected)
        inferred_offset = to_offset(inferred)
    except (TypeError, ValueError):
        return False
    if expected_offset == inferred_offset:
        return True
    return (
        '-' not in str(expected)
        and type(expected_offset) is type(inferred_offset)
        and expected_offset.n == inferred_offset.n
    )


def check_datetime_column(
    df: pd.DataFrame,
    date_column: str,
    expected_freq: Optional[str] = None,
    check_sorted: bool = True,
    check_duplicates: bool = True,
    check_format: bool = True,
    logger: Optional[logging.Logger] = None
) -> Dict[str, Any]:
    """
    Check a date column for format, duplicate, ordering and frequency problems.

    Args:
        df: DataFrame to inspect.
        date_column: name of the date column.
        expected_freq: expected pandas frequency alias, e.g. 'D' (daily), 'W' (weekly),
            'M' (monthly), 'Q' (quarterly), 'Y' (yearly). Aliases are compared as offsets,
            so 'W' matches an inferred 'W-SUN'. None skips the frequency check.
        check_sorted: report the column when it is neither ascending nor descending.
        check_duplicates: report repeated dates.
        check_format: report values that cannot be parsed as dates. When False, text
            columns are still parsed (silently) so the remaining checks can run.
        logger: logger to use; None means the module default logger.

    Returns:
        Dict with 'column', 'total_rows' and 'issues' (empty when nothing is wrong), plus
        'sort_order' / 'frequency' when those checks pass and 'stats' when at least one
        valid date exists.
    """
    if logger is None:
        logger = _get_default_logger()
    results = {
        'column': date_column,
        'total_rows': len(df),
        'issues': {}
    }
    if date_column not in df.columns:
        results['issues']['column_not_found'] = f"列 '{date_column}' 不存在"
        logger.error(results['issues']['column_not_found'])
        return results

    raw = df[date_column]

    # 1. Format / dtype. Text columns are parsed even when check_format=False, because the
    #    diff() and max() - min() arithmetic below raises TypeError on plain strings.
    is_text = pd.api.types.is_object_dtype(raw) or pd.api.types.is_string_dtype(raw)
    if pd.api.types.is_datetime64_any_dtype(raw) or not (check_format or is_text):
        date_series = raw
    else:
        try:
            date_series = pd.to_datetime(raw, errors='coerce')
        except Exception as e:
            results['issues']['format_error'] = str(e)
            logger.error(f"日期格式检查失败: {e}")
            return results
        if check_format:
            # values present in the input but unparseable (pre-existing nulls are not counted)
            invalid_mask = date_series.isnull() & raw.notnull()
            invalid_count = int(invalid_mask.sum())
            if invalid_count > 0:
                results['issues']['invalid_format'] = {
                    'count': invalid_count,
                    'percentage': float(invalid_count / len(df) * 100),
                    'examples': raw[invalid_mask].head(5).tolist()
                }
                logger.warning(f"列 '{date_column}' 有 {invalid_count} 个无效日期格式")

    # Remaining checks run on the non-null dates only
    valid_dates = date_series.dropna()
    if len(valid_dates) == 0:
        results['issues']['all_null'] = "所有日期值都为空"
        logger.error(results['issues']['all_null'])
        return results

    # 2. Duplicates
    if check_duplicates:
        duplicates = valid_dates[valid_dates.duplicated(keep=False)]
        if len(duplicates) > 0:
            dup_values = duplicates.value_counts()
            results['issues']['duplicates'] = {
                'count': int(len(duplicates)),
                'unique_duplicate_dates': int(len(dup_values)),
                'examples': dup_values.head(5).to_dict()
            }
            logger.warning(f"列 '{date_column}' 有 {len(duplicates)} 个重复日期")

    # 3. Ordering (either direction counts as sorted)
    if check_sorted:
        is_sorted_asc = valid_dates.is_monotonic_increasing
        is_sorted_desc = valid_dates.is_monotonic_decreasing
        if is_sorted_asc or is_sorted_desc:
            results['sort_order'] = 'ascending' if is_sorted_asc else 'descending'
        else:
            diff = valid_dates.diff()
            # index labels (not positions) of rows that step backwards in time
            unsorted_indices = diff[diff < pd.Timedelta(0)].index.tolist()
            results['issues']['unsorted'] = {
                'is_sorted': False,
                'unsorted_positions': unsorted_indices[:10],  # keep only the first 10
                'count': len(unsorted_indices)
            }
            logger.warning(f"列 '{date_column}' 未正确排序,发现 {len(unsorted_indices)} 处乱序")

    # 4. Frequency
    if expected_freq is not None and len(valid_dates) > 1:
        try:
            inferred_freq = pd.infer_freq(valid_dates)
            if inferred_freq is None:
                # no single frequency fits: report interval statistics instead
                time_diffs = valid_dates.diff().dropna()
                results['issues']['irregular_frequency'] = {
                    'expected': expected_freq,
                    'inferred': None,
                    'min_interval': str(time_diffs.min()),
                    'max_interval': str(time_diffs.max()),
                    'mean_interval': str(time_diffs.mean()),
                    'unique_intervals': int(time_diffs.nunique())
                }
                logger.warning(f"列 '{date_column}' 频率不规则,期望 {expected_freq}")
            elif not _freq_matches(expected_freq, inferred_freq):
                results['issues']['frequency_mismatch'] = {
                    'expected': expected_freq,
                    'inferred': inferred_freq
                }
                logger.warning(f"列 '{date_column}' 频率不匹配: 期望 {expected_freq}, 实际 {inferred_freq}")
            else:
                results['frequency'] = inferred_freq
        except Exception as e:
            # e.g. pd.infer_freq needs at least 3 dates
            results['issues']['frequency_check_error'] = str(e)
            logger.error(f"频率检查失败: {e}")

    # 5. Summary statistics
    results['stats'] = {
        'min_date': str(valid_dates.min()),
        'max_date': str(valid_dates.max()),
        'date_range': str(valid_dates.max() - valid_dates.min()),
        'unique_dates': int(valid_dates.nunique()),
        'null_count': int(date_series.isnull().sum())
    }
    return results
[docs]
def fix_null_values(
    df: pd.DataFrame,
    strategy: Union[FillStrategy, str],
    columns: Optional[List[str]] = None,
    fill_value: Any = None,
    inplace: bool = False,
    logger: Optional[logging.Logger] = None
) -> pd.DataFrame:
    """
    Repair missing values (anything ``Series.isnull`` flags) column by column.

    Args:
        df: DataFrame to repair.
        strategy: a FillStrategy member or its string value (e.g. 'ffill', 'mean').
        columns: columns to process; None means every column. Unknown names are skipped.
        fill_value: value used by FillStrategy.FILL_VALUE; when None, 0 is used and a
            warning is logged once.
        inplace: modify ``df`` itself instead of a copy.
        logger: logger to use; None means the module default logger.

    Returns:
        The repaired DataFrame (``df`` itself when inplace=True). Mean, median and
        interpolation only touch numeric columns; other columns are left as-is with a
        warning.

    Raises:
        ValueError: if ``strategy`` is an unknown string, or under RAISE_ERROR when a
            processed column contains missing values.
    """
    log = _get_default_logger() if logger is None else logger
    if not inplace:
        df = df.copy()

    if isinstance(strategy, str):
        try:
            strategy = FillStrategy(strategy)
        except ValueError:
            log.error(f"无效的填充策略: {strategy}")
            raise

    # Transforms applied to any dtype
    always = {
        FillStrategy.FILL_FORWARD: lambda s: s.ffill(),
        FillStrategy.FILL_BACKWARD: lambda s: s.bfill(),
    }
    # Transforms that need a numeric column; the label finishes the "cannot use ..." warning
    numeric_only = {
        FillStrategy.FILL_MEAN: (lambda s: s.fillna(s.mean()), "均值填充"),
        FillStrategy.FILL_MEDIAN: (lambda s: s.fillna(s.median()), "中位数填充"),
        FillStrategy.INTERPOLATE: (lambda s: s.interpolate(), "插值填充"),
    }

    targets = df.columns.tolist() if columns is None else columns
    for col in targets:
        if col not in df.columns:
            continue
        n_missing = df[col].isnull().sum()
        if n_missing == 0:
            continue

        log.info(f"处理列 '{col}' 的 {n_missing} 个空值,策略: {strategy.value}")
        try:
            if strategy is FillStrategy.RAISE_ERROR:
                raise ValueError(f"列 '{col}' 包含 {n_missing} 个空值")
            if strategy is FillStrategy.DROP:
                df.dropna(subset=[col], inplace=True)
            elif strategy in always:
                df[col] = always[strategy](df[col])
            elif strategy is FillStrategy.FILL_VALUE:
                if fill_value is None:
                    # the default sticks for every later column, so this warns only once
                    log.warning("未指定fill_value,使用0")
                    fill_value = 0
                df[col] = df[col].fillna(fill_value)
            elif strategy in numeric_only:
                transform, label = numeric_only[strategy]
                if pd.api.types.is_numeric_dtype(df[col]):
                    df[col] = transform(df[col])
                else:
                    log.warning(f"列 '{col}' 不是数值类型,无法使用{label}")
            elif strategy is FillStrategy.FILL_MODE:
                modes = df[col].mode()
                if len(modes) == 0:
                    log.warning(f"列 '{col}' 无法计算众数")
                else:
                    df[col] = df[col].fillna(modes[0])
        except Exception as exc:
            log.error(f"修复列 '{col}' 失败: {exc}")
            raise
    return df
[docs]
def _cells_equal(a: Any, b: Any) -> bool:
    """
    Compare two cell values for strict de-duplication.

    Missing markers (NaN/None/NaT/pd.NA) are equal to each other and different from
    any real value; comparing through ``!=`` directly would raise on ``pd.NA``.
    """
    if pd.api.types.is_scalar(a) and pd.api.types.is_scalar(b):
        a_missing = bool(pd.isna(a))
        b_missing = bool(pd.isna(b))
        if a_missing or b_missing:
            return a_missing and b_missing
        return bool(a == b)
    # Non-scalar cells (lists, arrays, dicts): element-wise comparison. If that is not
    # possible, treat them as different so the rows are conservatively kept.
    try:
        return bool(np.array_equal(a, b))
    except Exception:
        return False


def _find_inconsistent_duplicates(
    df: pd.DataFrame,
    dup_mask: np.ndarray,
    subset: List[str],
    ignore_cols: Optional[List[str]],
) -> List[Tuple[Dict[str, Any], int, int, List[Dict[str, Any]]]]:
    """
    Find rows that match their group's first row on ``subset`` but differ elsewhere.

    Returns ``(subset_values, first_position, other_position, differences)`` tuples, where
    positions are 0-based row positions in ``df`` (not index labels).
    """
    ignored = set(ignore_cols or [])
    subset_set = set(subset)
    compare_cols = [c for c in df.columns if c not in ignored and c not in subset_set]
    if not compare_cols:
        return []  # nothing outside subset to verify

    positions = np.flatnonzero(dup_mask)
    # Label the duplicated rows by position so duplicate index labels cannot collide
    dup_rows = df.iloc[positions].set_axis(positions, axis=0)

    found = []
    # dropna=False: rows whose key contains NaN are duplicates for df.duplicated() too
    # and must be verified rather than silently skipped.
    for name, group in dup_rows.groupby(subset, dropna=False, observed=True):
        if len(group) < 2:
            continue
        first_pos = group.index[0]
        first_row = group.iloc[0]
        for pos, row in group.iloc[1:].iterrows():
            differences = [
                {'column': col, 'value1': first_row[col], 'value2': row[col]}
                for col in compare_cols
                if not _cells_equal(first_row[col], row[col])
            ]
            if differences:
                keys = name if isinstance(name, tuple) else [name]
                found.append((dict(zip(subset, keys)), first_pos, pos, differences))
    return found


def drop_duplicates_strict(
    df: pd.DataFrame,
    subset: Optional[List[str]] = None,
    keep: str = 'first',
    verify_all_fields: bool = True,
    ignore_cols: Optional[List[str]] = None,
    inplace: bool = False,
    logger: Optional[logging.Logger] = None
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Strict de-duplication: only rows that are identical are removed.

    Args:
        df: DataFrame to de-duplicate.
        subset: columns that define a duplicate (a single name or a list); None means
            all columns. Names missing from ``df`` are logged and ignored.
        keep: 'first', 'last', or False (drop every copy), as in ``DataFrame.duplicated``.
            Unknown values fall back to 'first' with a warning.
        verify_all_fields: also compare columns outside ``subset``. Rows that share
            ``subset`` values with their group's first row but differ elsewhere are
            both kept and listed in the report.
        ignore_cols: columns excluded from that verification (e.g. load timestamps).
        inplace: modify ``df`` itself instead of a copy.
        logger: logger to use; None means the module default logger.

    Returns:
        ``(df, report)``. When duplicates were found, the returned frame has a fresh
        RangeIndex. The report has 'total_rows', 'duplicates_found' (rows involved in
        any duplicate, counting originals), 'duplicates_removed',
        'inconsistent_duplicates' (list of {'subset_values', 'indices', 'differences'},
        indices being original labels) and 'removed_indices' (original labels).
    """
    if logger is None:
        logger = _get_default_logger()
    if not inplace:
        df = df.copy()
    report = {
        'total_rows': len(df),
        'duplicates_found': 0,
        'duplicates_removed': 0,
        'inconsistent_duplicates': [],
        'removed_indices': []
    }

    if subset is None:
        subset = df.columns.tolist()
    elif isinstance(subset, str):
        subset = [subset]  # a bare column name, not an iterable of characters
    missing_cols = [col for col in subset if col not in df.columns]
    if missing_cols:
        logger.warning(f"subset中的列不存在: {missing_cols}")
        subset = [col for col in subset if col in df.columns]
    if not subset:
        logger.warning("没有有效的subset列,跳过去重")
        return df, report

    dup_mask = df.duplicated(subset=subset, keep=False).to_numpy()
    num_duplicated = int(dup_mask.sum())
    report['duplicates_found'] = num_duplicated
    if num_duplicated == 0:
        logger.info("未发现重复数据")
        return df, report
    logger.info(f"发现 {num_duplicated} 行在 {subset} 上有重复")

    # Row positions that must survive because they are not true copies
    protected = set()
    if verify_all_fields:
        found = _find_inconsistent_duplicates(df, dup_mask, subset, ignore_cols)
        report['inconsistent_duplicates'] = [
            {
                'subset_values': values,
                'indices': [df.index[first_pos], df.index[other_pos]],
                'differences': differences,
            }
            for values, first_pos, other_pos, differences in found
        ]
        for _values, first_pos, other_pos, _diffs in found:
            protected.update((int(first_pos), int(other_pos)))

        inconsistent = report['inconsistent_duplicates']
        if inconsistent:
            logger.warning(
                f"发现 {len(inconsistent)} 组重复:在 {subset} 上相同但其他字段不同,将保留这些记录"
            )
            for i, group_info in enumerate(inconsistent[:3]):  # show only the first 3
                logger.warning(
                    f" 示例 {i+1}: {group_info['subset_values']} "
                    f"有 {len(group_info['differences'])} 个字段不同"
                )
            if len(inconsistent) > 3:
                logger.warning(f" ... 还有 {len(inconsistent)-3} 组类似情况")

    if keep in ('first', 'last'):
        keep_mode = keep
    elif keep == False:  # noqa: E712 - also accepts 0, as before
        keep_mode = False
    else:
        logger.warning(f"未知的keep参数: {keep},使用'first'")
        keep_mode = 'first'

    drop_mask = df.duplicated(subset=subset, keep=keep_mode).to_numpy().copy()
    if protected:
        drop_mask[sorted(protected)] = False

    report['removed_indices'] = df.index[drop_mask].tolist()
    original_len = len(df)
    # Drop by position: a label-based drop would also remove protected rows that share
    # an index label with a removed one. The index is reset afterwards anyway.
    df.reset_index(drop=True, inplace=True)
    df.drop(index=np.flatnonzero(drop_mask), inplace=True)
    df.reset_index(drop=True, inplace=True)

    removed = original_len - len(df)
    report['duplicates_removed'] = removed
    if removed > 0:
        logger.info(f"删除了 {removed} 个完全相同的重复行")
    return df, report
[docs]
def fix_datetime_column(
df: pd.DataFrame,
date_column: str,
fix_format: bool = True,
fix_duplicates: Optional[str] = 'keep_first',
fix_sort: bool = True,
sort_order: str = 'ascending',
dedupe_subset: Optional[List[str]] = None,
verify_all_fields: bool = True,
inplace: bool = False,
logger: Optional[logging.Logger] = None
) -> pd.DataFrame:
"""
修复日期列的问题
Args:
df: DataFrame
date_column: 日期列名
fix_format: 是否修复格式(转换为datetime)
fix_duplicates: 如何处理重复,None表示不处理
fix_sort: 是否排序
sort_order: 排序顺序,'ascending'或'descending'
dedupe_subset: 去重时使用的列组合,None则使用[date_column]
推荐: ['code', 'metric', date_column] 避免误删不同metric的数据
verify_all_fields: 是否验证subset外的其他字段也相同(严格模式)
inplace: 是否原地修改
logger: 日志记录器,如果为None则使用默认logger
Returns:
修复后的DataFrame
"""
if logger is None:
logger = _get_default_logger()
if not inplace:
df = df.copy()
if date_column not in df.columns:
logger.error(f"列 '{date_column}' 不存在")
return df
# 1. 修复格式
if fix_format:
try:
if not pd.api.types.is_datetime64_any_dtype(df[date_column]):
original_count = len(df)
df[date_column] = pd.to_datetime(df[date_column], errors='coerce')
# 统计转换失败的数量
invalid_count = df[date_column].isnull().sum()
if invalid_count > 0:
logger.warning(f"日期转换失败 {invalid_count} 个值,已设置为NaT")
else:
logger.info(f"成功将列 '{date_column}' 转换为datetime类型")
except Exception as e:
logger.error(f"日期格式修复失败: {e}")
# 2. 处理重复(使用严格去重)
if fix_duplicates is not None:
# 确定去重列
if dedupe_subset is None:
dedupe_subset = [date_column]
else:
# 确保date_column在subset中
if date_column not in dedupe_subset:
dedupe_subset = dedupe_subset + [date_column]
logger.info(f"使用列组合进行去重: {dedupe_subset}")
# 使用严格去重
df, report = drop_duplicates_strict(
df=df,
subset=dedupe_subset,
keep=fix_duplicates if fix_duplicates in ['first', 'last', False] else 'first',
verify_all_fields=verify_all_fields,
logger=logger,
inplace=True
)
# 记录不一致的重复
if report['inconsistent_duplicates']:
logger.warning(
f"发现 {len(report['inconsistent_duplicates'])} 组数据:"
f"在 {dedupe_subset} 上相同但其他字段不同,已保留(未删除)"
)
# 3. 排序
if fix_sort:
ascending = (sort_order == 'ascending')
df.sort_values(by=date_column, ascending=ascending, inplace=True)
df.reset_index(drop=True, inplace=True)
logger.info(f"已按 '{date_column}' {sort_order} 排序")
return df
[docs]
def validate_and_fix(
    df: pd.DataFrame,
    validations: Dict[str, Dict],
    inplace: bool = False,
    logger: Optional[logging.Logger] = None
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Run the configured checks and repair whatever they flag.

    Args:
        df: DataFrame to validate.
        validations: configuration dict, for example::

            {
                'null_check': {
                    'columns': ['col1', 'col2'],
                    'check_types': ['null', 'empty_string'],  # optional
                    'fix_strategy': 'ffill'                   # optional
                },
                'datetime_check': {
                    'column': 'date',
                    'expected_freq': 'D',
                    'fix_format': True,
                    'fix_duplicates': 'keep_first',
                    'fix_sort': True
                }
            }

            Missing sections are skipped. Other datetime keys forwarded when present:
            check_sorted, check_duplicates, check_format, sort_order, dedupe_subset,
            verify_all_fields.
        inplace: modify ``df`` itself instead of a copy.
        logger: logger to use; None means the module default logger.

    Returns:
        ``(df, report)`` where report may contain 'null_check', 'null_fix',
        'datetime_check' and 'datetime_fix'.
    """
    if logger is None:
        logger = _get_default_logger()
    if not inplace:
        df = df.copy()
    report = {}

    # 1. Missing values: check, then fix only if something was found
    if 'null_check' in validations:
        config = validations['null_check']
        columns = config.get('columns', None)
        null_results = check_null_values(
            df,
            columns=columns,
            check_types=config.get('check_types'),
            logger=logger
        )
        report['null_check'] = null_results
        if 'fix_strategy' in config and null_results:
            df = fix_null_values(
                df,
                strategy=config['fix_strategy'],
                columns=columns,
                fill_value=config.get('fill_value', None),
                logger=logger,
                inplace=True
            )
            report['null_fix'] = f"已使用策略 '{config['fix_strategy']}' 修复空值"

    # 2. Date column
    if 'datetime_check' in validations:
        config = validations['datetime_check']
        date_column = config.get('column')
        if not date_column:
            logger.warning("datetime_check 配置缺少 'column',跳过日期检查")
        else:
            datetime_results = check_datetime_column(
                df,
                date_column=date_column,
                expected_freq=config.get('expected_freq'),
                check_sorted=config.get('check_sorted', True),
                check_duplicates=config.get('check_duplicates', True),
                check_format=config.get('check_format', True),
                logger=logger
            )
            report['datetime_check'] = datetime_results
            fix_format = config.get('fix_format', True)
            if datetime_results.get('issues'):
                df = fix_datetime_column(
                    df,
                    date_column=date_column,
                    fix_format=fix_format,
                    fix_duplicates=config.get('fix_duplicates', 'keep_first'),
                    fix_sort=config.get('fix_sort', True),
                    sort_order=config.get('sort_order', 'ascending'),
                    dedupe_subset=config.get('dedupe_subset', None),
                    verify_all_fields=config.get('verify_all_fields', True),
                    logger=logger,
                    inplace=True
                )
                report['datetime_fix'] = "已修复日期列问题"
            elif fix_format and not pd.api.types.is_datetime64_any_dtype(df[date_column]):
                # No issues found, but the column is still text: honour fix_format without
                # re-sorting or de-duplicating data the check considered clean.
                df = fix_datetime_column(
                    df,
                    date_column=date_column,
                    fix_format=True,
                    fix_duplicates=None,
                    fix_sort=False,
                    logger=logger,
                    inplace=True
                )
                report['datetime_fix'] = "已将日期列转换为datetime类型"

    return df, report
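# DataValidator is kept only so that existing imports keep working; its methods now raise NotImplementedError — call the module-level functions instead.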
[docs]
class DataValidator:
    """
    Deprecated compatibility shim; use the module-level functions instead.

    Creating an instance emits a DeprecationWarning, and every method raises
    NotImplementedError whose message names its replacement:
    check_null_values(), check_datetime_column(), fix_null_values(),
    drop_duplicates_strict(), fix_datetime_column(), validate_and_fix().
    """

    def __init__(self, logger: Optional[logging.Logger] = None):
        # ``logger`` is accepted only for signature compatibility; it is not stored.
        import warnings
        warnings.warn(
            "DataValidator类已弃用,请直接使用模块级函数,如 check_null_values(), fix_null_values() 等",
            DeprecationWarning,
            stacklevel=2,
        )

    @staticmethod
    def _gone(func_name: str, usage: str) -> NotImplementedError:
        """Build the error that points callers at ``datafeed.validation.<func_name>``."""
        return NotImplementedError(
            f"DataValidator.{func_name}() 已弃用。\n"
            f"请使用: from datafeed.validation import {func_name}\n"
            f" {usage}"
        )

    def check_null_values(self, df, columns=None, check_types=None):
        raise self._gone(
            "check_null_values",
            "results = check_null_values(df, columns, check_types)",
        )

    def check_datetime_column(self, df, date_column, expected_freq=None, check_sorted=True, check_duplicates=True, check_format=True):
        raise self._gone(
            "check_datetime_column",
            "results = check_datetime_column(df, date_column, expected_freq, check_sorted, check_duplicates, check_format)",
        )

    def fix_null_values(self, df, strategy, columns=None, fill_value=None, inplace=False):
        raise self._gone(
            "fix_null_values",
            "fixed_df = fix_null_values(df, strategy, columns, fill_value, inplace)",
        )

    def drop_duplicates_strict(self, df, subset=None, keep='first', verify_all_fields=True, ignore_cols=None, inplace=False):
        raise self._gone(
            "drop_duplicates_strict",
            "cleaned_df, report = drop_duplicates_strict(df, subset, keep, verify_all_fields, ignore_cols, inplace)",
        )

    def fix_datetime_column(self, df, date_column, fix_format=True, fix_duplicates='keep_first', fix_sort=True, sort_order='ascending', dedupe_subset=None, verify_all_fields=True, inplace=False):
        raise self._gone(
            "fix_datetime_column",
            "fixed_df = fix_datetime_column(df, date_column, fix_format, fix_duplicates, fix_sort, sort_order, dedupe_subset, verify_all_fields, inplace)",
        )

    def validate_and_fix(self, df, validations, inplace=False):
        raise self._gone(
            "validate_and_fix",
            "fixed_df, report = validate_and_fix(df, validations, inplace)",
        )