# ====================== 导入模块区 ======================
from flask import Flask, request, render_template_string, session, redirect, url_for
import openai
import io
import re
import zipfile
import xml.etree.ElementTree as ET
import traceback
from functools import wraps
import subprocess
import shutil
# 文件解析依赖兼容判断
# docx
try:
from docx import Document
HAS_DOCX = True
except ImportError:
HAS_DOCX = False
print("[WARN] python-docx未安装,docx解析不可用")
# PyPDF2
try:
from PyPDF2 import PdfReader
HAS_PYPDF2 = True
except ImportError:
HAS_PYPDF2 = False
print("[WARN] PyPDF2未安装,pdf解析不可用")
# pdfplumber
try:
import pdfplumber
HAS_PDFPLUMBER = True
except ImportError:
HAS_PDFPLUMBER = False
print("[WARN] pdfplumber未安装,高质量pdf解析不可用")
# ====================== Flask 基础配置 ======================
# Flask application object shared by the handlers in this file.
app = Flask(__name__)
# NOTE(review): the session-signing secret is hard-coded, and its tail is shaped
# like an API credential ("<32 hex>.<16 chars>"). Confirm whether it is a real
# key; if so rotate it and load the secret from the environment instead.
app.secret_key = "luowenfanwenji547b4f7882714c2994cd9155671c76e7.M1Lx5hRGdBlpq8Ty"
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024 # 16 MB upload size limit
# Large-model (GLM-4-Flash) client configuration.
# The api_key literal below is a placeholder that must be replaced before use.
openai.api_key = "sk-替换成你的智谱API密钥"
openai.api_base = "https://open.bigmodel.cn/api/paas/v4"
MODEL_NAME = "glm-4-flash"
# Login accounts as a username -> password mapping.
# NOTE(review): the password is stored in plain text and is trivially guessable;
# presumably the login view (outside this chunk) compares against it - confirm.
USERS = {
"admin": "123456"
}
# Upload extensions accepted by allowed_file() (lower-case, without the dot).
ALLOWED_EXTENSIONS = {
'txt', 'md', 'csv', 'log', 'json', 'xml',
'docx', 'doc', 'pdf',
'rtf', 'odt',
'html', 'htm',
}
# Human-readable label for each accepted extension.
# presumably shown in the UI; the consumer is not visible in this chunk.
FORMAT_LABELS = {
'txt': 'TXT纯文本', 'md': 'Markdown', 'csv': 'CSV表格',
'log': '日志文件', 'json': 'JSON', 'xml': 'XML',
'docx': 'Word文档', 'doc': '旧版Word',
'pdf': 'PDF文档', 'rtf': 'RTF富文本',
'odt': 'OpenDocument', 'html': 'HTML网页', 'htm': 'HTML网页',
}
# ====================== 工具函数 ======================
def allowed_file(filename):
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The comparison is case-insensitive; a name without any dot is rejected.
    """
    if '.' not in filename:
        return False
    suffix = filename.rsplit('.', 1)[1]
    return suffix.lower() in ALLOWED_EXTENSIONS
# ---------------------- 文件解析器 子函数 ----------------------
def _extract_txt(raw_bytes):
"""纯文本多编码兼容解析"""
enc_list = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312', 'gb18030', 'big5', 'euc-jp', 'shift_jis', 'latin-1']
for enc in enc_list:
try:
text = raw_bytes.decode(enc)
if text.strip():
return text
except (UnicodeDecodeError, LookupError):
continue
# 兜底忽略错误解码
return raw_bytes.decode('utf-8', errors='ignore')
def _extract_csv(raw_bytes):
    """Decode CSV bytes and return the rows with blank lines removed.

    Each row is stripped of surrounding whitespace; rows are split on '\\n'.
    """
    decoded = _extract_txt(raw_bytes)
    stripped_rows = (row.strip() for row in decoded.split('\n'))
    return '\n'.join(row for row in stripped_rows if row)
def _extract_docx_xml(raw_bytes):
"""docx基础XML解析:正文+页眉页脚"""
W_NS = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
try:
with zipfile.ZipFile(io.BytesIO(raw_bytes), 'r') as z:
target_files = [
n for n in z.namelist()
if n == 'word/document.xml' or n.startswith('word/header') or n.startswith('word/footer')
]
all_lines = []
for fname in target_files:
try:
root = ET.fromstring(z.read(fname))
for p_node in root.iter(f'{W_NS}p'):
txt_list = [t.text for t in p_node.iter(f'{W_NS}t') if t.text]
line = ''.join(txt_list).strip()
if line:
all_lines.append(line)
except Exception:
continue
# 去重相邻重复行
dedup = []
for line in all_lines:
if not dedup or line != dedup[-1]:
dedup.append(line)
print(f"[docx-XML]提取{len(dedup)}行")
return '\n'.join(dedup) if dedup else ""
except Exception as e:
print(f"[docx-XML]解析失败: {e}")
return ""
def _extract_docx_full_scan(raw_bytes):
"""docx全量XML扫描:文本框、形状、绘图文字兜底"""
W_NS = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
A_NS = '{http://schemas.openxmlformats.org/drawingml/2006/main}'
try:
with zipfile.ZipFile(io.BytesIO(raw_bytes), 'r') as z:
xml_files = [f for f in z.namelist() if f.endswith('.xml')]
all_lines = []
for fname in xml_files:
try:
root = ET.fromstring(z.read(fname))
# Word正文文本
for p_node in root.iter(f'{W_NS}p'):
txts = [t.text for t in p_node.iter(f'{W_NS}t') if t.text]
line = ''.join(txts).strip()
if line:
all_lines.append(line)
# 绘图/形状文本
for sp_node in root.iter(f'{A_NS}sp'):
txts = [t.text for t in sp_node.iter(f'{A_NS}t') if t.text]
line = ''.join(txts).strip()
if line and line not in all_lines:
all_lines.append(line)
except Exception:
continue
dedup = []
for line in all_lines:
if not dedup or line != dedup[-1]:
dedup.append(line)
print(f"[docx-全量扫描]解析{len(xml_files)}个XML,提取{len(dedup)}行")
return '\n'.join(dedup) if dedup else ""
except Exception as e:
print(f"[docx-全量扫描]失败: {e}")
return ""
def _extract_docx(raw_bytes):
    """Extract text from a .docx, preferring python-docx over raw XML parsing.

    Order of attempts:
      1. python-docx (when installed): all paragraphs, then every table row
         rendered as its non-empty cells joined by ' | '.
      2. _extract_docx_xml: body, header and footer XML.
      3. _extract_docx_full_scan: every XML part in the package.

    Returns:
        The extracted text, or "" when every attempt comes up empty.
    """
    if HAS_DOCX:
        try:
            document = Document(io.BytesIO(raw_bytes))
            stripped = (para.text.strip() for para in document.paragraphs)
            chunks = [text for text in stripped if text]
            for tbl in document.tables:
                for row in tbl.rows:
                    cell_texts = (cell.text.strip() for cell in row.cells)
                    filled = [text for text in cell_texts if text]
                    if filled:
                        chunks.append(' | '.join(filled))
            if chunks:
                print(f"[docx]常规提取{len(chunks)}段")
                return '\n'.join(chunks)
        except Exception as err:
            print(f"[docx]python-docx解析失败: {err}")
    # Fall back to the XML readers; the full scan runs only if the first is empty.
    return _extract_docx_xml(raw_bytes) or _extract_docx_full_scan(raw_bytes)
def _extract_doc(raw_bytes):
"""旧版 .doc 文件多重解析策略"""
# 策略1 antiword命令行
if shutil.which("antiword"):
try:
proc = subprocess.run(
["antiword", "-"],
input=raw_bytes, capture_output=True, timeout=15
)
if proc.returncode == 0 and proc.stdout:
text = proc.stdout.decode("utf-8", errors="ignore")
if text.strip():
print("[doc] antiword提取成功")
return text
except Exception:
pass
# 策略2 catdoc
if shutil.which("catdoc"):
try:
proc = subprocess.run(
["catdoc", "-"],
input=raw_bytes, capture_output=True, timeout=15
)
if proc.returncode == 0 and proc.stdout:
text = proc.stdout.decode("utf-8", errors="ignore")
if text.strip():
print("[doc] catdoc提取成功")
return text
except Exception:
pass
# 策略3 伪装docx兼容
if HAS_DOCX:
try:
doc = Document(io.BytesIO(raw_bytes))
parts = [p.text.strip() for p in doc.paragraphs if p.text.strip()]
if parts:
print("[doc] python-docx兼容提取成功")
return '\n'.join(parts)
except Exception:
pass
# 策略4 UTF16LE二进制提取
try:
text = raw_bytes.decode("utf-16-le", errors="ignore")
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', "", text)
text = re.sub(r'\n{3,}', '\n\n', text)
matches = re.findall(r'[\u4e00-\u9fff\w\s,.:;!?,。、:;!?\-()()/]+', text)
res = '\n'.join(chunk.strip() for chunk in matches if len(chunk.strip()) > 1)
if res and len(res) > 30:
print(f"[doc]二进制提取成功,字符数:{len(res)}")
return res
except Exception:
pass
# 策略5 GBK兜底
try:
text = raw_bytes.decode("gbk", errors="ignore")
matches = re.findall(r'[\u4e00-\u9fff\w\s,.:;!?,。、:;!?\-()()/]+', text)
res = '\n'.join(chunk.strip() for chunk in matches if len(chunk.strip()) > 1)
if res and len(res) > 30:
return res
except Exception:
pass
return ""
def _extract_pdf(raw_bytes):
    """Extract text from a PDF, preferring pdfplumber and falling back to PyPDF2.

    Each library is used only when its import succeeded. Pages whose extracted
    text is blank are skipped; the remaining page texts are stripped and joined
    with newlines. A library that raises is logged and the next one is tried.

    Returns:
        The extracted text, or "" when neither library produced any.
    """
    def non_blank_page_texts(pages):
        # Stripped text of every page that yields something other than whitespace.
        kept = []
        for pg in pages:
            extracted = pg.extract_text()
            if extracted and extracted.strip():
                kept.append(extracted.strip())
        return kept

    # Attempt 1: pdfplumber.
    if HAS_PDFPLUMBER:
        try:
            with pdfplumber.open(io.BytesIO(raw_bytes)) as pdf:
                texts = non_blank_page_texts(pdf.pages)
                if texts:
                    print(f"[pdf] pdfplumber提取{len(texts)}页")
                    return '\n'.join(texts)
        except Exception as err:
            print(f"[pdf] pdfplumber失败: {err}")

    # Attempt 2: PyPDF2.
    if HAS_PYPDF2:
        try:
            texts = non_blank_page_texts(PdfReader(io.BytesIO(raw_bytes)).pages)
            if texts:
                print(f"[pdf] PyPDF2提取{len(texts)}页")
                return '\n'.join(texts)
        except Exception as err:
            print(f"[pdf] PyPDF2失败: {err}")
    return ""
# Opening of the groups whose whole content is dropped: the font, colour and
# style tables and every "{\*\..." destination.
_RTF_SKIPPED_GROUP_RE = re.compile(r'\{\\(?:fonttbl|colortbl|stylesheet|\*\\)')
# Tokens that matter for brace matching: an escaped character (so "\{" and "\}"
# are not counted) or a bare brace.
_RTF_BRACE_TOKEN_RE = re.compile(r'\\.|[{}]', re.DOTALL)


def _strip_rtf_groups(text):
    """Remove the skipped groups from *text*, honouring nested braces.

    A group whose closing brace is missing, and everything after it, is left
    untouched rather than discarded.
    """
    kept = []
    pos = 0
    while True:
        opening = _RTF_SKIPPED_GROUP_RE.search(text, pos)
        if opening is None:
            break
        start = opening.start()
        depth = 0
        end = -1
        for token in _RTF_BRACE_TOKEN_RE.finditer(text, start):
            symbol = token.group()
            if symbol == '{':
                depth += 1
            elif symbol == '}':
                depth -= 1
                if depth == 0:
                    end = token.end()
                    break
        if end == -1:
            break
        kept.append(text[pos:start])
        pos = end
    kept.append(text[pos:])
    return ''.join(kept)


def _rtf_unicode_char(match):
    """Convert a ``\\uN`` escape to its character; N is a signed 16-bit value."""
    code = int(match.group(1))
    if code < 0:
        # Code units above 32767 are written as negative numbers.
        code += 65536
    try:
        return chr(code)
    except (ValueError, OverflowError):
        return ''


def _extract_rtf(raw_bytes):
    """Extract plain text from RTF content.

    Content that does not start with ``{\\rtf`` is returned as decoded, without
    any cleaning. Otherwise table and ``\\*`` destination groups are removed,
    ``\\uN`` escapes are converted, ``\\par`` and ``\\line`` become newlines,
    and the remaining control words, ``\\'hh`` escapes and braces are dropped.

    Limitation: text stored only as ``\\'hh`` code-page escapes (no ``\\uN``)
    is dropped, because decoding it requires tracking the document code page.

    Args:
        raw_bytes: The RTF file content.

    Returns:
        The cleaned text with surrounding whitespace stripped.
    """
    text = _extract_txt(raw_bytes)
    # A leading BOM would otherwise hide the RTF signature.
    if not text.lstrip('\ufeff').strip().startswith(r'{\rtf'):
        return text
    # Drop resource groups; they contain nested groups (one per font or style).
    text = _strip_rtf_groups(text)
    # Unicode escapes: optional delimiter space, then the usual '?' fallback.
    text = re.sub(r"\\u(-?\d+) ?\??", _rtf_unicode_char, text)
    # Merge surrogate pairs produced by two consecutive \uN escapes; a lone
    # surrogate becomes U+FFFD so the text can still be encoded as UTF-8.
    text = text.encode('utf-16', 'surrogatepass').decode('utf-16', 'replace')
    # Paragraph and line breaks; a raw line break right after the control word
    # is consumed so the break is not doubled.
    text = re.sub(r'\\(?:par|line)(?![a-zA-Z]) ?\r?\n?', '\n', text)
    # Remaining control words, including a negative numeric parameter.
    text = re.sub(r'\\[a-zA-Z]+(?:-?\d+)?\s?', ' ', text)
    text = re.sub(r"\\'[0-9a-fA-F]{2}", "", text)
    text = re.sub(r'[{}]', "", text)
    # Whitespace cleanup.
    text = re.sub(r' +', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()
def _extract_odt(raw_bytes):
"""ODT OpenDocument文本提取"""
try:
with zipfile.ZipFile(io.BytesIO(raw_bytes), 'r') as z:
if "content.xml" in z.namelist():
xml_content = z.read("content.xml").decode("utf-8")
text = re.sub(r'<[^>]+>', '\n', xml_content)
text = re.sub(r'\n{2,}', '\n', text)
text = re.sub(r'[ \t]+', ' ', text)
lines = [l.strip() for l in text.split('\n') if l.strip()]
return '\n'.join(lines)
except Exception:
pass
return ""
def _extract_html(raw_bytes):
    """Extract readable text from an HTML page.

    Script blocks, style blocks and comments are removed together with their
    content; ``<br>`` and the closing tags of block-level elements become line
    breaks; every remaining tag is dropped; character references are decoded
    last so that escaped markup such as ``&lt;b&gt;`` survives as literal text.

    Args:
        raw_bytes: The HTML file content.

    Returns:
        The extracted text with whitespace runs collapsed and the ends stripped.
    """
    # Function-scope import keeps this helper self-contained.
    from html import unescape

    text = _extract_txt(raw_bytes)
    # Remove scripts, styles and comments, including everything inside them.
    text = re.sub(r'<script\b[^>]*>.*?</script\s*>', "", text, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r'<style\b[^>]*>.*?</style\s*>', "", text, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r'<!--.*?-->', "", text, flags=re.DOTALL)
    # Line-break tags and the end of block-level elements start a new line.
    text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
    block_tags = r'</(?:p|div|li|h[1-6]|tr|td|th|blockquote|section)\s*>'
    text = re.sub(block_tags, '\n', text, flags=re.IGNORECASE)
    # Drop every remaining tag.
    text = re.sub(r'<[^>]+>', "", text)
    # Decode named and numeric character references in a single pass, so that
    # "&amp;lt;" yields "&lt;" rather than being unescaped twice.
    text = unescape(text)
    # Non-breaking, en and em spaces are treated as ordinary spaces.
    text = text.replace('\xa0', ' ').replace('\u2002', ' ').replace('\u2003', ' ')
    # Whitespace compression.
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    return text.strip()
def parse_file(filename, raw_bytes):
    """Dispatch an uploaded file to the extractor matching its extension.

    Args:
        filename: Original file name; the text after the last dot selects the
            extractor (case-insensitive).
        raw_bytes: The file content.

    Returns:
        ``(text, None)`` on success, or ``(None, message)`` when the extractor
        raised or produced no text. This function itself does not raise for a
        file name without an extension; such a file is parsed as plain text.
    """
    # rpartition never raises: a name without a dot yields an empty extension,
    # which falls through to the plain-text extractor below.
    _, dot, suffix = filename.rpartition('.')
    ext = suffix.lower() if dot else ''
    extract_map = {
        'txt': _extract_txt, 'md': _extract_txt,
        'csv': _extract_csv, 'log': _extract_txt,
        'json': _extract_txt, 'xml': _extract_txt,
        'docx': _extract_docx, 'doc': _extract_doc,
        'pdf': _extract_pdf,
        'rtf': _extract_rtf, 'odt': _extract_odt,
        'html': _extract_html, 'htm': _extract_html,
    }
    extract_func = extract_map.get(ext, _extract_txt)
    try:
        content = extract_func(raw_bytes)
    except Exception as e:
        # Boundary handler: log the traceback and report the failure to the caller.
        err_log = traceback.format_exc()
        print(f"[文件解析异常] {ext}:\n{err_log}")
        return None, f"解析失败:{str(e)}"
    if content and content.strip():
        return content.strip(), None
    return None, f"文件内容为空({ext}格式无可用文字)"
# ====================== 登录鉴权装饰器 ======================
def login_required(f):
    """Decorator that only runs the wrapped view for a logged-in session.

    When the session has no "username" key, the caller is redirected to the
    "login" endpoint; otherwise the view is called with its original arguments.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if "username" in session:
            return f(*args, **kwargs)
        return redirect(url_for("login"))
    return guarded
# ====================== 前端HTML模板字符串 ======================
LOGIN_HTML = """