# python去重小脚本 (Python de-duplication utility)
# -*- coding: utf-8 -*-
"""
企业文件管理系统 v3.0
功能:1. 网络共享扫描 2. 智能文件去重 3. 操作审计报告
"""
import os
import csv
import hashlib
import time
import sys
from datetime import datetime
from collections import defaultdict
# Try to load the GUI toolkit; fall back to console mode when unavailable.
try:
    import tkinter as tk
    from tkinter import filedialog, messagebox
    HAS_GUI = True
except ImportError:
    HAS_GUI = False

# Configuration parameters
DEFAULT_CHUNK_SIZE = 65536  # 64KB chunk size for incremental hashing
MAX_LOG_ENTRIES = 1000  # maximum number of audit-log entries retained
SAFE_DIRECTORIES = {'Windows', 'Program Files'}  # protected directories (intended to be skipped during dedup)
class FileManager:
    """Core engine: network-share scanning, duplicate removal, audit reporting.

    Keeps a bounded in-memory operation log plus a free-form error log.
    """

    def __init__(self):
        self.operation_log = []  # bounded list of audit entries (dicts)
        self.error_log = []      # human-readable error strings

    def log_operation(self, event_type, path, metadata=None):
        """Append an audit entry, retaining at most MAX_LOG_ENTRIES records.

        Args:
            event_type: short event tag, e.g. "NETSCAN" or "DELETION".
            path: filesystem path the event refers to.
            metadata: optional dict with extra details.
        """
        entry = {
            "timestamp": datetime.now(),
            "type": event_type,
            "path": path,
            "metadata": metadata or {},
        }
        self.operation_log.append(entry)
        # Drop the oldest entry once the cap is exceeded (simple ring behavior).
        if len(self.operation_log) > MAX_LOG_ENTRIES:
            self.operation_log.pop(0)

    def get_hash(self, file_path):
        """Return the SHA-256 hex digest of a file, or None on read failure."""
        hasher = hashlib.sha256()
        try:
            with open(file_path, 'rb') as f:
                # Read in fixed-size chunks so large files don't load into memory.
                while chunk := f.read(DEFAULT_CHUNK_SIZE):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except Exception as e:
            self.error_log.append(f"哈希计算失败 [{file_path}]: {str(e)}")
            return None

    def scan_network_share(self, target_path):
        """Recursively scan target_path; return a list of per-file dicts.

        Each dict has keys: path, size, modified (mtime), hash (may be None).
        Errors are recorded in self.error_log instead of aborting the scan.
        """
        scan_report = []
        start_time = time.time()

        def recursive_scan(current_path):
            try:
                with os.scandir(current_path) as entries:
                    for entry in entries:
                        try:
                            if entry.is_dir(follow_symlinks=False):
                                # Skip administrative shares like $RECYCLE.BIN.
                                if not entry.name.startswith('$'):
                                    recursive_scan(entry.path)
                            else:
                                stat = entry.stat()
                                scan_report.append({
                                    "path": entry.path,
                                    "size": stat.st_size,
                                    "modified": stat.st_mtime,
                                    "hash": self.get_hash(entry.path),
                                })
                        except PermissionError:
                            self.error_log.append(f"权限不足: {entry.path}")
                        except Exception as e:
                            self.error_log.append(f"扫描错误: {entry.path} ({str(e)})")
            except Exception as e:
                self.error_log.append(f"扫描中止: {current_path} ({str(e)})")

        recursive_scan(target_path)
        self.log_operation("NETSCAN", target_path, {
            "duration": time.time() - start_time,
            "files_scanned": len(scan_report),
        })
        return scan_report

    def deduplicate_files(self, target_path):
        """Find duplicate files under target_path, keep the newest copy of
        each group, delete the rest, and write an audit report.

        Returns a dict: total_files, duplicates, deleted, report_path.
        """
        hash_registry = defaultdict(list)
        deleted_records = []
        preserved_files = set()

        # Phase 1: build the hash index, pruning protected system directories.
        for root, dirs, files in os.walk(target_path):
            # BUG FIX: the original line was an incomplete assignment
            # (`dirs[:] =`); prune protected directories in place so os.walk
            # never descends into them.
            dirs[:] = [d for d in dirs if d not in SAFE_DIRECTORIES]
            for file in files:
                file_path = os.path.join(root, file)
                file_hash = self.get_hash(file_path)
                if file_hash:
                    # BUG FIX: original called .append on the defaultdict
                    # itself; group paths under their hash key instead.
                    hash_registry[file_hash].append(file_path)

        # Phase 2: for each group of identical files, keep the newest one.
        for file_hash, files in hash_registry.items():
            if len(files) > 1:
                # Sort newest-first by modification time.
                sorted_files = sorted(files, key=os.path.getmtime, reverse=True)
                # BUG FIX: original assigned the whole list; keep element [0].
                preserved_file = sorted_files[0]
                preserved_files.add(preserved_file)
                # BUG FIX: original iterated ALL files (would delete the
                # preserved copy too); skip the first (newest) entry.
                for duplicate in sorted_files[1:]:
                    try:
                        file_stat = os.stat(duplicate)
                        deleted_records.append({
                            "deleted_path": duplicate,
                            "preserved_path": preserved_file,
                            "size": file_stat.st_size,
                            "modified": file_stat.st_mtime,
                            "hash": file_hash,
                        })
                    except Exception as e:
                        self.error_log.append(f"记录失败: {duplicate} ({str(e)})")

        # Phase 3: perform the deletions, logging each success.
        success_count = 0
        for record in deleted_records:
            try:
                os.remove(record["deleted_path"])
                success_count += 1
                self.log_operation("DELETION", record["deleted_path"], {
                    "preserved": record["preserved_path"],
                    "hash": record["hash"][:12],
                })
            except Exception as e:
                self.error_log.append(f"删除失败: {record['deleted_path']} ({str(e)})")

        # Generate the CSV audit report.
        report_path = self.generate_report(
            target_path,
            preserved_files,
            deleted_records,
            success_count,
        )
        return {
            "total_files": sum(len(v) for v in hash_registry.values()),
            "duplicates": len(deleted_records),
            "deleted": success_count,
            "report_path": report_path,
        }

    def generate_report(self, target_path, preserved, deleted, success_count):
        """Write the audit CSV under ./reports and return its path.

        Args:
            target_path: directory that was deduplicated (used in filename).
            preserved: set of kept file paths.
            deleted: list of deletion-record dicts (see deduplicate_files).
            success_count: number of deletions that actually succeeded.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_dir = os.path.join(os.getcwd(), "reports")
        os.makedirs(report_dir, exist_ok=True)
        report_path = os.path.join(
            report_dir,
            f"DedupeReport_{os.path.basename(target_path)}_{timestamp}.csv")
        # utf-8-sig so Excel on Windows detects the encoding via the BOM.
        with open(report_path, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.writer(f)
            # Header
            writer.writerow(["企业文件管理审计报告"])
            # BUG FIX: writerow() with no argument raises TypeError; the
            # original had two such calls. Write real header metadata instead.
            writer.writerow(["目标目录", target_path])
            writer.writerow(["生成时间", datetime.now().strftime("%Y-%m-%d %H:%M:%S")])
            writer.writerow([])
            # Deletion records
            writer.writerow(["删除文件路径", "大小 (MB)", "最后修改时间", "保留文件路径", "哈希片段"])
            for record in deleted:
                writer.writerow([
                    record["deleted_path"],
                    round(record["size"] / 1048576, 2),
                    datetime.fromtimestamp(record["modified"]).strftime('%Y-%m-%d %H:%M'),
                    record["preserved_path"],
                    record["hash"][:12],
                ])
            # Summary
            writer.writerow([])
            # BUG FIX: `deleted` holds dicts, so sum(len(v) for v in deleted)
            # counted dict keys, not files. Count records + preserved copies.
            writer.writerow(["总文件数", len(deleted) + len(preserved)])
            writer.writerow(["发现重复", len(deleted)])
            writer.writerow(["成功删除", success_count])
            writer.writerow(["保留文件", len(preserved)])
            writer.writerow(["错误数量", len(self.error_log)])
        return report_path
class UserInterface:
    """Thin presentation layer: directory picking and completion notices."""

    @staticmethod
    def select_directory(gui_title="选择目录"):
        """Ask the user for a directory, via GUI dialog when available,
        otherwise via a console prompt. Returns the path or None."""
        if not HAS_GUI:
            # Console fallback: loop until a valid path or an explicit quit.
            print("\n命令行模式:")
            while True:
                answer = input("请输入完整路径(或输入Q退出): ").strip()
                if answer.upper() == 'Q':
                    return None
                if os.path.isdir(answer):
                    return os.path.normpath(answer)
                print("错误:无效路径")
        # GUI path: a hidden, topmost root keeps the dialog in front.
        dialog_root = tk.Tk()
        dialog_root.withdraw()
        dialog_root.attributes('-topmost', True)
        chosen = filedialog.askdirectory(title=gui_title)
        dialog_root.destroy()
        return chosen

    @classmethod
    def display_report(cls, report_path):
        """Tell the user where the generated report was written."""
        if not HAS_GUI:
            print(f"\n操作完成,报告路径:\n{report_path}")
            return
        notice_root = tk.Tk()
        notice_root.withdraw()
        messagebox.showinfo(
            "操作完成",
            f"报告已生成至:\n{report_path}"
        )
        notice_root.destroy()
# Entry point for the first script: interactive menu driving FileManager.
if __name__ == "__main__":
    fm = FileManager()
    ui = UserInterface()
    # Operation menu
    print("\n企业文件管理系统")
    print("1. 扫描网络共享")
    print("2. 清理重复文件")
    print("3. 退出")
    choice = input("请选择操作:").strip()
    if choice == '1':
        target = ui.select_directory("选择扫描目录")
        if target:
            report = fm.scan_network_share(target)
            print(f"扫描完成,发现 {len(report)} 个文件")
    elif choice == '2':
        target = ui.select_directory("选择清理目录")
        if target:
            # Safety confirmation before irreversible deletion
            if HAS_GUI:
                confirm = messagebox.askyesno(
                    "确认删除",
                    "将永久删除重复文件!\n请确认已备份重要数据。"
                )
            else:
                confirm = input("将永久删除文件,确认操作?(y/N): ").lower() == 'y'
            if confirm:
                result = fm.deduplicate_files(target)
                ui.display_report(result['report_path'])
                print(f"删除完成:{result['deleted']}/{result['duplicates']} 成功")
    # Tail summary: last few audit entries and errors, if any.
    print("\n操作日志:")
    for entry in fm.operation_log[-3:]:
        print(f"[{entry['timestamp']}] {entry['type']}: {entry['path']}")
    if fm.error_log:
        print("\n最近错误:")
        for error in fm.error_log[-3:]:
            print(f"! {error}")
# 用法: python python去重小脚本.py
# ---- 第二个脚本: python扫描重复文件 (second, standalone script follows) ----
import os
import hashlib
import tkinter as tk
from tkinter import filedialog
import xlwt# 改用xlwt生成兼容性更好的xls格式
import time
def select_directory():
    """Show a folder-picker dialog and return the chosen directory path."""
    picker = tk.Tk()
    picker.withdraw()  # hide the empty root window behind the dialog
    return filedialog.askdirectory()
def file_hash(filepath):
    """Return the MD5 hex digest of *filepath*, or None if it can't be read.

    MD5 is used for legacy-system compatibility, per the original design.
    """
    digest = hashlib.md5()
    try:
        with open(filepath, "rb") as f:
            # iter() with a sentinel reads 4KB chunks until EOF (b"").
            for chunk in iter(lambda: f.read(4096), b""):
                digest.update(chunk)
        return digest.hexdigest()
    except Exception as e:
        print(" 无法读取文件 {}: {}".format(filepath, str(e)))
        return None
def scan_files(directory, max_depth=None):
    """Walk *directory* and group files by (hash, size).

    Args:
        directory: root directory to scan (normalized before walking).
        max_depth: optional depth limit; 0 means only the top directory,
            None means unlimited.

    Returns:
        dict mapping (md5_hex, size_bytes) -> list of
        {"path", "size", "modified"} dicts. Groups with more than one
        entry are duplicate candidates.
    """
    file_dict = {}
    directory = os.path.normpath(directory)  # normalize legacy path formats
    for root, dirs, files in os.walk(directory):
        # Compute the depth of `root` relative to the scan root.
        rel_path = os.path.relpath(root, directory)
        current_depth = rel_path.count(os.sep) if rel_path != '.' else 0
        if max_depth is not None and current_depth >= max_depth:
            # Stop os.walk from descending any deeper on this branch.
            del dirs[:]
            continue
        for file in files:
            file_path = os.path.join(root, file)
            try:
                file_size = os.path.getsize(file_path)
                mtime = os.path.getmtime(file_path)
                modified_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime))
                file_hash_value = file_hash(file_path)
                if file_hash_value:
                    key = (file_hash_value, file_size)
                    # BUG FIX: the original reassigned `file_dict = []` (clobbering
                    # the whole index) and appended to the dict itself. Group the
                    # entry under its (hash, size) key instead.
                    file_dict.setdefault(key, []).append({
                        "path": file_path,
                        "size": file_size,
                        "modified": modified_time,
                    })
            except Exception as e:
                print(" 跳过文件 {}: {}".format(file_path, str(e)))
    return file_dict
def export_to_excel(duplicates, output_file="duplicates.xls"):
    """Write duplicate-file groups to an Excel 97-2003 (.xls) report.

    *duplicates* maps a group key to a list of file dicts with keys
    "path", "size", and "modified". Groups with a single entry are skipped.
    """
    workbook = xlwt.Workbook(encoding='utf-8')
    sheet = workbook.add_sheet('重复文件报告')
    # Cell styles for the header row and the per-group banner rows.
    header_style = xlwt.easyxf(
        'font: bold on; align: wrap on, vert centre, horiz center;'
        'borders: left thin, right thin, top thin, bottom thin;'
    )
    group_style = xlwt.easyxf(
        'pattern: pattern solid, fore_colour pale_blue;'
        'font: bold on;'
    )
    # Header row.
    for col, title in enumerate(
            ["重复文件组", "文件路径", "文件大小 (Bytes)", "最后修改时间"]):
        sheet.write(0, col, title, header_style)
    next_row = 1
    group_index = 1
    for key, members in duplicates.items():
        if len(members) <= 1:
            continue
        # Group banner row with count and hash key.
        sheet.write(next_row, 0,
                    "重复文件组 {} (共 {} 个)".format(group_index, len(members)),
                    group_style)
        sheet.write(next_row, 1, "MD5: {}".format(key), group_style)
        sheet.row(next_row).set_style(
            xlwt.easyxf('pattern: pattern solid, fore_colour gray25;'))
        next_row += 1
        # One row per member file.
        for member in members:
            sheet.write(next_row, 0, group_index)
            sheet.write(next_row, 1, member["path"])
            sheet.write(next_row, 2, member["size"])
            sheet.write(next_row, 3, member["modified"])
            next_row += 1
        group_index += 1
    # Column widths (xlwt units: 1/256 of a character width).
    for col, width in zip(range(4), (3000, 12000, 4000, 4000)):
        sheet.col(col).width = width
    workbook.save(output_file)
    print("报告已生成:{}".format(os.path.abspath(output_file)))
def main():
    """Entry point: pick a directory, scan it, and export any duplicates."""
    target_dir = select_directory()
    if not target_dir:
        print("操作已取消")
        return
    try:
        depth_input = input("请输入扫描深度(0=仅当前目录,回车=不限深度): ").strip()
        max_depth = int(depth_input) if depth_input else None
    except ValueError:
        # Non-numeric input falls back to an unlimited scan depth.
        max_depth = None
    print("扫描中...(可能需要较长时间)")
    catalog = scan_files(target_dir, max_depth)
    # Keep only the (hash, size) groups that contain more than one file.
    dup_groups = {key: entries for key, entries in catalog.items() if len(entries) > 1}
    if not dup_groups:
        print("未发现重复文件")
        return
    print("发现 {} 组重复文件".format(len(dup_groups)))
    export_to_excel(dup_groups)
if __name__ == "__main__":
    # BUG FIX: the original final line had forum reply text fused onto it,
    # which is a syntax error; removed here.
    # Encoding shim for legacy Python 2 environments only.
    import sys
    if sys.version_info < (3,):
        reload(sys)  # noqa: F821 -- `reload` is a builtin only on Python 2
        sys.setdefaultencoding('utf-8')
    main()
# (forum pagination footer removed: 页: [1])