Commit 18a362c4 authored by Bart刘笑东

Initialize code

parent 068062da
import logging
import time
from datetime import datetime, timedelta
from online_streaming import online_llm_streaming
import json
import re
import traceback
import os
import csv
from typing import List, Tuple, Dict, Any, Optional, Union, Set
from dataclasses import dataclass, field
from tenacity import retry, stop_after_attempt, wait_exponential
from pathlib import Path
import yaml
import pandas as pd
import ast
from settings import SALES_VIOLATIONS, SALES_QUALITY_METRICS, LLMConfig, LogConfig
import concurrent.futures
import threading
import random
import jieba
# Thread lock used to keep concurrent log writes from interleaving
log_lock = threading.Lock()
def thread_safe_log(logger_func, message):
    """Thread-safe logging helper."""
    with log_lock:
        logger_func(message)
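# Illustrative usage (sample message, not from the original call sites): any
# bound logger method can be passed through, e.g.
#   thread_safe_log(logging.info, "worker finished dialogue group 12")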
def setup_logging():
    """Configure the logging system."""
    # Create the logs directory if it does not exist
    log_dir = 'logs'
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    # Name the log file with the current timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f'sales_analysis_{timestamp}.log')
    # Configure the log format and handlers
    logging.basicConfig(
        level=getattr(logging, LogConfig.LEVEL),
        format=LogConfig.FORMAT,
        handlers=[
            logging.FileHandler(log_file, encoding=LogConfig.ENCODING),
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger(__name__)
    logger.info(f"日志文件创建于: {log_file}")
    return logger
# Configure logging once at import time
logger = setup_logging()
@dataclass
class DialogueInfo:
    """Dialogue-group record."""
    group_id: str
    customer_name: str
    sales_staff_name: str  # renamed from a vaguer field name for accuracy
    dialogue_content: str
    跟进日期: Optional[str] = None
class AnalysisResult:
    """Analysis-result record. (Deliberately not a @dataclass: the class has no
    field annotations and defines __init__ itself, so the decorator was a no-op.)"""
    def __init__(self,
                 会话组编号: str,
                 客户名称: str,
                 销售人员名称: str,
                 客户现状: str,
                 客户需求: str,
                 客户潜在需求: str,
                 客户意向等级: str,
                 销售卡点: str,
                 次日动作: str,
                 客户购买潜力: str,
                 顾问卡点: str,
                 客户购买潜力依据: str,
                 顾问卡点依据: str,
                 跟进日期: str = None,
                 round: int = None,
                 ):
        self.会话组编号 = 会话组编号
        self.客户名称 = 客户名称
        self.销售人员名称 = 销售人员名称
        self.客户现状 = 客户现状
        self.客户需求 = 客户需求
        self.客户潜在需求 = 客户潜在需求
        self.客户意向等级 = 客户意向等级
        self.销售卡点 = 销售卡点
        self.次日动作 = 次日动作
        self.客户购买潜力 = 客户购买潜力
        self.顾问卡点 = 顾问卡点
        self.客户购买潜力依据 = 客户购买潜力依据
        self.顾问卡点依据 = 顾问卡点依据
        self.跟进日期 = 跟进日期
        self.round = round
    def to_dict(self) -> Dict[str, Any]:
        return {
            "会话组编号": self.会话组编号,
            "客户名称": self.客户名称,
            "销售人员名称": self.销售人员名称,
            "客户现状": self.客户现状,
            "客户需求": self.客户需求,
            "客户潜在需求": self.客户潜在需求,
            "客户意向等级": self.客户意向等级,  # was missing; now matches the CSV header
            "销售卡点": self.销售卡点,
            "次日动作": self.次日动作,
            "客户购买潜力": self.客户购买潜力,
            "顾问卡点": self.顾问卡点,
            "客户购买潜力依据": self.客户购买潜力依据,
            "顾问卡点依据": self.顾问卡点依据,
            "跟进日期": self.跟进日期,
            "round": self.round
        }
    def to_csv_row(self) -> List[str]:
        """Serialize the result as one CSV row (same field order as to_dict)."""
        return [
            str(self.会话组编号),
            str(self.客户名称),
            str(self.销售人员名称),
            str(self.客户现状),
            str(self.客户需求),
            str(self.客户潜在需求),
            str(self.客户意向等级),  # was missing; now matches the CSV header
            str(self.销售卡点),
            str(self.次日动作),
            str(self.客户购买潜力),
            str(self.顾问卡点),
            str(self.客户购买潜力依据),
            str(self.顾问卡点依据),
            str(self.跟进日期),
            str(self.round)
        ]
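    # Illustrative round-trip (sample values):
    #   r = AnalysisResult(会话组编号="916713", 客户名称="客户916713",
    #                      销售人员名称="华南销售六部", 客户现状="...", 客户需求="...",
    #                      客户潜在需求="...", 客户意向等级="A", 销售卡点="...",
    #                      次日动作="...", 客户购买潜力="...", 顾问卡点="价格异议",
    #                      客户购买潜力依据="...", 顾问卡点依据="...",
    #                      跟进日期="2025-05-17", round=1)
    #   r.to_dict()     -> dict keyed by the Chinese field names above
    #   r.to_csv_row()  -> 15 stringified values in the same order as to_dict()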
def validate_dialogue_content(content: str) -> bool:
"""验证对话内容格式"""
if not content or len(content.strip()) < 10:
return False
# 检查基本格式要求
required_patterns = [
# 时间戳格式
r'\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}',
# 对话者标识(包括更多格式)
r'(?:销售组或客户|客户成功组|银河VIP经理)[-企微通话数据]*[-单聊|-群聊]*[::]',
# 对话内容
r'[::].+',
# 轮次标记格式
r'round\d+:\d{4}-\d{2}-\d{2}'
]
# 至少要匹配一个基本格式
basic_format_matched = False
for pattern in required_patterns:
if re.search(pattern, content):
basic_format_matched = True
break
if not basic_format_matched:
return False
# 检查对话完整性
lines = content.strip().split('\n')
valid_lines = 0
for line in lines:
line = line.strip()
if not line:
continue
# 检查是否是标准对话行
if re.search(r'\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+[^::]+[::].+', line):
valid_lines += 1
continue
# 检查是否是轮次标记行
if re.match(r'round\d+:\d{4}-\d{2}-\d{2}', line):
valid_lines += 1
continue
# 检查是否是引用消息
if '这是一条引用/回复消息:' in line:
valid_lines += 1
continue
# 检查是否是引用内容
if line.startswith('"') and line.endswith('"'):
valid_lines += 1
continue
# 检查是否是分隔线
if line.startswith('------'):
valid_lines += 1
continue
# 检查是否是相关文件/图片/链接
if line in ['相关文件', '相关图片', '相关链接']:
valid_lines += 1
continue
# 至少要有两个有效的对话行(通常是一问一答)
return valid_lines >= 2
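# Illustrative: the smallest content that passes validation is a recognizable
# timestamped line plus one more valid line (sample data):
#   sample = ("round1:2025-05-17\n"
#             "2025-05-17 10:00:00 华南销售六部-企微-单聊:您好\n"
#             "2025-05-17 10:01:00 客户(916713)-企微-单聊:我想了解续签\n")
#   validate_dialogue_content(sample)  # -> True (3 valid lines >= 2)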
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def extract_dialogue_groups(content: str) -> List[DialogueInfo]:
"""从内容中提取对话组,带重试机制"""
if not content:
raise ValueError("输入内容为空")
# 使用更精确的正则表达式来匹配对话组
groups = re.split(r'\n(?=对话组[::]\s*\d+\s*\n)', content.strip())
dialogue_groups = []
total_groups = 0
for group in groups:
try:
# 跳过空组
if not group.strip():
continue
# 提取对话组编号
group_match = re.match(r'^对话组[::]\s*(\d+)\s*\n', group)
if not group_match:
continue
total_groups += 1
group_id = group_match.group(1)
# 获取对话内容(去掉对话组标记行)
dialogue_content = re.sub(r'^对话组[::]\s*\d+\s*\n', '', group).strip()
if not dialogue_content:
logger.warning(f"对话组 {group_id} 内容为空")
continue
# 验证对话内容格式
if not validate_dialogue_content(dialogue_content):
logger.warning(f"对话组 {group_id} 格式验证失败")
continue
# 检查是否包含有效时间戳
if not re.search(r'\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}', dialogue_content):
logger.warning(f"对话组 {group_id} 不包含有效时间戳格式")
continue
# 检查是否包含有效对话者标识
if not re.search(
r'(?:销售组|客户(?:\(\d+\))?|客户成功组|银河VIP经理|生活管家|转介运营组|生活顾问1组|华北销售五部|华北销售四部|华南销售三部|华南销售二部|华南销售六部)[-企微通话数据|-企微|-语音通话]*[-单聊|-群聊]*[::]',
dialogue_content):
logger.warning(f"对话组 {group_id} 不包含有效对话者标识")
continue
# 处理按轮次分组的对话内容
dialogue_lines = dialogue_content.split('\n')
processed_lines = []
current_round = None
跟进日期 = None
# 检查是否包含round标记
has_round_markers = any(re.match(r'round\d+:\d{4}-\d{2}-\d{2}', line) for line in dialogue_lines)
if has_round_markers:
# 如果包含round标记,按round分割
rounds = re.split(r'\n(?=round\d+:\d{4}-\d{2}-\d{2})', dialogue_content)
logger.info(f"对话组 {group_id} 包含 {len(rounds)} 轮对话")
for round_content in rounds:
if not round_content.strip():
continue
# 提取round的日期
round_match = re.match(r'round\d+:(\d{4}-\d{2}-\d{2})', round_content)
if round_match:
跟进日期 = round_match.group(1)
logger.info(f"对话组 {group_id} 发现对话轮次,日期: {跟进日期}")
# 从对话内容中提取客户ID
customer_id = None
for line in dialogue_lines:
customer_match = re.search(r'客户\((\d+)\)', line)
if customer_match:
customer_id = customer_match.group(1)
break
# 从对话内容中提取销售人员名称
sales_name = "银河销售人员" # 默认值
for line in dialogue_lines:
sales_match = re.search(r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+[^-]+)', line)
if sales_match and "客户" not in sales_match.group(1):
name_parts = sales_match.group(1).split()
if len(name_parts) >= 3:
sales_name = name_parts[2]
break
dialogue_groups.append(DialogueInfo(
group_id=group_id,
customer_name=f"客户{customer_id}" if customer_id else f"客户{group_id}",
sales_staff_name=sales_name,
dialogue_content=dialogue_content,
跟进日期=跟进日期
))
logger.debug(f"成功处理对话组 {group_id}")
except Exception as e:
logger.error(f"处理对话组时发生错误: {str(e)}")
logger.error(traceback.format_exc())
continue
successful_groups = len(dialogue_groups)
if successful_groups == 0:
logger.warning("未能成功提取任何对话组")
else:
logger.info(f"成功提取 {successful_groups}/{total_groups} 个对话组")
return dialogue_groups
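# Illustrative usage against the source.md layout this parser expects (sample):
#   content = ("对话组:1\n"
#              "round1:2025-05-17\n"
#              "2025-05-17 10:00:00 华南销售六部-企微-单聊:您好\n"
#              "2025-05-17 10:01:00 客户(916713)-企微-单聊:我想了解续签\n")
#   groups = extract_dialogue_groups(content)
#   # groups[0]: group_id="1", customer_name="客户916713",
#   #            sales_staff_name="华南销售六部", 跟进日期="2025-05-17"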
def format_dialogue(dialogue: str, group_id: str) -> DialogueInfo:
"""格式化对话内容"""
try:
# 提取客户名称
customer_name = f"客户{group_id}"
# 提取销售名称 - 改进的逻辑
sales_name = "银河销售人员" # 默认值
sales_department = None # 销售部门
# 尝试从对话中提取销售人员信息
lines = dialogue.split('\n')
for line in lines:
# 跳过空行和客户的发言
if not line or "客户" in line:
continue
# 首先去除时间戳(如果存在)
parts = line.split(' ', 2) # 最多分割2次
if len(parts) >= 2:
# 检查第一部分是否是时间戳格式(YYYY-MM-DD HH:MM:SS)
if re.match(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', ' '.join(parts[:2])):
line = parts[2] if len(parts) > 2 else ''
# 匹配销售名称(在冒号或者说话内容之前)
sales_match = re.match(r'^([^::]+)[::](.*)', line)
if sales_match:
potential_name = sales_match.group(1).strip()
# 清理销售名称中的特殊标记
cleaned_name = re.sub(r'-通话数据|-企微-单聊|-企微-群聊|-企微|通话数据|-.*', '', potential_name)
cleaned_name = cleaned_name.strip()
# 如果清理后的名称不为空且不是特殊字符
if cleaned_name and not re.match(r'^[\d\s]+$', cleaned_name):
# 检查是否包含部门信息
dept_match = re.search(r'(华[东南北]销售[一二三四五六七八九十]+部|销售[一二三四五六七八九十]+部)',
cleaned_name)
if dept_match:
sales_department = dept_match.group(1)
# 从名称中移除部门信息
cleaned_name = re.sub(
r'(华[东南北]销售[一二三四五六七八九十]+部|销售[一二三四五六七八九十]+部)', '',
cleaned_name).strip()
# 如果清理后的名称不是通用词,则使用该名称
if cleaned_name.lower() not in ['销售', '客服', 'sales', 'service', 'sop组', 'sop', '银河']:
sales_name = cleaned_name
if sales_department:
sales_name = f"{sales_department}-{sales_name}"
break
elif sales_department:
# 如果只有部门信息,使用部门作为销售名称
sales_name = sales_department
break
# 提取真实的会话组编号
real_group_id = group_id
for line in lines:
# 尝试从对话内容中匹配会话组编号
id_match = re.search(r'销售组或客户\((\d+)\)', line)
if id_match:
real_group_id = id_match.group(1)
customer_name = f"客户{real_group_id}"
break
return DialogueInfo(
group_id=real_group_id,
customer_name=customer_name,
sales_staff_name=sales_name,
dialogue_content=dialogue
)
except Exception as e:
logger.error(f"格式化对话时发生错误: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
return DialogueInfo(
group_id=group_id,
customer_name=f"客户{group_id}",
sales_staff_name="银河销售人员",
dialogue_content=dialogue
)
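# Illustrative (sample line, hypothetical name 王五): channel suffixes are
# stripped and the department folded back in, so a line such as
#   "2025-05-17 10:00:00 华南销售六部王五-企微-单聊:您好"
# yields sales_staff_name == "华南销售六部-王五".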
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def read_source_file(target_dir) -> str:
"""读取源文件内容,带重试机制"""
try:
with open(os.path.join(target_dir, 'source.md'), 'r', encoding='utf-8') as f:
content = f.read()
if not content:
raise ValueError("文件内容为空")
logger.info(f"文件总字符数: {len(content)}")
logger.info(f"文件总行数: {content.count(chr(10)) + 1}")
return content
except Exception as e:
logger.error(f"读取文件时发生错误: {str(e)}")
raise
def get_analysis_prompt(dialogue_content: str, corpus_examples: List[Tuple[int, str, str, str]]) -> str:
"""构建分析提示词"""
# Currently unused placeholders (customer profile, history dialogue summary,
# new tag definitions); the prompt below relies only on operate_context.
customer_profile = """ """
history_dialog_summary = """ """
tag_definitions = """ """
# 运营提供的背景知识
operate_context = '''
一、其中'客户购买潜力'如下,并且只可以从中进行选择:
1.意向客户可跟进-考虑生意:客户有意向通过在香港开设实体店(如餐饮、零售等)作为申请香港身份或续签的方式,具备实际业务需求,可重点跟进。
2.意向客户可跟进-考虑工位:客户希望通过设立在香港的办公室(租赁办公桌/工位)来满足身份申请或续签的条件,有实际办公需求。
3.意向客户可跟进-只考虑身份续签:客户当前已有香港身份,关注点集中在如何合法续签,暂未表露其他业务意图,可跟进其续签所需材料或方案。
4.意向客户可跟进-考虑自雇续签:客户希望以自雇(如注册公司担任董事/高管)方式完成香港身份续签,可能需协助设计合理业务架构。
5.意向客户可跟进-考虑教育择校:客户关注子女在香港接受教育。仅限于:幼儿园、小学、中学。
6.意向客户可跟进-需求不明确:客户表达出兴趣但未明确具体需求(如生意、工位、身份等),需进一步沟通挖掘其真实意向。
7.意向客户可跟进-有房产意向:客户对香港房地产有投资或置业的兴趣,可能与身份申请、教育等诉求相关联,需关联其他业务场景挖掘需求。
8.孵化类客户-保险受雇:客户计划通过受雇于香港保险公司获取香港身份,但真实性或可执行性仍需核实,处于业务孵化阶段。
9.孵化类客户-想香港身份挂靠:客户无实际业务运营意向,仅希望通过挂靠公司方式获取香港身份,存在合规风险,需谨慎评估。
10.孵化类客户-想找工作:客户希望以受雇身份(特别是专才/受雇签证)在香港工作,尚未明确雇主或工作机会,处于初步探索阶段。
11.孵化类客户-受雇真实性待挖掘:客户声称以受雇方式留港,但身份真实性或职位匹配度存疑,需要进一步验证其工作背景及雇主情况。
12.孵化类客户-已有续签方案_非银河:客户已有其他机构的身份续签方案,非银河公司提供,但仍可争取合作或二次转化。
13.其他-不知道需求:客户目前未表达任何具体需求,完全空白状态,需要从身份、教育、投资等方向全面试探引导。
14.已复购商务未至永居-观望:客户已购买商务身份产品,但尚未达到永居年限,目前处于观望状态,可能对服务效果或政策持观望态度。
15.已复购商务未至永居-考虑商务:客户已购买商务身份,目前考虑继续发展香港业务(如注册新公司、开实体店等),有后续合作潜力。
16.其他-生意星专项:特殊项目客户,可能参与公司针对香港创业/生意的专项扶持计划(如“生意星”项目),需对接专项资源。
17.已复购商务至永居-无需求:客户成功获香港永居身份,目前无进一步业务或身份服务需求,维持关系即可。
18.已复购商务至永居-考虑生意:客户已获永居,但仍希望继续在香港开展或拓展生意,可能有实体投资、选址、注册等需求。
19.已复购商务至永居-考虑工位:客户已获永居,考虑继续维持或扩大香港办公场所,可能用于实际业务运营或身份合规。
20.真实在港运营公司-无需求:客户拥有真实在港运营公司,当前无额外身份或业务服务需求,重点在维护客户关系。
21.真实在港运营公司-考虑工位:客户公司真实运营,但有扩大办公空间或改善办公环境的需求,可提供工位租赁等服务。
22.真实在港运营公司-考虑商务:客户公司实际运作中,考虑进一步拓展业务(如新公司设立、业务转型等),可提供商务咨询支持。
23.长期运营-已真实受雇于香港:客户已在香港有真实受雇工作,身份稳定,业务合作空间有限,但可作为优质资源维护。
24.长期运营-放弃身份:客户主动放弃香港身份。
25.长期运营-永居客户:客户已取得香港永居,身份稳定,可作为高质量客户维护,或引导其在港投资、置业等。
26.长期运营-留学在读中:客户或其子女在香港留学,已具身份或签证,关注教育服务、身份衔接等,需定期触达。
27.长期运营-专才:客户通过“输入内地人才计划”等方式来港,具备专业背景和工作履历,身份稳定,有较高服务延展空间。
二、其中'顾问卡点'如下,并且只可以从中进行选择:
1.价格异议
2.竞品对比
3.购买紧迫性
4.有意向但客户联系不上
5.续签永居整体成本超预期
6.未考虑好是否维持身份
7.合同条款/退费问题
8.与家人或朋友商量
9.担忧投入及续签成功率
10.其他
'''
# 构建JSON示例
json_example = '''{"会话session_id": "例如:916713",
"客户名称":"实际上就是客户(9873)中括号中的数字提取出来,组合成为客户9873,以此类推。例如:客户916713",
"跟进日期":"就是时间戳中日期的部分,年月日,没有的话就写无。例如:2025年05月17日",
"销售人员名称":"就是时间戳中销售人员的名字,如果是华北,华东,华南,华西开头的销售人员,优先选择。例如:华南销售六部",
"客户现状":"描述客户当前业务、痛点、外部环境等动态状态等",
"客户需求":"客户明确提出的需求或期望等且还没有被解决的,并且给出判断依据和原对话,不可以低于100个字",
"客户潜在需求": "基于上下文隐含的、更深层次需求,且还没有被解决的,并给出判断依据和原对话。不可以低于100个字",
"客户意向等级":"根据对话内容,判断客户的意向等级(S/A/B/C/孵化期/不愿意沟通),给出判断依据和原对话。例如:S,给出判断依据和原对话",
"销售卡点":"指出在销售与客户沟通过程中出现的阻碍或异议,且还没有被解决的,并给出判断依据和原对话。不可以低于100个字",
"次日动作":"给出销售人员下一步动作的建议,且还没有被解决的,不可以低于100个字",
"客户购买潜力":"",
"顾问卡点":"",
"客户购买潜力依据":"",
"顾问卡点依据":""
}
'''
# 构建基础提示词
prompt = f'''你是一个香港身份业务的高级营销专家,请将下面```中的对话内容和重点业务知识理解进行全面深度分析。
```
重点业务知识理解:
{operate_context}
对话内容:
{dialogue_content}
```
进行全面深度分析后,输出要求如下:
1.严格按照{json_example}的json格式输出,拒绝```json```的形式。
2.输出的内容中:
(1) 客户购买潜力:需要在指定范围内进行选取客户购买潜力,最多可选择三个,按照优先级排序,必须使用纯文本用逗号隔开。
(2) 客户购买潜力依据: 根据(1)选取出来的客户购买潜力给出一一对应的客户语料原文和分析依据。(原文依据必须保留时间戳)
客户购买潜力依据的输出形式如下:
长期运营-已真实受雇于香港:2001-01-01 11:11:11 客户(927966) 客户语料原文。分析依据:针对长期运营-已真实受雇于香港对应的客户语料原文进行分析的依据内容;
...
(3) 顾问卡点:需要在指定范围内进行选取顾问卡点,最多可选择两个,按照优先级排序,必须使用纯文本用逗号隔开
(4) 顾问卡点依据: 根据(3)选取出来的顾问卡点,给出一一对应的对话语料原文依据。(原文依据必须保留时间戳)
顾问卡点依据的输出形式如下:
价格异议:2001-01-01 11:11:11 客户(927966) 客户语料原文。分析依据:针对顾问卡点价格异议对应的客户语料原文进行分析依据内容;
...
'''
return prompt
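# Illustrative call: prompt = get_analysis_prompt(content_without_round, [])
# The corpus_examples argument is accepted but currently unused; the prompt is
# built solely from operate_context, json_example and the dialogue text.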
def process_dialogue_group(dialogue_info: DialogueInfo) -> List[AnalysisResult]:
"""处理单个对话组"""
try:
# 按round分割对话内容
rounds = re.split(r'\n(?=round\d+:\d{4}-\d{2}-\d{2})', dialogue_info.dialogue_content)
results = []
logger.info(f"开始处理对话组 {dialogue_info.group_id},共 {len(rounds)} 轮对话")
for round_idx, round_content in enumerate(rounds, 1):
if not round_content.strip():
continue
# 提取round的日期
round_match = re.match(r'round\d+:(\d{4}-\d{2}-\d{2})', round_content)
if not round_match:
logger.warning(f"对话组 {dialogue_info.group_id} 的第 {round_idx} 轮对话没有有效的日期标记")
continue
跟进日期 = round_match.group(1)
logger.info(f"处理对话组 {dialogue_info.group_id} 的第 {round_idx} 轮对话,日期: {跟进日期}")
# 移除round标记行
content_without_round = re.sub(r'^round\d+:\d{4}-\d{2}-\d{2}\n', '', round_content).strip()
# 检查对话内容是否为空
if not content_without_round:
logger.warning(f"对话组 {dialogue_info.group_id} 的第 {round_idx} 轮对话内容为空")
continue
# 调用大模型进行分析
analysis_result = analyze_dialogue_with_gpt(content_without_round)
if not analysis_result:
logger.warning(f"对话组 {dialogue_info.group_id} 在日期 {跟进日期} 的分析结果为空")
# 为这一轮创建默认结果
result = AnalysisResult(
会话组编号=dialogue_info.group_id,
客户名称=dialogue_info.customer_name,
销售人员名称=dialogue_info.sales_staff_name,
客户现状="对话过短,暂无更多信息",
客户需求="对话过短,暂无更多信息",
客户潜在需求="对话过短,暂无更多信息",
客户意向等级="对话过短,暂无更多信息",
销售卡点="对话过短,暂无更多信息",
次日动作="对话过短,暂无更多信息",
客户购买潜力="对话过短,暂无更多信息",
顾问卡点="对话过短,暂无更多信息",
客户购买潜力依据="对话过短,暂无更多信息",
顾问卡点依据="对话过短,暂无更多信息",
跟进日期=跟进日期,
round=round_idx
)
results.append(result)
continue
# 创建分析结果对象
result = AnalysisResult(
会话组编号=dialogue_info.group_id,
客户名称=dialogue_info.customer_name,
销售人员名称=dialogue_info.sales_staff_name,
客户现状=analysis_result.get("客户现状", ""),
客户需求=analysis_result.get("客户需求", ""),
客户潜在需求=analysis_result.get("客户潜在需求", ""),
客户意向等级=analysis_result.get("客户意向等级", ""),
销售卡点=analysis_result.get("销售卡点", ""),
次日动作=analysis_result.get("次日动作", ""),
客户购买潜力=analysis_result.get("客户购买潜力", ""),
顾问卡点=analysis_result.get("顾问卡点", ""),
客户购买潜力依据=analysis_result.get("客户购买潜力依据", ""),
顾问卡点依据=analysis_result.get("顾问卡点依据", ""),
跟进日期=跟进日期,
round=round_idx
)
results.append(result)
logger.info(f"对话组 {dialogue_info.group_id} 的第 {round_idx} 轮对话分析完成")
if not results:
logger.warning(f"对话组 {dialogue_info.group_id} 没有生成任何分析结果")
# 返回默认结果
return [get_default_analysis_result(dialogue_info)]
logger.info(f"对话组 {dialogue_info.group_id} 处理完成,共生成 {len(results)} 条分析结果")
return results
except Exception as e:
thread_safe_log(logging.error, f"处理对话组时出错: {str(e)}")
thread_safe_log(logging.error, traceback.format_exc())
return [get_default_analysis_result(dialogue_info)]
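# Illustrative flow (sample values): content carrying two "roundN:YYYY-MM-DD"
# markers yields one AnalysisResult per round:
#   info = DialogueInfo(group_id="1", customer_name="客户916713",
#                       sales_staff_name="华南销售六部",
#                       dialogue_content="round1:2025-05-16\n...\nround2:2025-05-17\n...")
#   rows = process_dialogue_group(info)  # -> [<round=1 result>, <round=2 result>]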
def get_default_analysis_result(dialogue_info: DialogueInfo) -> AnalysisResult:
"""获取默认的分析结果"""
return AnalysisResult(
会话组编号=dialogue_info.group_id,
客户名称=dialogue_info.customer_name,
销售人员名称=dialogue_info.sales_staff_name,
客户现状="分析失败",
客户需求="分析失败",
客户潜在需求="分析失败",
客户意向等级="分析失败",
销售卡点="分析失败",
次日动作="分析失败",
客户购买潜力="分析失败",
顾问卡点="分析失败",
客户购买潜力依据="分析失败",
顾问卡点依据="分析失败",
跟进日期=dialogue_info.跟进日期,
round=None
)
def write_result_to_csv(results: List[AnalysisResult], output_file: str):
"""将分析结果写入CSV文件"""
try:
with open(output_file, 'w', newline='', encoding='utf-8-sig') as f:
fieldnames = [
'会话组编号', '客户名称', '销售人员名称',
'客户现状', '客户需求', '客户潜在需求', '客户意向等级', '销售卡点', '次日动作', '客户购买潜力',
'顾问卡点', '客户购买潜力依据', '顾问卡点依据', '跟进日期', 'round']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
# 按会话组编号、round和跟进日期排序结果
sorted_results = sorted(results, key=lambda x: (
int(x.会话组编号) if x.会话组编号.isdigit() else float('inf'), # 会话组编号
x.round if x.round is not None else float('inf'), # round
x.跟进日期 if x.跟进日期 else '' # 跟进日期
))
for result in sorted_results:
# 如果跟进日期为空,使用默认值
跟进日期 = result.跟进日期 if result.跟进日期 else ''
row = {
'会话组编号': result.会话组编号,
'客户名称': result.客户名称,
'销售人员名称': result.销售人员名称,
'客户现状': result.客户现状,
'客户需求': result.客户需求,
'客户潜在需求': result.客户潜在需求,
'客户意向等级': result.客户意向等级,
'销售卡点': result.销售卡点,
'次日动作': result.次日动作,
'客户购买潜力': result.客户购买潜力,
'顾问卡点': result.顾问卡点,
'客户购买潜力依据': result.客户购买潜力依据,
'跟进日期': 跟进日期,
'顾问卡点依据': result.顾问卡点依据,
# 添加round字段
'round': result.round
}
writer.writerow(row)
logger.info(f"结果已成功写入文件: {output_file}")
logger.info(f"共写入 {len(results)} 条记录")
except Exception as e:
logger.error(f"写入CSV文件时发生错误: {str(e)}")
logger.error(traceback.format_exc())
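# Illustrative: rows are written in (numeric group id, round, follow-up date)
# order, so group "2" precedes group "10" and non-numeric ids sink to the end:
#   write_result_to_csv(results, "user_day_research_result/demo.csv")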
@retry(stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=4, max=10))
def online_llm_streaming(prompt: str, request_timeout: int = 300) -> str:
    """Call the online streaming LLM service; safe for concurrent use.
    Note: this wrapper shadows the identically named class imported at the top
    of the file; analyze_dialogue_with_gpt calls this function, not the class.
    Args:
        prompt: prompt text
        request_timeout: request timeout in seconds
    Returns:
        str: the LLM response (an empty JSON object string on failure)
    """
try:
# 使用已导入的online_streaming模块
from online_streaming import online_llm_streaming as llm_service
# 创建实例并调用
ai_generate = llm_service(
input_query=prompt,
api_key="app-nRiSkZij2Bzzgf11VnFAFXs7",
route="/workflows/run",
response_mode="blocking"
)
# 运行并获取结果
response = ai_generate.run(timeout=request_timeout)
if not response:
thread_safe_log(logger.error, "AI生成返回空结果")
return "{}"
return response
except Exception as e:
thread_safe_log(
logger.error,
f"调用在线LLM服务时发生错误: {str(e)}"
)
thread_safe_log(logger.error, traceback.format_exc())
return "{}"
def analyze_dialogue_with_gpt(dialogue_content: str) -> Dict[str, Any]:
"""使用GPT分析对话内容"""
try:
# 设置更小的分段大小以确保不超过token限制
max_chunk_size = 120000 # 大约对应40000个token
# 检查内容长度并决定是否需要分段
if len(dialogue_content) > max_chunk_size:
logger.info(f"对话内容超过{max_chunk_size}字符,进行分段处理")
# 按对话行分割内容
dialogue_lines = dialogue_content.split('\n')
chunks = []
current_chunk = []
current_size = 0
for line in dialogue_lines:
line_size = len(line) + 1 # +1 for newline
if current_size + line_size > max_chunk_size and current_chunk:
chunks.append('\n'.join(current_chunk))
current_chunk = [line]
current_size = line_size
else:
current_chunk.append(line)
current_size += line_size
if current_chunk:
chunks.append('\n'.join(current_chunk))
logger.info(f"对话被分为{len(chunks)}段进行处理")
# 处理每个分段
results = []
for i, chunk in enumerate(chunks, 1):
logger.info(f"处理第{i}/{len(chunks)}段内容")
prompt = get_analysis_prompt(chunk, [])
response = online_llm_streaming(prompt)
chunk_result = extract_json_from_text(response)
if chunk_result and all(k in chunk_result for k in
["客户现状", "客户需求", "客户潜在需求", "客户意向等级", "销售卡点", "次日动作",
"客户购买潜力", "顾问卡点", "客户购买潜力依据", "顾问卡点依据"]):
results.append(chunk_result)
# 合并所有分段的结果
if results:
# 初始化合并结果
merged_result = results[0].copy()
# 收集所有分段的结果
all_customer_status = []
all_customer_needs = []
all_customer_potential_needs = []
all_customer_intention_levels = []
all_sales_card_points = []
all_next_actions = []
for result in results:
all_customer_status.append(result.get("客户现状", ""))
all_customer_needs.append(result.get("客户需求", ""))
all_customer_potential_needs.append(result.get("客户潜在需求", ""))
all_customer_intention_levels.append(result.get("客户意向等级", ""))
all_sales_card_points.append(result.get("销售卡点", ""))
all_next_actions.append(result.get("次日动作", ""))
all_customer_potential = result.get("客户购买潜力", "")
all_customer_card = result.get("顾问卡点", "")
all_customer_potential_reason = result.get("客户购买潜力依据", "")
all_customer_card_reason = result.get("顾问卡点依据", "")
# Merge the chunk results: each field takes its value from the last processed chunk
merged_result["客户现状"] = all_customer_status[-1] if all_customer_status else ""
merged_result["客户需求"] = all_customer_needs[-1] if all_customer_needs else ""
merged_result["客户潜在需求"] = all_customer_potential_needs[-1] if all_customer_potential_needs else ""
merged_result["客户意向等级"] = all_customer_intention_levels[
-1] if all_customer_intention_levels else ""
merged_result["销售卡点"] = all_sales_card_points[-1] if all_sales_card_points else ""
merged_result["次日动作"] = all_next_actions[-1] if all_next_actions else ""
merged_result["客户购买潜力"] = all_customer_potential if all_customer_potential else ""
merged_result["顾问卡点"] = all_customer_card if all_customer_card else ""
merged_result["客户购买潜力依据"] = all_customer_potential_reason if all_customer_potential_reason else ""
merged_result["顾问卡点依据"] = all_customer_card_reason if all_customer_card_reason else ""
return merged_result
return {}
else:
# 内容未超过限制,只调用一次
prompt = get_analysis_prompt(dialogue_content, [])
response = online_llm_streaming(prompt)
# result = extract_json_from_text(response)
try:
result = json.loads(response.replace('\n',''))
except Exception as e:
logger.error(f"提取JSON时发生错误: {str(e)}")
logger.error(traceback.format_exc())
print('出错的response:',response)
result = {}
        # Make sure the result carries every required field
        if result:
            for key in ("客户现状", "客户需求", "客户潜在需求", "客户意向等级",
                        "销售卡点", "次日动作", "客户购买潜力", "顾问卡点",
                        "客户购买潜力依据", "顾问卡点依据"):
                result.setdefault(key, "")
            return result
        return {}
except Exception as e:
logger.error(f"分析对话时发生错误: {str(e)}")
logger.error(traceback.format_exc())
return {}
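# Illustrative: the greedy line packer above never splits a single line; with
# max_chunk_size=10, ["aaaa", "bbbb", "cccc"] packs into the two chunks
# "aaaa\nbbbb" and "cccc" (each line costs len(line) + 1 for its newline).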
def merge_analysis_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""合并多个分析结果
Args:
results: 分析结果列表
Returns:
合并后的结果
"""
if not results:
return {}
# 使用第一个结果作为基础
merged = results[0].copy()
# 处理新增的标签字段
tag_fields = ["客户现状", "客户需求", "客户潜在需求", "客户意向等级", "销售卡点", "次日动作", "客户购买潜力",
"顾问卡点", "客户购买潜力依据", "顾问卡点依据"]
for field in tag_fields:
# 使用最后一个非空值
for result in reversed(results):
if field in result and result[field]:
merged[field] = result[field]
break
return merged
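# Illustrative (sample dicts): the last non-empty value wins, field by field:
#   merge_analysis_results([{"客户需求": "A", "销售卡点": "X"},
#                           {"客户需求": "",  "销售卡点": "Y"}])
#   # -> {"客户需求": "A", "销售卡点": "Y"}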
def process_dialogue_batch(dialogues: List[DialogueInfo], max_workers: int = 10) -> List[AnalysisResult]:
"""并发处理对话组
Args:
dialogues: 对话组列表
max_workers: 最大并发数
Returns:
处理结果列表
"""
results = []
total = len(dialogues)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_dialogue = {
executor.submit(process_dialogue_group, dialogue): dialogue
for dialogue in dialogues
}
# 处理完成的任务
completed = 0
for future in concurrent.futures.as_completed(future_to_dialogue):
dialogue = future_to_dialogue[future]
try:
result = future.result()
if result:
results.extend(result)
completed += 1
thread_safe_log(
logger.info,
f"处理进度: {completed}/{total} ({completed / total * 100:.1f}%)"
)
except Exception as e:
thread_safe_log(
logger.error,
f"处理对话组 {dialogue.group_id} 时发生错误: {str(e)}"
)
thread_safe_log(logger.error, traceback.format_exc())
return results
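# Illustrative usage: fan the extracted groups out over the thread pool and
# collect the flattened per-round results:
#   results = process_dialogue_batch(dialogue_groups, max_workers=10)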
def extract_json_from_text(text: str) -> Dict[str, Any]:
"""从文本中提取JSON内容
Args:
text: 包含JSON的文本
Returns:
解析后的JSON字典
"""
try:
# 首先尝试解析API响应
try:
api_response = json.loads(text)
if isinstance(api_response, dict) and 'data' in api_response:
if 'outputs' in api_response['data'] and 'output' in api_response['data']['outputs']:
output_str = api_response['data']['outputs']['output']
try:
# 清理JSON字符串
output_str = re.sub(r'[\n\r\t]', ' ', output_str) # 移除换行和制表符
output_str = re.sub(r'\s+', ' ', output_str) # 合并多个空格
output_str = re.sub(r',\s*([}\]])', r'\1', output_str) # 移除末尾多余的逗号
output_str = re.sub(r'。(?=\s*[}\]])', '.', output_str) # 将中文句号转换为英文句号
output_str = re.sub(r'E:\s*无意向', 'E', output_str) # 修正意向标签格式
result = json.loads(output_str)
# 验证必要字段
required_fields = ["会话session_id", "客户名称", "销售人员名称",
"客户现状", "客户需求", "客户潜在需求", "客户意向等级",
"销售卡点", "次日动作", "客户购买潜力", "顾问卡点",
"客户购买潜力依据", "顾问卡点依据"]
if all(field in result for field in required_fields):
return result
else:
logger.error("JSON缺少必要字段")
return {}
except json.JSONDecodeError as e:
logger.error(f"解析output字段中的JSON时出错: {str(e)}")
return {}
except Exception:
# The response was not a JSON API envelope; fall back to the raw {...} scan below
pass
# 如果API响应解析失败,尝试直接查找JSON对象
json_match = re.search(r'\{[\s\S]*?\}', text)
if json_match:
json_str = json_match.group(0)
# 清理JSON字符串
json_str = re.sub(r'[\n\r\t]', ' ', json_str)
json_str = re.sub(r'\s+', ' ', json_str)
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
json_str = re.sub(r'。(?=\s*[}\]])', '.', json_str)
json_str = re.sub(r'E:\s*无意向', 'E', json_str)
result = json.loads(json_str)
return result
logger.error("未找到有效的JSON内容")
return {}
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {str(e)}")
return {}
except Exception as e:
logger.error(f"提取JSON时发生错误: {str(e)}")
logger.error(traceback.format_exc())
return {}
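# Illustrative: the happy path unwraps a workflow envelope shaped like
#   {"data": {"outputs": {"output": "<JSON string with all required fields>"}}}
# and anything else falls through to the raw {...} regex scan above.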
def customer_day_analysis(data_file_dt=''):
"""主函数"""
try:
        # Logging is already configured at import time (setup_logging above);
        # re-running it here would only open a stray, unused log file.
# 记录开始时间
start_time = datetime.now()
logger.info("=== 开始运行销售对话分析系统 ===")
logger.info(f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
# 创建输出目录
output_dir = 'user_day_research_result'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
logger.info(f"创建输出目录: {output_dir}")
# 读取对话数据文件
logger.info("正在读取对话数据文件...")
if data_file_dt == '':
target_dir = f'dialog_source_data/{datetime.today().strftime("%Y%m%d")}'
else:
target_dir = f'dialog_source_data/{data_file_dt}'
content = read_source_file(target_dir)
logger.info(f"文件总字符数: {len(content)}")
logger.info(f"文件总行数: {len(content.splitlines())}")
# 提取对话组
logger.info("开始提取对话组...")
dialogue_groups = extract_dialogue_groups(content)
# 并发处理对话组
logger.info(f"开始并发处理 {len(dialogue_groups)} 个对话组...")
results = process_dialogue_batch(dialogue_groups)
# 写入结果到CSV文件
if results:
logger.info("正在将结果写入CSV文件...")
if data_file_dt == '':
# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
timestamp = datetime.now().strftime("%Y%m%d")
output_file = os.path.join(output_dir, f'user_day_research_result_{timestamp}.csv')
else:
output_file = os.path.join(output_dir, f'user_day_research_result_{data_file_dt}.csv')
write_result_to_csv(results, output_file)
logger.info(f"结果已写入文件: {output_file}")
else:
logger.warning("没有生成任何分析结果")
# 记录结束时间和统计信息
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
logger.info("=== 处理完成 ===")
logger.info(f"总耗时: {duration:.2f} 秒")
logger.info(f"成功处理对话组数量: {len(dialogue_groups)}/{len(dialogue_groups)}")
logger.info(f"生成分析结果数量: {len(results)} 条")
logger.info(f"结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
except Exception as e:
logger.error(f"程序执行过程中发生错误: {str(e)}")
logger.error(traceback.format_exc())
if __name__ == "__main__":
customer_day_analysis()
import json, random, re
import multiprocessing, math
import os
import hashlib
from pathlib import Path
import emoji
from datetime import datetime, timedelta
import tiktoken
import shutil
def truncate_list_by_token_count(text_list, max_tokens=128 * 1024, model_name="gpt-3.5-turbo"):
"""
截断文本列表,使其总 token 数不超过 max_tokens。
参数:
text_list (List[str]): 要处理的文本列表。
max_tokens (int): 最大 token 数,默认 128k(131072)。
model_name (str): 用于 token 计算的模型名。
返回:
List[str]: 截断后的文本列表。
"""
encoding = tiktoken.encoding_for_model(model_name)
total_tokens = 0
truncated_list = []
for text in text_list:
tokens = encoding.encode(text)
token_count = len(tokens)
if total_tokens + token_count > max_tokens:
break
truncated_list.append(text)
total_tokens += token_count
return truncated_list
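# Illustrative (counts per the gpt-3.5-turbo encoding, where each sample phrase
# encodes to 2 tokens): with max_tokens=3 only the first entry fits:
#   truncate_list_by_token_count(["hello world", "second entry"], max_tokens=3)
#   # -> ["hello world"]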
def extract_emojis_list(text):
return [char for char in text if char in emoji.EMOJI_DATA]
def is_ad_text(text):
text = text.strip()
if len(extract_emojis_list(text)) >= 3:
return True
elif len(extract_emojis_list(text)) >= 2 and text.count('\\n\\n') >= 1:
return True
elif len(extract_emojis_list(text)) >= 1 and text.count('\\n\\n') >= 1 and text.strip().count('\\n') >= 1:
return True
elif len(extract_emojis_list(text)) >= 1 and text.count('\\n\\n') >= 2:
return True
elif len(extract_emojis_list(text)) >= 1 and text.count('\\n') >= 2:
return True
elif text.strip().count('\\n') >= 2 and text.count('\\n\\n') >= 2:
return True
elif re.search(r'亲爱的家长朋友|尊敬的银河|隆重推出|好礼相送|推荐奖金|银河精选要闻', text):
return True
else:
return False
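# Note: the '\\n' literals above appear deliberate — message bodies are stored
# via repr() in saler_customer_dialog_format below, so real newlines arrive as
# the two characters backslash + n. Illustrative checks (sample strings):
#   is_ad_text("🎉🎉🎉开业大酬宾")  # True: three or more emoji
#   is_ad_text("隆重推出全新套餐")   # True: promo keyword
#   is_ad_text("好的,明天见")       # False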
def get_md5_value(string: str):
md5 = hashlib.md5()
md5.update(string.encode('utf-8'))
md5_value = md5.hexdigest()
return md5_value
# s = get_md5_value(string="https://prod-cdn-pub.galaxy-immi.com/production/ultron/ultron_phone/audio/3817385431338117814120250329160004.mp3")
# print(s)
def saler_customer_dialog_format(input_sub_list, output_file, lock, ):
for index, customer_path_file in input_sub_list:
result_list = []
with open(customer_path_file, 'r', encoding='utf-8') as f1:
# 客户id
customer_id = os.path.basename(customer_path_file).replace('customer_id_', '').replace('.json', '')
data = f1.read()
dialog_dict_list = json.loads(data)
if len(dialog_dict_list) == 0:
    continue  # skip this empty customer file instead of aborting the remaining files
dialog_dict_list = sorted(dialog_dict_list,
key=lambda x: datetime.strptime(x['msg_time'], '%Y-%m-%d %H:%M:%S'))
# 默认初始值
beisen_depart_name_list = []
try:
beisen_depart_name = [dialog_dict['beisen_depart_name'] for dialog_dict in dialog_dict_list if
dialog_dict['beisen_depart_name'] not in [None, '']][0]
except:
beisen_depart_name = '销售组'
for dialog_dict in dialog_dict_list:
if dialog_dict['beisen_depart_name'] not in [None, '']:
beisen_depart_name_list.append(dialog_dict['beisen_depart_name'])
# print(dialog_dict)
if dialog_dict['msg_type'] in ('image'):
content = '相关图片'
elif dialog_dict['msg_type'] in ('video'):
content = '相关视频'
elif dialog_dict['msg_type'] in ('file'):
content = '相关文件'
elif dialog_dict['msg_type'] in ('link'):
content = '相关链接'
elif dialog_dict['msg_type'] in ('emotion'):
content = '相关表情'
else:
content = repr(dialog_dict['content']).strip("'").strip('"')
if content == "None":
continue
# if dialog_dict['msg_time'] == '2025-04-27 10:34:59':
# print(content)
if dialog_dict['msg_type'] in ('voice', 'meeting_voice_call', 'phone'):
hash_id = get_md5_value(content)
# print("hash_id:",hash_id,"voice_url:", content)
tmp_hash_file = os.path.join(customer_path_file.replace(os.path.basename(customer_path_file), ''),
f'audio_{hash_id}.json')
tmp_result_list = []
tmp_result_list_1 = []
if os.path.isfile(tmp_hash_file): # 已识别音频文件
# 由于语音没有客服名称,因此取最近的客服名称
if len(beisen_depart_name_list) > 0:
beisen_depart_name = beisen_depart_name_list[-1]
with open(tmp_hash_file, 'r', encoding='utf-8') as f2:
tmp_data = f2.read()
try:
tmp_dict_list = json.loads(tmp_data)
except:
tmp_dict_list = None
if dialog_dict['msg_sender_type_id'] == 0:
# if dialog_dict['msg_sender_type_id'] == 0 and dialog_dict['msg_time'] == '2025-04-27 10:34:59':
if tmp_dict_list != None:
for sentence_dirc in tmp_dict_list['Result']['Sentences']:
tmp_BeginTime = sentence_dirc['BeginTime']
tmp_ChannelId = sentence_dirc['ChannelId']
tmp_Text = sentence_dirc['Text']
msg_time = (datetime.strptime(dialog_dict['msg_time'],
"%Y-%m-%d %H:%M:%S") + timedelta(
milliseconds=tmp_BeginTime)).strftime("%Y-%m-%d %H:%M:%S")
# --------------------语音通话中销售身份和客户身份直接使用"销售组与客户(id)"的形式
tmp_result_list_1.append(
f"""{msg_time} {beisen_depart_name}与客户({customer_id})-语音通话:{tmp_Text}""")
# --------------------使用GPT判断语音通话中销售身份和客户身份
# tmp_result_list.append(f"""{msg_time} 身份{tmp_ChannelId}:{tmp_Text}""")
# tmp_context = '\n'.join(tmp_result_list[:50])
# prompt = f"""
# ```
# {tmp_context}
# ```
# 请将上述```中的内容严格按照如下要求处理:
# 判断各身份的归属到:"{beisen_depart_name}-{dialog_dict['msg_category']}"或"客户({customer_id})-{dialog_dict['msg_category']}"
# 例如:{{"身份0或身份1":"{beisen_depart_name}-{dialog_dict['msg_category']}","身份0或身份1":"客户({customer_id})-{dialog_dict['msg_category']}"}}
# 补充:如果是银河的人员,则归属到"{beisen_depart_name}-{dialog_dict['msg_category']}"
# 严格按照"{{"":"","":"",...}}"的格式输出,拒绝```json```
# """
# is_try = True
# try_n = 1
# while is_try:
# try:
# identity_reco = get_online_llm_response(prompt)
# # print(identity_reco)
# identity_reco_dict = json.loads(identity_reco)
# # print(identity_reco_dict)
# is_try = False
# except Exception as e:
# try_n += 1
# if try_n > 2: # gpt调用有两次机会
# is_try = False
# print(e)
# pass
# for line in tmp_result_list:
# # print(line)
# for k, v in identity_reco_dict.items():
# line = line.replace(k, v)
# # print(line)
# tmp_result_list_1.append(line)
elif dialog_dict['msg_sender_type_id'] == 1: # 客户语音
if tmp_dict_list != None:
# print(tmp_dict_list)
# BizDuration = tmp_dict_list['BizDuration']
for sentence_dirc in tmp_dict_list['Result']['Sentences']:
tmp_BeginTime = sentence_dirc['BeginTime']
tmp_ChannelId = sentence_dirc['ChannelId']
tmp_Text = sentence_dirc['Text']
msg_time = (datetime.strptime(dialog_dict['msg_time'],
"%Y-%m-%d %H:%M:%S") + timedelta(
milliseconds=tmp_BeginTime)).strftime("%Y-%m-%d %H:%M:%S")
if dialog_dict['msg_type'] == 'meeting_voice_call':
tmp_result_list_1.append(
f"""{msg_time} {beisen_depart_name}与客户({customer_id})-语音通话:{tmp_Text}""")
else:
tmp_result_list_1.append(
f"""{msg_time} 客户({customer_id})-{dialog_dict['msg_category']}:{tmp_Text}""")
elif dialog_dict['msg_sender_type_id'] == 2: # 销售语音
if tmp_dict_list != None:
# print(tmp_dict_list)
# BizDuration = tmp_dict_list['BizDuration']
for sentence_dirc in tmp_dict_list['Result']['Sentences']:
tmp_BeginTime = sentence_dirc['BeginTime']
tmp_ChannelId = sentence_dirc['ChannelId']
tmp_Text = sentence_dirc['Text']
msg_time = (datetime.strptime(dialog_dict['msg_time'],
"%Y-%m-%d %H:%M:%S") + timedelta(
milliseconds=tmp_BeginTime)).strftime("%Y-%m-%d %H:%M:%S")
if dialog_dict['msg_type'] == 'meeting_voice_call':
tmp_result_list_1.append(
f"""{msg_time} {beisen_depart_name}与客户({customer_id})-语音通话:{tmp_Text}""")
else:
if dialog_dict['beisen_depart_name'] not in [None, '']:
tmp_result_list_1.append(
f"""{msg_time} {dialog_dict['beisen_depart_name']}-{dialog_dict['msg_category']}:{tmp_Text}""")
else:
tmp_result_list_1.append(
f"""{msg_time} {beisen_depart_name}-{dialog_dict['msg_category']}:{tmp_Text}""")
else: # 未识别音频文件的情况
print(f'客户({customer_id}):音频文件不存在:{tmp_hash_file} 内容:{content}')
# 输出结果:
if dialog_dict['beisen_depart_name'] in [None, '']:
tmp_result_list_1.append(
f"""{dialog_dict['msg_time']} 销售组-客户({customer_id})-{dialog_dict['msg_category']}:相关通话""")
# print(f"""{dialog_dict['msg_time']} {beisen_depart_name}-客户({customer_id})-{dialog_dict['msg_category']}:{content}""")
else:
tmp_result_list_1.append(
f"""{dialog_dict['msg_time']} {dialog_dict['beisen_depart_name']}-客户({customer_id})-{dialog_dict['msg_category']}:相关通话""")
# print(f"""{dialog_dict['msg_time']} {dialog_dict['beisen_depart_name']}-客户({customer_id})-{dialog_dict['msg_category']}:{content}""")
result_list.extend(tmp_result_list_1)
# break
else: # 非语音的对话输出
if dialog_dict['msg_sender_type_id'] == 1: # 客户
result_list.append(
f"""{dialog_dict['msg_time']} 客户({customer_id})-{dialog_dict['msg_category']}:{content}""")
elif dialog_dict['msg_sender_type_id'] == 2: # 销售
if dialog_dict['beisen_depart_name'] in [None, '']:
result_list.append(
f"""{dialog_dict['msg_time']} 销售组-{dialog_dict['msg_category']}:{content}""")
# print(f"""{dialog_dict['msg_time']} 销售组-{dialog_dict['msg_category']}:{content}""")
else:
result_list.append(
f"""{dialog_dict['msg_time']} {dialog_dict['beisen_depart_name']}-{dialog_dict['msg_category']}:{content}""")
# print(f"""{dialog_dict['msg_time']} {dialog_dict['beisen_depart_name']}-{dialog_dict['msg_category']}:{content}""")
        # Serialize writes to the shared output file
        with lock:
            with open(output_file, 'a', encoding='utf-8') as f:
                f.write(f'对话组:{index}' + "\n")
                print(f'对话组:{index}', customer_path_file)
                # f.write(mask_phone_email(text='\n'.join(result_list)) + "\n")
                f.write('\n'.join(result_list) + "\n")
def multi_saler_customer_format(input_list, output_file, group_index=0, group_size=5, num_processes=5):
with open(output_file, 'w', encoding='utf-8') as f:
pass
# 创建进程锁
lock = multiprocessing.Lock()
num_groups = math.ceil(len(input_list) / group_size)
print(f"总共划分的组数为: {num_groups}")
processes = []
while group_index < num_groups:
# 确定当前可用的进程数量,取剩余分组数和设定进程数的较小值
available_processes = min(num_processes, num_groups - group_index)
print("当前第%s分组" % group_index, f"总共划分的组数为: {num_groups}")
for _ in range(available_processes):
start_index = group_index * group_size
end_index = min((group_index + 1) * group_size, len(input_list))
print(f"当前分组的索引范围: {start_index} - {end_index}")
input_sub_list = input_list[start_index:end_index]
if len(input_sub_list) > 0:
p = multiprocessing.Process(target=saler_customer_dialog_format,
args=(input_sub_list, output_file, lock,))
processes.append(p)
p.start()
group_index += 1
# 等待当前一批次启动的进程执行完毕
for p in processes[-available_processes:]:
p.join()
# 移除已完成的进程对象,避免内存占用过多
processes = processes[:-available_processes]
def get_dialog_to_load(parentdir):
# 1.数据清洗落地
subdirs = [os.path.join(parentdir, f.name) for f in Path(parentdir).iterdir() if f.is_dir()] # [:1]
if len(subdirs) == 0:
subdirs = [parentdir]
for subdir in subdirs:
outputfile = os.path.join(subdir, Path(subdir).name + '.txt')
matched_files = [os.path.join(subdir, f) for f in os.listdir(subdir) if
re.search(r'customer_id', f)]
matched_files = list(enumerate(matched_files, start=1))
multi_saler_customer_format(matched_files, outputfile, group_index=0, group_size=5, num_processes=5)
def load_dialog_to_merge(data_parent_dir, data_file_dt, start_dt='', end_dt=''):
# 2.数据整合落地
output_file = os.path.join(data_parent_dir, Path(data_parent_dir).name + '.txt')
data_sub_dir_list = [os.path.join(data_parent_dir, f.name) for f in Path(data_parent_dir).iterdir() if f.is_dir()]
if len(data_sub_dir_list) == 0:
data_sub_dir_list_file = [os.path.join(data_parent_dir, Path(data_parent_dir).name + '.txt')]
else:
data_sub_dir_list_file = [os.path.join(data_parent_dir, f.name, f.name + '.txt') for f in
Path(data_parent_dir).iterdir() if f.is_dir()]
print('整合的子文件有:\n', '\n'.join(data_sub_dir_list_file))
tmp_content_list = []
for file in data_sub_dir_list_file:
with open(file, 'r', encoding='utf-8') as infile:
contents = infile.readlines()
for content in contents:
tmp_content = ''
if '对话组' in content:
tmp_content = content
# tmp_content_list.append(content)
else:
try:
log_date = datetime.strptime(content.strip()[:10], "%Y-%m-%d")
if start_dt == '':
start_date = datetime.strptime('2000-01-01', "%Y-%m-%d")
else:
start_date = datetime.strptime(start_dt, "%Y-%m-%d")
if end_dt == '':
end_date = datetime.strptime('2999-01-01', "%Y-%m-%d")
else:
end_date = datetime.strptime(end_dt, "%Y-%m-%d")
if start_date <= log_date <= end_date:
tmp_content = content
# tmp_content_list.append(content)
except Exception as e:
print(f'err content:{content}', e)
if tmp_content != '':
tmp_content_list.append(tmp_content)
print('已合并数据文件:', file)
if start_dt == '' and end_dt == '':
tmp_str = ''
else:
tmp_str = f'({start_dt}_{end_dt})'
tmp_content_list_1 = tmp_content_list
# 按每天的聊天记录切分对话内容
# tmp_content_list_1 = []
# tmp_day = ''
# for index, content in enumerate(tmp_content_list[::-1]):
# dt = content.strip()[:10]
# if dt != tmp_day and tmp_day != '' and tmp_day.startswith(('20')):
# # print('curr:',tmp_day,'next',dt)
# tmp_content_list_1.append(tmp_day + '\n')
# tmp_day = dt
# # print(index, content)
# tmp_content_list_1.append(content)
# tmp_content_list_1.reverse()
# 剔除有广告没有客户回复的日对话
del_day_list = []
tmp_prefix_content_dic = {}
group_index = 1
for content in tmp_content_list_1:
if '对话组' in content:
tmp_pre = f'对话组:{group_index}\n'
tmp_prefix_content_dic[tmp_pre] = [tmp_pre]
group_index += 1
else:
if content.strip()[:10] + tmp_pre not in tmp_prefix_content_dic:
tmp_prefix_content_dic[content.strip()[:10] + tmp_pre] = [content]
else:
tmp_prefix_content_dic[content.strip()[:10] + tmp_pre].append(content)
for prefix, content_list in tmp_prefix_content_dic.items():
if not re.search(r"客户[(]\d+[)]", ''.join(content_list)) and prefix.startswith(('20')):
del_day_list.append(prefix)
for key in del_day_list:
tmp_prefix_content_dic.pop(key, None)
# 按天的内容切分对话
# split_dialog_num = 5
# for prefix,content_list in tmp_prefix_content_dic.items():
# tmp_list = []
# tmp_list.append(content_list[0])
# part_index = 1
# for i in range(1,len(content_list),split_dialog_num):
# if len(content_list[i:i + split_dialog_num]) >0:
# tmp_strs = ''.join(content_list[i:i + split_dialog_num])
# tmp_strs = f'part{part_index}\n'+tmp_strs
# tmp_list.append(tmp_strs)
# part_index +=1
# tmp_prefix_content_dic[prefix] = tmp_list
tmp_content_list_1 = [content for content_list in tmp_prefix_content_dic.values() for content in content_list]
# 控制每个对话组的对话总量,如果对话组没有对话则删除该组
tmp_group_content_list_dic = {}
for content in tmp_content_list_1:
if '对话组' in content:
tmp_strs = content
tmp_group_content_list_dic[tmp_strs] = [content + f'round1:{end_dt}\n']
else:
if tmp_strs in tmp_group_content_list_dic:
tmp_group_content_list_dic[tmp_strs].append(content)
del_group_list = []
for group, content_list in tmp_group_content_list_dic.items():
content_list = truncate_list_by_token_count(content_list, max_tokens=100 * 1024, model_name="gpt-3.5-turbo")
tmp_group_content_list_dic[group] = content_list
if len(content_list) <= 10: # 过滤出聊天量小于N条的当天内容
del_group_list.append(group)
for group in del_group_list:
tmp_group_content_list_dic.pop(group, None)
tmp_content_list_1 = [content for content_list in tmp_group_content_list_dic.values() for content in content_list]
# 按天节点添加对话轮数
tmp_content_list_2 = []
day_index = 1
for content in tmp_content_list_1:
if len(content.strip()) == 10 and content.strip().startswith(('20')):
content = f'round{day_index}:{content}'
day_index += 1
if content.startswith(('对话组')):
day_index = 1
tmp_content_list_2.append(content)
# 结果保存本地
load_path_file = output_file.split('.txt')[0] + tmp_str + '.txt'
with open(load_path_file, 'w', encoding='utf-8') as outfile:
for content in tmp_content_list_2:
outfile.write(content)
print('合并后输出数据文件:', load_path_file)
# 将保存的本地文件移动到目标目录
if data_file_dt == '':
target_dir = f'dialog_source_data/{datetime.today().strftime("%Y%m%d")}'
else:
target_dir = f'dialog_source_data/{data_file_dt}'
os.makedirs(target_dir, exist_ok=True)
shutil.move(load_path_file, os.path.join(target_dir, 'source.md'))
print('目标数据文件转入到:', os.path.join(target_dir, 'source.md'))
def extract_dialogue(customer_id, text, index_str='对话组:'):
matches = list(re.finditer(rf'{index_str}.*?(?={index_str}|$)', text, re.DOTALL))
for m in matches:
if customer_id in m.group():
return m.group()
return None
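# Illustrative (sample text): returns the single 对话组 block that mentions the
# customer id, or None when no block does:
#   extract_dialogue("914108", "对话组:1\n客户(914108):...\n对话组:2\n客户(2):...")
#   # -> "对话组:1\n客户(914108):...\n"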
def get_customer_id_dialog(customer_id, input_file):
with open(input_file, 'r', encoding='utf-8') as infile:
content = infile.read()
result = extract_dialogue(customer_id=customer_id, text=content)
print(result)
if __name__ == '__main__':
# data_parent_dir = r'D:\Downloads\对客销售\对话语料\20250612-119未复购客户'
# get_dialog_to_load(data_parent_dir)
# load_dialog_to_merge(data_parent_dir, start_dt=f'{(datetime.today() - timedelta(days=60)).strftime('%Y-%m-%d')}', end_dt=f'{(datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')}')
get_customer_id_dialog(customer_id='914108', input_file='./dialog_source_data/20250626/source.md')
from customer_dialog_clearn import get_dialog_to_load, load_dialog_to_merge
from customer_day_analysis import customer_day_analysis
from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime, timedelta
import json, multiprocessing, math, os, subprocess
from online_streaming import online_llm_streaming
def get_db_customer_id_df(start_dt, end_dt):
    # The SQL here was a verbatim copy of get_db_data_df's query below, so this
    # function now simply delegates to it; both call sites keep their behavior.
    return get_db_data_df(start_dt, end_dt)
def get_db_data_df(start_dt, end_dt):
host = 'rdsonlyread.rwlb.rds.aliyuncs.com'
user = 'tmp_presale'
pwd = 'czWYsd2fjx8HaKSCrXh'
db = 'presalemicros'
port = '3306'
engine = create_engine(f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}')
sql_string = f"""
with temp as (
select 客户id,
TRIM(BOTH ',' from GROUP_CONCAT(工作状态)) 工作状态,
TRIM(BOTH ',' from GROUP_CONCAT(是否有居住地)) 是否有居住地,
TRIM(BOTH ',' from GROUP_CONCAT(流水怎么做)) 流水怎么做,
TRIM(BOTH ',' from GROUP_CONCAT(工位)) 工位,
TRIM(BOTH ',' from GROUP_CONCAT(销售卡点)) 销售卡点,
TRIM(BOTH ',' from GROUP_CONCAT(客户需求判断)) 客户需求判断,
TRIM(BOTH ',' from GROUP_CONCAT(意向产品)) 意向产品
from (
(select
ct.customer_id 客户id,
ifnull(GROUP_CONCAT(tgzzt.name),'') 工作状态,
ifnull(GROUP_CONCAT(tsfyjzd.name),'') 是否有居住地,
ifnull(GROUP_CONCAT(tlszmz.name),'') 流水怎么做,
ifnull(GROUP_CONCAT(tgw.name),'') 工位,
ifnull(GROUP_CONCAT(txskd.name),'') 销售卡点,
'' 客户需求判断,
'' 意向产品
from micros_customer_tag ct
left join micros_tag tgzzt on ct.tag_id=tgzzt.id and ct.deleted_at is null and tgzzt.pid=3 and tgzzt.deleted_at is null
left join micros_tag tsfyjzd on ct.tag_id=tsfyjzd.id and ct.deleted_at is null and tsfyjzd.pid=4 and tsfyjzd.deleted_at is null
left join micros_tag tlszmz on ct.tag_id=tlszmz.id and ct.deleted_at is null and tlszmz.pid=5 and tlszmz.deleted_at is null
left join micros_tag tgw on ct.tag_id=tgw.id and ct.deleted_at is null and tgw.pid=6 and tgw.deleted_at is null
left join micros_tag txskd on ct.tag_id=txskd.id and ct.deleted_at is null and txskd.pid=23 and txskd.deleted_at is null
where ct.updated_at between '{start_dt}' and '{end_dt}' and ct.deleted_at is null and (tgzzt.id is not null or tsfyjzd.id is not null or tlszmz.id is not null or tgw.id is not null or txskd.id is not null)
group by ct.customer_id
order by ct.customer_id)
union all
(
select
customer_id 客户id,
'' 工作状态,
'' 是否有居住地,
'' 流水怎么做,
'' 工位,
'' 销售卡点,
'' 客户需求判断,
GROUP_CONCAT(pp) 意向产品
from (
select distinct customer_id,CONCAT(product_name,'(',price,')') pp from micros_customer_attention_product where updated_at between '{start_dt}' and '{end_dt}' and deleted_at is null order by customer_id) as tmp group by customer_id)
union all
(
select
customer_id 客户id,
'' 工作状态,
'' 是否有居住地,
'' 流水怎么做,
'' 工位,
'' 销售卡点,
ifnull(GROUP_CONCAT(aa), '') 客户需求判断,
'' 意向产品
from (
SELECT
tttt.customer_id,
-- CONCAT(tttt.name, if(tttt.son is not null,CONCAT("(",tttt.son,")"),"")) aa
CONCAT(tttt.name, if(tttt.son is not null,CONCAT("-",tttt.son),"")) aa
from (
SELECT
ct.tag_id,
ct.customer_id,
tt3.name,
(
SELECT
GROUP_CONCAT(t2.name)
from micros_customer_tag ct2
left join micros_tag t2
on t2.id = ct2.tag_id
where
ct2.tag_id in (SELECT id from micros_tag t where t.pid = ct.tag_id and t.`level` = 3 and path like '2,24,%') and
ct2.customer_id = ct.customer_id and
ct2.deleted_at is null
) as son
FROM micros_customer_tag ct
left join micros_tag tt3
on tt3.id = ct.tag_id
where
EXISTS(SELECT 1 from micros_tag tt where tt.id = ct.tag_id and tt.level = 2 and tt.path='2,24') and
ct.deleted_at is null and
ct.updated_at between '{start_dt}' and '{end_dt}'
) as tttt
) ttttt
group by customer_id)
) tmp3 group by 客户id
)
select t.客户id,
t.工作状态,
t.是否有居住地,
t.流水怎么做,
t.工位,
t.意向产品,
t.销售卡点 as 人工_销售卡点,
t.客户需求判断 as 人工_客户需求判断,
t1.标签更新时间
from temp t
left join (select customer_id as 客户id,max(updated_at) as 标签更新时间 from micros_customer_tag where updated_at between '{start_dt}' and '{end_dt}' and deleted_at is null group by customer_id) t1
on t.客户id = t1.客户id
where t1.客户id is not null
and (length(trim(t.客户需求判断)) > 0)
and trim(t.客户需求判断) not in ('其他')
order by t1.标签更新时间 desc
"""
with engine.connect() as connection:
sql_result = connection.execute(text(sql_string))
sql_result_df = pd.DataFrame(sql_result.fetchall(), columns=sql_result.keys())
return sql_result_df
def ai_man_compare_forward(row):
prompt = f"""
```
内容1:{row.iloc[0]}
内容2:{row.iloc[1]}
```
根据上面```中的内容1与内容2,打分规则如下:
如果内容1或内容2的宽松的意思一致性较高:得分80分到100分范围
如果内容1或内容2的宽松的意思一致性偏高:得分70分到80分范围
如果内容1或内容2的宽松的意思一致性偏低:得分60分到70分范围
如果内容1或内容2的宽松的意思一致性很低:得分50分到60分范围
如果内容1或内容2的宽松的意思几乎不相关:得分50分以下
如果内容1或内容2为空:得分50分
请严格按照{{"得分":"得分原因"}}的json格式给出最终得分和原因,拒绝```json```的形式。
例如:
{{"85":"意向客户可更进与长期运营,因为意向客户可更进,可以做长期运营的基础条件,因此意思一致性较高"}}
{{"50":"内容1为空,根据打分规则,如果内容1或内容2为空则得分为50分"}}
{{"30":"内容1为身份续签相关,内容2为教育择校相关,根据打分规则,如果内容1或内容2的意思一致性几乎不相关则得分为30分"}}
{{"0":"内容1为身份续签,内容2为放弃身份,根据打分规则,内容1或内容2意思完全相反则得分为0分"}}
"""
    try:
        tmp_ai_result = online_llm_streaming(prompt).run()
        return tmp_ai_result
    except Exception:
        # If the call itself raised, tmp_ai_result was never bound; return an
        # empty string instead of hitting a NameError.
        print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        return ''
def target_func(sub_df, result_pd_list):
sub_df = sub_df.fillna('').apply(ai_man_compare_forward, axis=1)
result_pd_list.append(sub_df)
def multi_func(df, group_index=0, group_size=5, num_processes=10):
manager = multiprocessing.Manager()
result_pd_list = manager.list()
num_groups = math.ceil(len(df) / group_size)
print(f"总共划分的组数为: {num_groups}")
processes = []
while group_index < num_groups:
# 确定当前可用的进程数量,取剩余分组数和设定进程数的较小值
available_processes = min(num_processes, num_groups - group_index)
print("当前第%s分组" % group_index, f"总共划分的组数为: {num_groups}")
for _ in range(available_processes):
start_index = group_index * group_size
end_index = min((group_index + 1) * group_size, len(df))
print(f"当前分组的索引范围: {start_index} - {end_index}")
sub_df = df[start_index:end_index]
if len(sub_df) > 0:
p = multiprocessing.Process(target=target_func,
args=(sub_df, result_pd_list,))
processes.append(p)
p.start()
group_index += 1
# 等待当前一批次启动的进程执行完毕
for p in processes[-available_processes:]:
p.join()
# 移除已完成的进程对象,避免内存占用过多
processes = processes[:-available_processes]
return pd.concat(result_pd_list).sort_index()
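# Illustrative usage (hypothetical column pair): score two text columns row by
# row across worker processes; the partial Series come back re-sorted by index:
#   scores = multi_func(df[['AI_标签', '人工_标签']].fillna(''), group_size=5)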
def split_rows_by_comma(df, split_column):
    """Split a DataFrame into multiple rows on the comma-separated values of
    the given column.
    Args:
        df: source DataFrame
        split_column: name of the column to split
    Returns:
        the exploded DataFrame
    """
    # Step 1: split the comma-separated string into a list
    df[split_column] = df[split_column].str.split(',')
    # Step 2: explode the list into one row per element
    exploded_df = df.explode(split_column)
    # Step 3: reset the index and strip whitespace
    exploded_df = exploded_df.reset_index(drop=True)
    exploded_df[split_column] = exploded_df[split_column].str.strip()
    return exploded_df
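# Illustrative (sample frame): one row whose tag column holds "a, b" becomes two:
#   df = pd.DataFrame({"id": [1], "tags": ["a, b"]})
#   split_rows_by_comma(df, "tags")  # -> rows (1, "a") and (1, "b")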
if __name__ == '__main__':
# 从数据库中获取客户id并生成客户id的json格式文件到目标路径
customer_id_df = get_db_customer_id_df(start_dt='2025-06-19', end_dt=datetime.now().strftime("%Y-%m-%d"))
tmp_customer_id_list = []
for index, customer_id in customer_id_df.iterrows():
tmp_dict = {"客户id": f"{customer_id['客户id']}"}
tmp_customer_id_list.append(tmp_dict)
tmp_customer_id_dict = {"RECORDS": tmp_customer_id_list}
diglog_original_date_dir = f'./diglog_original_data/{datetime.now().strftime("%Y-%m-%d")}'
if not os.path.exists(diglog_original_date_dir):
os.makedirs(diglog_original_date_dir)
print(f"文件夹 {diglog_original_date_dir} 已创建。")
diglog_original_date_file = os.path.join(diglog_original_date_dir, f'{datetime.now().strftime("%Y%m%d")}.json')
with open(diglog_original_date_file, "w", encoding="utf-8") as f:
json.dump(tmp_customer_id_dict, f, ensure_ascii=False, indent=4)
print('customer_id文件生成:', diglog_original_date_file)
# 根据customerid的json文件,调用语料解析文本程序,生成对话文本语料文件到本地
diglog_original_date_file = './custom_sales/' + diglog_original_date_file.replace('./', '')
command = f"cd /opt/ai_sale && /root/miniconda3/envs/ai_sale/bin/python get_data.py {diglog_original_date_file} https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=cc1909e1-a8bf-4807-8165-c2776a02b23c"
print(command, '...')
subprocess.run(command, shell=True)
max_diglog_dict = \
sorted([f for f in os.listdir('/opt/ai_sale/data') if os.path.isdir(os.path.join('/opt/ai_sale/data', f))],
reverse=True)[0]
print('生成语料文件夹:', max_diglog_dict)
# 将解析出的对话文本语料进行规范整理
# data_parent_dir = r'D:\Yanjq_Project\custom_sales\diglog_original_data\2025-07-28\2025-07-28-114941'
data_parent_dir = os.path.join('/opt/ai_sale/data', max_diglog_dict)
data_dt = datetime.now().strftime("%Y%m%d")
# data_dt = '20250708'
get_dialog_to_load(data_parent_dir)
load_dialog_to_merge(data_parent_dir, data_file_dt=data_dt,
start_dt=f'{(datetime.strptime(data_dt, "%Y%m%d") - timedelta(days=60)).strftime("%Y-%m-%d")}',
end_dt=f'{(datetime.strptime(data_dt, "%Y%m%d") - timedelta(days=1)).strftime("%Y-%m-%d")}')
customer_day_analysis(data_file_dt=data_dt)
# 从crm数据库中获取真实标签关联到分析结果中
sql_result_df = get_db_data_df(start_dt='2025-06-19', end_dt=datetime.now().strftime("%Y-%m-%d"))
sql_result_df['客户id'] = sql_result_df['客户id'].astype(str)
file_path = f'./user_day_research_result/user_day_research_result_{data_dt}.csv'
data_df = pd.read_csv(file_path)
data_df['客户id'] = data_df['客户名称'].str.replace('客户', '')
data_df = pd.merge(data_df, sql_result_df, on='客户id', how='left')
# data_df['客户名称'] = data_df['客户id']
data_df = data_df.drop(['客户id', 'round'], axis=1)
data_df_needs_ai = data_df['客户购买潜力'].str.replace(',', ',').str.split(',', n=2, expand=True)
# 判断不足3列时补齐
while data_df_needs_ai.shape[1] < 3:
data_df_needs_ai[data_df_needs_ai.shape[1]] = None
data_df_needs_ai.columns = ['AI_客户需求1', 'AI_客户需求2', 'AI_客户需求3']
# Helpers for the four tag-splitting applies below: a label like "二级-三级"
# contributes the part before the '-' at level 0 and the part after it at level 1.
def _tag_level(value, level):
    if not value:
        return ''
    if '-' in value:
        return value.replace(' ', '').split('-')[level]
    return value if level == 0 else ''
def _join_tag_levels(row, level):
    return ','.join(filter(None, set(_tag_level(v, level) for v in row)))
data_df['AI_客户需求二级标签'] = data_df_needs_ai.fillna('').apply(lambda x: _join_tag_levels(x, 0), axis=1)
data_df['AI_客户需求三级标签'] = data_df_needs_ai.fillna('').apply(lambda x: _join_tag_levels(x, 1), axis=1)
    data_df_needs_man = data_df['人工_客户需求判断'].str.replace(',', ',').str.split(',', n=2, expand=True)
    # Pad to three columns when fewer than three needs were extracted
    while data_df_needs_man.shape[1] < 3:
        data_df_needs_man[data_df_needs_man.shape[1]] = None
    data_df_needs_man.columns = ['人工_客户需求1', '人工_客户需求2', '人工_客户需求3']
    data_df['人工_客户需求二级标签'] = data_df_needs_man.fillna('').apply(lambda x: join_tag_levels(x, 0), axis=1)
    data_df['人工_客户需求三级标签'] = data_df_needs_man.fillna('').apply(lambda x: join_tag_levels(x, 1), axis=1)
    data_df = data_df.rename(columns={
        '客户购买潜力依据': 'AI_客户购买潜力依据',
        '顾问卡点依据': 'AI_顾问卡点依据',
        '顾问卡点': 'AI_销售卡点',
    })
data_df = data_df[
['会话组编号', '客户名称', '客户现状', '客户需求', '客户潜在需求', '客户意向等级', '跟进日期', '标签更新时间',
'AI_客户购买潜力依据', 'AI_顾问卡点依据', 'AI_客户需求二级标签', 'AI_客户需求三级标签', 'AI_销售卡点',
'人工_客户需求二级标签', '人工_客户需求三级标签', '人工_销售卡点']]
data_df['核验_客户需求二级标签'] = ''
data_df['核验_客户需求三级标签'] = ''
data_df['核验_销售卡点'] = ''
# data_df['核验_客户需求二级标签打分'] = data_df[['AI_客户需求二级标签', '人工_客户需求二级标签']].fillna('').swifter.apply(ai_man_compare_forward, axis=1)
# data_df['核验_客户需求三级标签打分'] = data_df[['AI_客户需求三级标签', '人工_客户需求三级标签']].fillna('').swifter.apply(ai_man_compare_forward, axis=1)
data_df = data_df.sort_index()
data_df = split_rows_by_comma(data_df, '人工_客户需求二级标签')
data_df = split_rows_by_comma(data_df, '人工_客户需求三级标签')
data_df = split_rows_by_comma(data_df, 'AI_客户需求二级标签')
data_df = split_rows_by_comma(data_df, 'AI_客户需求三级标签')
data_df = data_df[data_df['AI_客户需求三级标签'].notna()]
data_df = data_df[data_df['人工_客户需求三级标签'].notna()]
data_df['核验_客户需求二级标签打分'] = multi_func(
data_df[['AI_客户需求二级标签', '人工_客户需求二级标签']].fillna(''))
data_df['核验_客户需求三级标签打分'] = multi_func(
data_df[['AI_客户需求三级标签', '人工_客户需求三级标签']].fillna(''))
# data_df['核验_销售卡点打分'] = multi_func(data_df[['AI_销售卡点', '人工_销售卡点']].fillna(''))
data_df = data_df.sort_values(by=['标签更新时间'], ascending=[False])
data_df.to_excel(file_path.replace('csv', 'xlsx'), index=False)
    # Create database connections
engine = create_engine(
'mysql+pymysql://test_pldb_rw:fbtYqZ0I4YszvFEjaKZuoTXCM77uP2@test-scrm-cluster.rwlb.rds.aliyuncs.com/ai')
engine2 = create_engine(
'mysql+pymysql://ai:LWJRe05rX5oe7rFH@prodwritepolardb.rwlb.rds.aliyuncs.com/ai')
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
data_df['data_dt'] = yesterday
    # Write data_df into MySQL
data_df.rename(columns={'会话组编号': 'group_id',
'客户名称': 'customer_name',
'客户现状': 'current_status',
'客户需求': 'stated_needs',
'客户潜在需求': 'potential_needs',
'客户意向等级': 'intention_level',
'跟进日期': 'followup_date',
'标签更新时间': 'tag_update_time',
'AI_客户购买潜力依据': 'ai_purchase_potential',
'AI_顾问卡点依据': 'ai_consultant_blockers',
'AI_客户需求二级标签': 'ai_secondary_tags',
'AI_客户需求三级标签': 'ai_tertiary_tags',
'AI_销售卡点': 'ai_sales_blockers',
'人工_客户需求二级标签': 'manual_secondary_tags',
'人工_客户需求三级标签': 'manual_tertiary_tags',
'人工_销售卡点': 'manual_sales_blockers',
'核验_客户需求二级标签': 'verified_secondary_tags',
'核验_客户需求三级标签': 'verified_tertiary_tags',
'核验_销售卡点': 'verified_sales_blockers',
'核验_客户需求二级标签打分': 'verified_secondary_scores',
'核验_客户需求三级标签打分': 'verified_tertiary_scores',
'核验_销售卡点打分': 'verified_blocker_scores'
}, inplace=True)
data_df.to_sql(name='ai_sale_original_data', con=engine, if_exists='append', index=False)
data_df.to_sql(name='ai_sale_original_data', con=engine2, if_exists='append', index=False)
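    # if_exists='append' only appends: rerunning the pipeline for the same data_dt will
    # insert duplicate rows unless deduplicated upstream (assumption: the target table
    # has no unique constraint preventing this).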
to_mysql = pd.read_excel(file_path.replace('csv', 'xlsx'))
    # print('Output file:', file_path.replace('csv', 'xlsx'))
# online_streaming.py
import json
import logging
import traceback
from typing import Dict
import requests
logger = logging.getLogger()
class AiGenerationException(Exception):
    """AI generation error."""
    def __init__(self, message: str = "AI生成错误"):
        super().__init__(message)
        self.message = message
    def __str__(self):
        return f'AiGenerationException: {self.message}'
class online_llm_streaming(object):
""" AI生成 """
base_url = "https://test-copilot.galaxy-immi.com/v1"
def __init__(self, input_query, api_key: str = "app-Ipg9sBRE3FRKYX5TMVO6OthV", route: str = "/workflows/run",
response_mode: str = "blocking"):
self.route = route
self.response_mode = response_mode
self.api_key = api_key
self.inputs = {"input_query": input_query}
def ai_generate(self, url: str, headers: Dict, data: Dict, timeout: int, response_mode: str) -> str:
"""AI生成"""
if response_mode == "blocking":
resp = requests.post(
url=url,
headers=headers,
json=data,
timeout=timeout
)
if resp.status_code != 200:
error_msg = f'AI生成失败,http-status_code:{resp.status_code}\nresponse.text:\n=====\n{resp.text}\n=====\n'
logger.error(error_msg)
raise AiGenerationException(message=error_msg)
res_json = resp.json()
logger.info(f"AI生成返回:\n=====\n{json.dumps(res_json, indent=4, ensure_ascii=False)}\n=====\n")
            # Validate the response structure
if not isinstance(res_json, dict):
error_msg = f'AI生成返回格式错误,res_json不是字典类型'
logger.error(error_msg)
return "{}"
if 'data' not in res_json:
error_msg = f'AI生成返回格式错误,缺少data字段'
logger.error(error_msg)
return "{}"
data = res_json['data']
if not isinstance(data, dict):
error_msg = f'AI生成返回格式错误,data不是字典类型'
logger.error(error_msg)
return "{}"
if 'status' in data and data['status'] == "failed":
error_msg = f'AI生成失败,data.status为failed'
logger.error(error_msg)
return "{}"
if 'outputs' not in data or not isinstance(data['outputs'], dict):
error_msg = f'AI生成返回格式错误,缺少outputs字段或outputs不是字典类型'
logger.error(error_msg)
return "{}"
if 'output' not in data['outputs']:
error_msg = f'AI生成返回格式错误,缺少output字段'
logger.error(error_msg)
return "{}"
return data['outputs']['output']
elif response_mode == "streaming":
resp = requests.post(
url=url,
headers=headers,
json=data,
timeout=1200,
stream=True
)
if resp.status_code != 200:
error_msg = f'AI生成失败,http-status_code:{resp.status_code}\nresponse.text:\n=====\n{resp.text}\n=====\n'
logger.error(error_msg)
raise AiGenerationException(message=error_msg)
result = ""
for chunk in resp.iter_lines():
if not chunk:
continue
try:
_, data = chunk.decode('utf-8').split(':', maxsplit=1)
data = data.strip()
if data == "ping":
continue
chunk_data = json.loads(data)
if not isinstance(chunk_data, dict):
continue
if 'event' not in chunk_data:
continue
if chunk_data['event'] != "workflow_finished":
continue
if 'data' not in chunk_data:
continue
chunk_result = chunk_data['data']
if not isinstance(chunk_result, dict):
continue
if chunk_result.get('status') == 'failed':
error_msg = f'AI生成失败,chunk_data:\n=====\n{chunk_data}\n=====\n'
logger.error(error_msg)
raise AiGenerationException(message=error_msg)
if 'outputs' not in chunk_result or not isinstance(chunk_result['outputs'], dict):
continue
if 'output' not in chunk_result['outputs']:
continue
result += chunk_result['outputs']['output']
except Exception as e:
logger.warning(f"处理数据块时发生错误: {str(e)}")
continue
return result
else:
raise AiGenerationException(message=f"不支持的response_mode:{response_mode}")
def run(self, timeout: int = 600) -> str:
"""运行AI生成"""
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
data = {
"inputs": self.inputs,
"response_mode": self.response_mode,
"user": "fadsf"
}
try:
response_output = self.ai_generate(
url=f"{self.base_url}{self.route}",
headers=headers,
data=data,
timeout=timeout,
response_mode=self.response_mode
)
return response_output
except Exception as e:
logger.error(f"AI生成失败: {str(e)}")
logger.error(traceback.format_exc())
return "{}"
if __name__ == '__main__':
# test1
ai_generate = online_llm_streaming(input_query="香港四大天王是谁?")
res1 = ai_generate.run()
print(res1)
# settings.py
"""配置文件"""
import os
from typing import Dict, Any
# LLM configuration
class LLMConfig:
BASE_URL = os.getenv("LLM_BASE_URL", "http://your-llm-service-url")
ROUTE = os.getenv("LLM_ROUTE", "/v1/chat/completions")
API_KEY = os.getenv("LLM_API_KEY", "your-api-key")
MAX_RETRIES = int(os.getenv("LLM_MAX_RETRIES", "3"))
TIMEOUT = int(os.getenv("LLM_TIMEOUT", "30"))
# File path configuration
class PathConfig:
CORPUS_FILE = os.getenv("CORPUS_FILE", "sales_corpus.csv")
TAGS_FILE = os.getenv("TAGS_FILE", "sales_tags.csv")
CHANNEL_CONFIG = os.getenv("CHANNEL_CONFIG", "channel_activity.yaml")
LOG_FILE = os.getenv("LOG_FILE", "sales_analysis.log")
# Sales violation scenario definitions
SALES_VIOLATIONS: Dict[str, list] = {
"虚假信息": ["空壳公司", "挂靠", "不用注销户籍", "自动延续", "享受本地待遇"],
"过度承诺": ["肯定", "一定", "保证", "100%", "绝对"],
"不当建议": ["随便找", "不用着急", "来得及", "自雇"],
"违法建议": ["虚假证明", "造假", "修改材料"],
}
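# Assumption: these keyword lists are matched as substrings against sales-side
# utterances during compliance checks.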
# Sales quality evaluation criteria
SALES_QUALITY_METRICS: Dict[str, Dict[str, Any]] = {
"专业性": {
"weight": 0.3,
"factors": ["产品知识准确", "政策解释清晰", "专业术语使用恰当"]
},
"合规性": {
"weight": 0.3,
"factors": ["无违规承诺", "无虚假信息", "无不当建议"]
},
"服务态度": {
"weight": 0.2,
"factors": ["礼貌用语", "耐心解答", "积极主动"]
},
"沟通技巧": {
"weight": 0.2,
"factors": ["需求挖掘", "异议处理", "总结复述"]
}
}
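# The four weights (0.3 + 0.3 + 0.2 + 0.2) sum to 1.0, keeping the weighted
# overall score on the same scale as the individual factor scores.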
# Analysis configuration
class AnalysisConfig:
MIN_DIALOGUE_LENGTH = int(os.getenv("MIN_DIALOGUE_LENGTH", "50"))
MAX_DIALOGUE_LENGTH = int(os.getenv("MAX_DIALOGUE_LENGTH", "3000"))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "10"))
SCORE_THRESHOLD = float(os.getenv("SCORE_THRESHOLD", "0.6"))
# Logging configuration
class LogConfig:
LEVEL = os.getenv("LOG_LEVEL", "INFO")
FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
ENCODING = 'utf-8'
import pandas as pd
from sqlalchemy import create_engine, text
if __name__ == '__main__':
    file_path = './user_day_research_result/user_day_research_result_20250711.xlsx'
to_mysql = pd.read_excel(file_path)