LLM与程序协作来去重清洗文本格式数据

张开发
2026/5/17 18:32:51 15 分钟阅读
LLM与程序协作来去重清洗文本格式数据
之前探索了LLM与程序协作来结构化文本财报数据https://blog.csdn.net/liliang199/article/details/159834630这里进一步探索LLM和程序协作的方式来融合和去重清洗文本数据。所用示例参考和修改自网络资料。1 去重方案探索一般来说程序负责确定性与效率大模型负责模糊理解与语义判断。大部分实际场景任务都需要通过清晰的职责划分规避大模型在精确计算、大规模数据处理上的弱点同时发挥其在自然语言理解上的优势。针对数据清洗去重这里分别从数据库入库、数据去重阶段入手深度探索LLM和程序的融合。1.1 数据库入库在数据库入库阶段这里采用程序主导 LLM辅助的方式。1Pandas程序清洗用 Pandas 做格式清洗(类型转换、缺失值填充、字段映射)是程序强项稳定高效。2LLM辅助提取LLM从备注、描述等非结构化文本中抽取预定义实体比如人名、地名、金额、日期。该类型的抽取任务采用传统规则难以有效处理规则和程序难以覆盖实体的多种描述方式。3LLM提取注意点LLM可能存在幻觉或不稳定需结构化输出(如JSON Schema、Output structures)与校验逻辑。批量处理大量文本时API 成本与延迟不可忽略可考虑异步批处理或本地小模型。对抽取结果加入置信度评分低置信度字段标记人工复核日期/金额用正则辅助纠错。1.2 数据去重在去重阶段则采用程序快速召回和LLM语义辨别的方式实现数据去重。1程序快速找回这里采用Blocking技术如按姓名、公司名前4个字将比对缩小到小候选集实现快速找回。2LLM语义辨别LLM则负责语义层面的二分类如是否指向同一实体并生成 Golden Record。LLM可处理拼写差异、别名、信息残缺等复杂情况。3潜在风险LLM同一对记录多次调用可能结果不同建议设置温度参数 0 并缓存结果方便溯源。Golden Record 生成需要合并规则如字段优先取非空、最新时间戳可让LLM输出合并后的 JSON再由程序应用规则进行校验。由于这些不确定性所以需要保留去重日志支持人工校验闭环。对高频出现的匹配模式可沉淀为规则。2 去重实例验证这里进一步采用Python OpenAI API SQLite模拟一个客户信息管理系统的去重过程。以下是完整流程示例1. 原始 CSV 数据清洗与 LLM 实体抽取存入数据库。2. 基于 blocking LLM 判断去重生成黄金记录3 基于黄金记录运行实际去重过程。2.1 环境设置参考之前文档LLM环境设置示例代码如下。import os model_name gpt_model_name # LLM名称,比如deepseek-r1, qwen3.5-8b os.environ[OPENAI_API_KEY] gpt_api_key # LLM供应商提供的api key os.environ[OPENAI_BASE_URL] gpt_api_url # LLM供应商提供llm访问api的url from openai import OpenAI client OpenAI()2.2 数据入库这里模拟输入如下三条记录其中记录1和记录2是重复记录。[{id: 1, name: 张三 , company: ABC科技, date: 2023-05-01, description: 张三昨天在北京签了合同金额50000元},{id: 2, name: 张 三, company: ABC科技, date: 2023-05-02, description: 张总与北京分公司达成5万交易},{id: 3, name: 李四, company: XYZ有限公司, date: 2023-06-01, description: 李四在上海支付了1200元},]以下是数据模拟和入库过程这里使用LLM从description字段抽取人名、地名、金额、日期信息。import os import json import sqlite3 import pandas as pd from typing import List, Dict, Any, Tuple import openai # 初始化 OpenAI client openai.OpenAI() # 阶段1数据入库 def clean_and_extract_entities(raw_df: pd.DataFrame) - pd.DataFrame: 1. Pandas 清洗去除首尾空格、统一日期格式、填充缺失 2. LLM 抽取实体从 description 字段抽取人名、地名、金额、日期 # 程序主导清洗 df raw_df.copy() df[name] df[name].str.strip().str.title() df[company] df[company].str.strip().str.upper() df[date] pd.to_datetime(df[date], errorscoerce).dt.date # 对每条记录的 description 调用 LLM 抽取 def extract_entities(text: str) - Dict[str, Any]: if pd.isna(text) or text : return {person: None, place: None, amount: None, date: None} prompt f 从以下文本中抽取实体。返回 JSON 格式键为 person, place, amount, date。 若没有则值为 null。金额保留数字和货币单位日期格式 YYYY-MM-DD。 文本{text} response client.chat.completions.create( modelmodel_name, messages[{role: user, content: prompt}], temperature0, response_format{type: json_object} ) try: return json.loads(response.choices[0].message.content) except: return {person: None, place: None, amount: None, date: None} # 抽取生产环境建议批处理限流 print(run) entities df[description].apply(extract_entities) df[ext_person] entities.apply(lambda x: x.get(person)) df[ext_place] entities.apply(lambda x: x.get(place)) df[ext_amount] entities.apply(lambda x: x.get(amount)) df[ext_date] entities.apply(lambda x: x.get(date)) return df # 生成示例数据 raw_data pd.DataFrame([ {id: 1, name: 张三 , company: ABC科技, date: 2023-05-01, description: 张三昨天在北京签了合同金额50000元}, {id: 2, name: 张 三, company: ABC科技, date: 2023-05-02, description: 张总与北京分公司达成5万交易}, {id: 3, name: 李四, company: XYZ有限公司, date: 2023-06-01, description: 李四在上海支付了1200元}, ]) cleaned_df clean_and_extract_entities(raw_data) print( 入库后数据 ) print(cleaned_df[[name, company, ext_person, ext_place, ext_amount, ext_date]]) # 存入 SQLite conn sqlite3.connect(example.db) cleaned_df.to_sql(records, conn, if_existsreplace, indexFalse)以下是对入库后数据的示例。 入库后数据 name company ext_person ext_place ext_amount ext_date0 张三 ABC科技 张三 北京 50000 元 2024-05-211 张 三 ABC科技 张总 北京分公司 5 万 None2 李四 XYZ有限公司 李四 上海 1200 元 None这步仅完成数据入库未完成数据去重。2.3 识别重复记录这里示例示例重复记录的识别过程。blocking_candidates按公司名称前4个字将记录进行分组生成候选对。大模型判断是否重复当发现重复时进行记录。# 阶段2数据去重 def blocking_candidates(df: pd.DataFrame) - List[Tuple[int, int]]: 简单 blocking按公司名前4个字分组生成候选对 df[block_key] df[company].str[:4] candidates [] for key, group in df.groupby(block_key): ids group.index.tolist() for i in range(len(ids)): for j in range(i1, len(ids)): candidates.append((ids[i], ids[j])) return candidates def llm_deduplicate_pair(row1: pd.Series, row2: pd.Series) - Tuple[bool, Dict]: 大模型判断是否重复并生成黄金记录 prompt f 判断以下两条记录是否指向同一个真实实体人/公司。如果是请生成合并后的黄金记录字段值优先选择非空、更完整、日期更新的。 记录A: {row1.to_dict()} 记录B: {row2.to_dict()} 返回 JSON 格式{{is_duplicate: true/false, golden_record: {{name: ..., company: ..., ...}} }} response client.chat.completions.create( modelmodel_name, messages[{role: user, content: prompt}], temperature0, response_format{type: json_object} ) result json.loads(response.choices[0].message.content) return result[is_duplicate], result.get(golden_record, {}) # 执行去重实际会标记重复并生成黄金表此处简化演示 df_records cleaned_df.reset_index() # 保留原始索引 candidates blocking_candidates(df_records) duplicate_pairs [] for i, j in candidates: is_dup, golden llm_deduplicate_pair(df_records.loc[i], df_records.loc[j]) if is_dup: duplicate_pairs.append((i, j, golden)) print(f\n 发现 {len(duplicate_pairs)} 对重复记录 ) # 实际生产中会合并生成独立黄金表此处省略写入输出如下 发现 1 对重复记录 2.3 运行去重这里根据 LLM 判定的重复对构建连通分量对每个分量内的多条记录合并为一条黄金记录。这里采用规则合并然后生成新的去重后数据表并写入数据库。如果数据格式比较复杂不太好规则合并也可以采用LLM合并多记录。示例代码如下。import networkx as nx # 需要安装: pip install networkx # 真实去重合并重复记录并写入数据库 def merge_records_by_rules(records: List[pd.Series]) - Dict: 规则合并对于同一簇的多条记录按字段策略合并 - name: 取最长非空字符串通常更完整 - company: 取最长非空字符串 - date: 取最新日期 - description: 拼接所有非空描述去重 - ext_person, ext_place, ext_amount, ext_date: 取第一个非空值或投票 merged {} # 需要合并的字段列表及其合并策略 fields [name, company, date, description, ext_person, ext_place, ext_amount, ext_date] for field in fields: values [rec[field] for rec in records if pd.notna(rec[field]) and rec[field] ! ] if not values: merged[field] None continue if field date: # 日期取最新最大 merged[field] max(values) elif field description: # 描述去重后拼接 unique_desc list(dict.fromkeys(values)) # 保持顺序去重 merged[field] | .join(unique_desc) elif field in [name, company]: # 取最长字符串通常信息更全 merged[field] min(values, keylen) else: # 其他字段取第一个非空值 merged[field] values[0] # 保留原始 id 列表供追溯 merged[original_ids] json.dumps([int(rec[id]) for rec in records]) return merged def deduplicate_database(conn, df: pd.DataFrame, duplicate_pairs: List[Tuple[int, int, Dict]]) - pd.DataFrame: 根据重复对执行真实去重返回去重后的 DataFrame并写入数据库。 duplicate_pairs: 每项为 (idx1, idx2, golden_record_from_llm) # 构建图找出连通分量簇 G nx.Graph() for idx1, idx2, _ in duplicate_pairs: G.add_edge(idx1, idx2) # 获取所有连通分量每个分量包含重复记录的索引 components list(nx.connected_components(G)) # 记录哪些索引已经被归入某个分量 all_duplicate_indices set() golden_records [] for comp in components: comp_indices list(comp) all_duplicate_indices.update(comp_indices) # 取出该分量中的所有原始记录 records [df.loc[i] for i in comp_indices] # 合并生成黄金记录规则合并也可改用 LLM 合并 golden merge_records_by_rules(records) golden_records.append(golden) # 非重复的单例记录不在任何分量中的索引 singleton_indices [i for i in df.index if i not in all_duplicate_indices] dedup_df pd.DataFrame(golden_records) # 黄金记录 # 添加单例记录 for i in singleton_indices: dedup_df pd.concat([dedup_df, df.loc[[i]]], ignore_indexTrue) # 重新生成自增 id可选 dedup_df.reset_index(dropTrue, inplaceTrue) dedup_df[new_id] dedup_df.index 1 # 写回数据库替换原表或新建表 dedup_df.to_sql(records_dedup, conn, if_existsreplace, indexFalse) print(f去重完成原表 {len(df)} 条记录 - 去重后 {len(dedup_df)} 条记录) return dedup_df # 执行真实去重接上文 duplicate_pairs 已得到 if duplicate_pairs: dedup_df deduplicate_database(conn, df_records, duplicate_pairs) print(\n 去重后的黄金记录表 ) print(dedup_df[[new_id, name, company, ext_amount, original_ids]]) else: print(未发现重复记录无需去重。)输出如下所示。去重完成原表 3 条记录 - 去重后 2 条记录 去重后的黄金记录表 new_id name company ext_amount original_ids0 1 张三 ABC科技 50000 元 [1, 2]1 2 李四 XYZ有限公司 1200 元 NaN此时我们已经基于LLM和程序规则的协作完成了重复记录的识别、去重和重新构建入库。reference---LLM如何与程序协作来结构化文本财报数据https://blog.csdn.net/liliang199/article/details/159834630如何使用向量库faiss和LLM判断问题是否被记录https://blog.csdn.net/liliang199/article/details/159476341LLM如何基于tools对同一数据不同问题进行查询https://blog.csdn.net/liliang199/article/details/159767913

更多文章