python文本合并脚本
做数据集本地化时,遇到了 txt 文本合并的问题,于是用 Trae CN 的 AI 辅助编程测试了一下效果。效果还可以,但还是不如人灵光,反复出现小错;如果换成熟手来写,应该很简单。这里只是做个测试。南无阿弥陀佛,哈哈。
开发的独立程序,合并过程如下图:
源码如下:
import os
import argparse
import concurrent.futures
import zipfile
import tkinter as tk
from tkinter import filedialog
from tkinter import ttk # 导入 ttk 模块
from bs4 import BeautifulSoup # 导入 bs4 模块
from html.parser import HTMLParser
import configparser
# 在文件头部添加sys模块导入
import sys
# 在文件头部添加缺失的模块导入
import re  # regular expressions; not referenced by the definitions in this part of the file


class MyHTMLParser(HTMLParser):
    """HTML parser that concatenates every text node it sees into ``self.text``."""

    def __init__(self):
        super().__init__()
        self.text = ""

    def handle_data(self, data):
        """Append one chunk of character data to the accumulated text."""
        self.text += data


def _read_text(file_path, encoding):
    """Return the text of *file_path* decoded strictly with *encoding*.

    Files ending in ``.html``/``.htm`` are passed through BeautifulSoup and
    reduced to the result of ``get_text()``; other files are read verbatim.
    """
    with open(file_path, 'r', encoding=encoding, errors='strict') as infile:
        if file_path.endswith(('.html', '.htm')):
            return BeautifulSoup(infile, 'html.parser').get_text()
        return infile.read()


def process_file(file_path, output_file):
    """Append the non-blank lines of *file_path* to *output_file*.

    The input is decoded by trying ``utf-8-sig``, ``gb18030`` and ``big5`` in
    that order (strict mode); the first encoding that decodes without error
    wins.  The output is opened in append mode as ``utf-8-sig``.

    Args:
        file_path: Path of the text or HTML file to read.
        output_file: Path of the merged file to append to.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, error)`` where
        *error* is the exception describing why nothing was written
        (undecodable input, unreadable input, blank content, write failure).
    """
    encodings = ['utf-8-sig', 'gb18030', 'big5']
    last_error = None
    content = None
    used_encoding = None
    for encoding in encodings:
        try:
            content = _read_text(file_path, encoding)
        except UnicodeDecodeError as e:
            last_error = e
            continue
        except OSError as e:
            # A missing/unreadable file will not get better with another
            # encoding; report it instead of letting it abort the whole merge.
            return False, e
        used_encoding = encoding
        break

    if content is None:
        return False, last_error or Exception(f"所有编码尝试失败: {str(last_error)}")

    non_empty_lines = [line for line in content.splitlines() if line.strip()]
    if not non_empty_lines:
        # Checked on the line list: the joined string always ends with '\n'
        # and would therefore never be empty.
        return False, Exception("清理后内容为空")
    cleaned_content = '\n'.join(non_empty_lines) + '\n'

    try:
        with open(output_file, 'a', encoding='utf-8-sig') as outfile:
            outfile.write(cleaned_content)
        print(f"成功写入文件: {file_path} (编码: {used_encoding})")
        return True, None
    except Exception as e:
        print(f"写入文件失败: {str(e)}")
        return False, e


def merge_txt_files(input_folder, output_file, progress, root, listbox, total_files_label, current_file_label, max_files=None):
    """Merge every ``.txt``/``.html``/``.htm`` file under *input_folder* into *output_file*.

    The output file is created (or truncated) first, then each input file is
    appended through ``process_file``.  All widget arguments are optional:
    passing ``None`` (as the command-line entry point does) disables the
    corresponding UI update, and log messages go to stdout instead of the
    listbox.

    Args:
        input_folder: Directory that is walked recursively for input files.
        output_file: Path of the merged output file.
        progress: Progress bar whose ``'value'`` is set to a percentage, or None.
        root: Tk root used to refresh the UI after each file, or None.
        listbox: Listbox receiving log lines, or None to print them.
        total_files_label: Label showing the total file count, or None.
        current_file_label: Label showing the running progress, or None.
        max_files: Optional cap on the number of files (int or digit string).
    """

    def log(message):
        # Route messages to the GUI list when there is one, else to stdout.
        if listbox is None:
            print(message)
        else:
            listbox.insert(tk.END, message)

    try:
        # A bare file name has an empty dirname, which os.makedirs rejects.
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        # Make sure the output file exists and starts out empty.
        with open(output_file, 'w', encoding='utf-8'):
            pass
    except Exception as e:
        log(f"无法初始化输出文件: {str(e)}")
        return

    output_key = os.path.normcase(os.path.abspath(output_file))
    txt_files = []
    for root_dir, _, files in os.walk(input_folder):
        for file in files:
            # .zip archives (and any other extension) are currently ignored.
            if not file.endswith(('.txt', '.html', '.htm')):
                continue
            candidate = os.path.join(root_dir, file)
            # Never merge the output file into itself when it lives inside
            # the input folder.
            if os.path.normcase(os.path.abspath(candidate)) == output_key:
                continue
            txt_files.append(candidate)

    if max_files and str(max_files).isdigit():
        txt_files = txt_files[:int(max_files)]

    total_files = len(txt_files)
    if total_files_label is not None:
        total_files_label.config(text=f"文件总数: {total_files}")

    for i, file_path in enumerate(txt_files):
        log(f"处理中: {file_path}")
        success, error = process_file(file_path, output_file)
        if not success:
            log(f"处理失败: {file_path}")
            if error:
                log(f"错误详情: {str(error)}")
            if listbox is not None:
                # Highlight the last failure line in red.
                listbox.itemconfig(tk.END, {'fg': 'red'})

        # Progress counts processed files, successful or not, so the bar
        # always reaches 100% at the end.
        if current_file_label is not None:
            current_file_label.config(text=f"当前进度: {i+1}/{total_files}")
        if progress is not None:
            progress['value'] = ((i + 1) / total_files) * 100
        if root is not None:
            root.update_idletasks()

# TODO (original author's note): the two history-path helpers that follow are meant to be removed.
def read_history_paths():
    """Return ``(input_folder, output_file)`` stored in ``history_paths.ini``.

    Either value is an empty string when the file, the ``Paths`` section or
    the key is missing.
    """
    config = configparser.ConfigParser()
    config.read('history_paths.ini')
    input_folder = ""
    output_file = ""
    if 'Paths' in config:
        input_folder = config['Paths'].get('input_folder', '')
        output_file = config['Paths'].get('output_file', '')
    return input_folder, output_file


def save_history_paths(input_folder, output_file):
    """Write the two paths to the ``Paths`` section of ``history_paths.ini``."""
    config = configparser.ConfigParser()
    config['Paths'] = {
        'input_folder': input_folder,
        'output_file': output_file,
    }
    with open('history_paths.ini', 'w') as configfile:
        config.write(configfile)


def update_file_count():
    """Count the mergeable files under the chosen input folder.

    Reads the folder from ``input_folder_entry`` and, when it is an existing
    directory, replaces the content of ``max_files_entry`` with the number of
    ``.txt``/``.html``/``.htm`` files found recursively.  Kept at module level
    because the browse button calls it by name.
    """
    input_folder = input_folder_entry.get()
    if input_folder and os.path.isdir(input_folder):
        file_count = sum(
            1
            for _, _, files in os.walk(input_folder)
            for file in files
            if file.endswith(('.txt', '.html', '.htm'))
        )
        max_files_entry.delete(0, tk.END)
        max_files_entry.insert(0, str(file_count))


# Widgets that run_merge needs but that must be created only once per window.
_stats_widgets = {}


def _get_stats_widgets():
    """Return ``(total_files_label, current_file_label)``, creating them on first use.

    The listbox scrollbar is created at the same time.  Caching the widgets
    keeps repeated merges from stacking new labels and scrollbars on top of
    the old ones.
    """
    if not _stats_widgets:
        # Set the shared font before the widgets are created so it applies to them.
        root.option_add("*Font", "微软雅黑 10")

        total_label = tk.Label(root, text="文件总数: 0")
        total_label.grid(row=6, column=0, padx=5, sticky='e')
        current_label = tk.Label(root, text="当前完成文件数: 0/0")
        current_label.grid(row=6, column=1, padx=5, sticky='w')

        scrollbar = tk.Scrollbar(root)
        scrollbar.grid(row=4, column=3, sticky='ns')
        file_process_listbox.config(yscrollcommand=scrollbar.set)
        scrollbar.config(command=file_process_listbox.yview)

        _stats_widgets['total'] = total_label
        _stats_widgets['current'] = current_label
        _stats_widgets['scrollbar'] = scrollbar
    return _stats_widgets['total'], _stats_widgets['current']


def run_merge():
    """Handle the "start merge" button: validate the paths, then merge.

    Reads the input folder, output file and file limit from the entry
    widgets.  Nothing happens unless both paths are filled in.  The output
    directory is created when needed; if it cannot be created or is not
    writable, the status label reports it and the merge is skipped.
    """
    input_folder = input_folder_entry.get()
    output_file = output_file_entry.get()
    max_files = max_files_entry.get()
    print(f"输入文件夹路径: {input_folder}")
    print(f"输出文件路径: {output_file}")
    if not (input_folder and output_file):
        return

    # Work with an absolute path so a bare file name (empty dirname) is valid.
    output_path = os.path.abspath(output_file)
    output_dir = os.path.dirname(output_path)
    try:
        os.makedirs(output_dir, exist_ok=True)
    except OSError:
        writable = False
    else:
        writable = os.access(output_dir, os.W_OK)
    if not writable:
        # Uses the module-level status label; this function must not rebind
        # the name, otherwise it becomes an unbound local here.
        status_label.config(text=f"输出路径不可写: {output_dir}")
        return

    # Reset the UI for a fresh run.
    file_process_listbox.delete(0, tk.END)
    progress_bar['value'] = 0
    status_label.config(text="准备就绪", fg="blue")
    total_files_label, current_file_label = _get_stats_widgets()
    total_files_label.config(text="文件总数: 0")
    current_file_label.config(text="当前完成文件数: 0/0")

    merge_txt_files(
        input_folder,
        output_path,
        progress_bar,
        root,
        file_process_listbox,
        total_files_label,
        current_file_label,
        max_files,
    )

    # Update the status bar once all files have been processed.
    status_label.config(text=f"已将 {input_folder} 中的文件合并到 {output_file}")


def extract_zip(zip_file_path, temp_dir):
    """Extract every member of *zip_file_path* into *temp_dir*.

    Best effort: any failure is printed and swallowed, nothing is raised.
    """
    try:
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
    except Exception as e:
        print(f"解压文件 {zip_file_path} 时出错: {e}")

# The GUI main loop is started from the __main__ block that follows.
# 在main模块添加完整的GUI初始化代码
if __name__ == "__main__":
    if len(sys.argv) > 1:
        # Command-line mode: decided before any Tk object is created so the
        # script can run without a display.
        parser = argparse.ArgumentParser()
        parser.add_argument('input_folder')
        parser.add_argument('output_file')
        args = parser.parse_args()
        merge_txt_files(args.input_folder, args.output_file, None, None, None, None, None)
    else:
        # GUI mode.  The widgets stay module-level names because the button
        # callbacks (run_merge, update_file_count) look them up as globals.
        root = tk.Tk()
        root.title("TXT 文件合并工具")
        root.geometry("800x600")

        # Input folder row.
        tk.Label(root, text="输入文件夹:").grid(row=0, column=0, padx=5, pady=5)
        input_folder_entry = tk.Entry(root, width=50)
        input_folder_entry.grid(row=0, column=1, padx=5, pady=5)

        # Output file row.
        tk.Label(root, text="输出文件:").grid(row=1, column=0, padx=5, pady=5)
        output_file_entry = tk.Entry(root, width=50)
        output_file_entry.grid(row=1, column=1, padx=5, pady=5)

        def _browse_input_folder():
            """Let the user pick the input folder, then refresh the file count."""
            chosen = filedialog.askdirectory()
            if not chosen:
                return  # dialog cancelled: keep whatever is in the entry
            input_folder_entry.delete(0, tk.END)
            input_folder_entry.insert(0, chosen)
            update_file_count()

        def _browse_output_file():
            """Let the user pick the output file path."""
            chosen = filedialog.asksaveasfilename()
            if not chosen:
                return  # dialog cancelled: keep whatever is in the entry
            output_file_entry.delete(0, tk.END)
            output_file_entry.insert(0, chosen)

        tk.Button(root, text="浏览", command=_browse_input_folder).grid(row=0, column=2, padx=5)
        tk.Button(root, text="浏览", command=_browse_output_file).grid(row=1, column=2, padx=5)

        # Maximum number of files to merge (defaults to 10000).
        tk.Label(root, text="最大合并文件数:").grid(row=2, column=0, padx=5, pady=5)
        max_files_entry = tk.Entry(root, width=50)
        max_files_entry.insert(0, "10000")
        max_files_entry.grid(row=2, column=1, padx=5, pady=5)

        # Progress bar and start button.
        progress_bar = ttk.Progressbar(root, orient="horizontal", length=400, mode="determinate")
        progress_bar.grid(row=3, column=0, columnspan=2, pady=10, sticky='ew')
        run_button = tk.Button(root, text="开始合并", command=run_merge, bg="#4CAF50", fg="white")
        run_button.grid(row=3, column=2, padx=10, pady=10, sticky='ew')

        # Per-file processing log.
        file_process_listbox = tk.Listbox(root, width=100, height=20)
        file_process_listbox.grid(row=4, column=0, columnspan=3, padx=10, pady=10, sticky="nsew")

        # Status line.
        status_label = tk.Label(root, text="准备就绪", fg="blue")
        status_label.grid(row=5, column=0, columnspan=3, pady=5, sticky='ew')

        # Let the middle column and the log area absorb extra window space.
        root.columnconfigure(1, weight=1)
        root.rowconfigure(4, weight=1)

        # All layout code must run before entering the main loop.
        root.mainloop()
南无阿弥陀佛,哈哈。