TA的每日心情 | 奋斗 6 小时前 |
---|
签到天数: 2929 天 [LV.Master]伴坛终老
|
1.png
(130.74 KB, 下载次数: 0)
2.png
(114.06 KB, 下载次数: 0)
3.png
(89.65 KB, 下载次数: 0)
4.png
(102.08 KB, 下载次数: 0)
5.png
(103.72 KB, 下载次数: 0)
6.png
(122.47 KB, 下载次数: 0)
7.png
(164.87 KB, 下载次数: 0)
8.png
(106.19 KB, 下载次数: 0)
9.png
(198.37 KB, 下载次数: 0)
10.png
(98.63 KB, 下载次数: 0)
# -*- coding: utf-8 -*-
"""
Enterprise file management system v3.0.

Features: 1. network share scanning  2. content-based file de-duplication
3. operation audit reports.
"""
import os
import csv
import hashlib
import time
import sys
from datetime import datetime
from collections import defaultdict

# Probe for the optional Tk GUI; the tool falls back to console prompts
# when tkinter is not installed.
try:
    import tkinter as tk
    from tkinter import filedialog, messagebox
except ImportError:
    HAS_GUI = False
else:
    HAS_GUI = True

# Configuration
DEFAULT_CHUNK_SIZE = 65536  # bytes read per step while hashing (64 KiB)
MAX_LOG_ENTRIES = 1000  # upper bound on retained operation-log entries
SAFE_DIRECTORIES = {'Windows', 'Program Files'}  # directory names never descended into during de-duplication
class FileManager:
    """Scan directory trees, delete duplicate files and write CSV audit reports.

    Two plain lists collect what happened so callers can inspect or slice them:

    * ``operation_log`` - dicts with ``timestamp``, ``type``, ``path`` and
      ``metadata`` keys, capped at ``MAX_LOG_ENTRIES`` (oldest dropped first).
    * ``error_log`` - human-readable error strings; never trimmed.
    """

    def __init__(self):
        self.operation_log = []
        self.error_log = []

    def log_operation(self, event_type, path, metadata=None):
        """Append one entry to the operation log.

        Args:
            event_type: Short tag for the operation (e.g. ``"NETSCAN"``).
            path: Path the operation acted on.
            metadata: Optional dict of extra details; stored as ``{}`` when
                omitted or empty.
        """
        self.operation_log.append({
            "timestamp": datetime.now(),
            "type": event_type,
            "path": path,
            "metadata": metadata or {},
        })
        # Drop the oldest entries so at most MAX_LOG_ENTRIES are retained.
        overflow = len(self.operation_log) - MAX_LOG_ENTRIES
        if overflow > 0:
            del self.operation_log[:overflow]

    def get_hash(self, file_path):
        """Return the SHA-256 hex digest of a file, or None if it cannot be read.

        The file is read in ``DEFAULT_CHUNK_SIZE`` pieces so large files are
        never loaded into memory at once. Read failures are recorded in
        ``error_log`` rather than raised.
        """
        hasher = hashlib.sha256()
        try:
            with open(file_path, 'rb') as f:
                while chunk := f.read(DEFAULT_CHUNK_SIZE):
                    hasher.update(chunk)
        except (OSError, ValueError) as e:
            # ValueError covers malformed paths (e.g. an embedded NUL byte).
            self.error_log.append(f"哈希计算失败 [{file_path}]: {e}")
            return None
        return hasher.hexdigest()

    def scan_network_share(self, target_path):
        """Recursively inventory every file below ``target_path``.

        Directories whose name starts with ``$`` are not descended into.
        Symlinked directories are not followed. Errors on individual entries
        or directories are appended to ``error_log`` and the scan continues.

        Returns:
            A list of dicts with ``path``, ``size``, ``modified`` (mtime as a
            float timestamp) and ``hash`` (SHA-256 hex digest, or None when the
            file could not be read).
        """
        scan_report = []
        start_time = time.time()

        def recursive_scan(current_path):
            try:
                with os.scandir(current_path) as entries:
                    for entry in entries:
                        try:
                            if entry.is_dir(follow_symlinks=False):
                                if not entry.name.startswith('$'):
                                    recursive_scan(entry.path)
                            else:
                                stat = entry.stat()
                                scan_report.append({
                                    "path": entry.path,
                                    "size": stat.st_size,
                                    "modified": stat.st_mtime,
                                    "hash": self.get_hash(entry.path),
                                })
                        except PermissionError:
                            self.error_log.append(f"权限不足: {entry.path}")
                        except OSError as e:
                            self.error_log.append(f"扫描错误: {entry.path} ({e})")
            except OSError as e:
                # The directory itself could not be opened or listed.
                self.error_log.append(f"扫描中止: {current_path} ({e})")

        recursive_scan(target_path)
        self.log_operation("NETSCAN", target_path, {
            "duration": time.time() - start_time,
            "files_scanned": len(scan_report),
        })
        return scan_report

    def deduplicate_files(self, target_path):
        """Delete byte-identical duplicates below ``target_path``.

        Files are grouped by SHA-256 digest. Within each group the most
        recently modified file is kept and the others are removed. Directories
        named in ``SAFE_DIRECTORIES`` are skipped, and symbolic links are
        ignored so that a link is never kept in place of the real file it
        points to.

        Returns:
            A dict with ``total_files`` (files successfully hashed),
            ``duplicates`` (files selected for deletion), ``deleted``
            (deletions that succeeded) and ``report_path`` (CSV audit report).
        """
        hash_registry = defaultdict(list)
        deleted_records = []
        preserved_files = set()

        # Phase 1: index every regular file by content hash.
        for root, dirs, files in os.walk(target_path):
            # Prune in place so os.walk does not descend into protected dirs.
            dirs[:] = [d for d in dirs if d not in SAFE_DIRECTORIES]
            for name in files:
                file_path = os.path.join(root, name)
                if os.path.islink(file_path):
                    # A symlink hashes like its target; treating it as a
                    # duplicate could delete the real file and keep the link.
                    continue
                file_hash = self.get_hash(file_path)
                if file_hash:
                    hash_registry[file_hash].append(file_path)

        # Phase 2: choose which file of each duplicate group to keep.
        for file_hash, paths in hash_registry.items():
            if len(paths) < 2:
                continue
            # Stat each file once; a file that vanished since hashing is
            # logged and left out instead of aborting the whole run.
            candidates = []
            for path in paths:
                try:
                    candidates.append((path, os.stat(path)))
                except OSError as e:
                    self.error_log.append(f"记录失败: {path} ({e})")
            if len(candidates) < 2:
                continue
            # Newest first; the first entry is the one preserved.
            candidates.sort(key=lambda item: item[1].st_mtime, reverse=True)
            preserved_file = candidates[0][0]
            preserved_files.add(preserved_file)
            for duplicate, file_stat in candidates[1:]:
                deleted_records.append({
                    "deleted_path": duplicate,
                    "preserved_path": preserved_file,
                    "size": file_stat.st_size,
                    "modified": file_stat.st_mtime,
                    "hash": file_hash,
                })

        # Phase 3: perform the deletions.
        success_count = 0
        for record in deleted_records:
            try:
                os.remove(record["deleted_path"])
            except OSError as e:
                self.error_log.append(f"删除失败: {record['deleted_path']} ({e})")
                continue
            success_count += 1
            self.log_operation("DELETION", record["deleted_path"], {
                "preserved": record["preserved_path"],
                "hash": record["hash"][:12],
            })

        total_files = sum(len(v) for v in hash_registry.values())
        report_path = self.generate_report(
            target_path,
            preserved_files,
            deleted_records,
            success_count,
            total_files=total_files,
        )
        return {
            "total_files": total_files,
            "duplicates": len(deleted_records),
            "deleted": success_count,
            "report_path": report_path,
        }

    def generate_report(self, target_path, preserved, deleted, success_count,
                        total_files=None):
        """Write the CSV audit report and return its path.

        The report is created in a ``reports`` directory under the current
        working directory, encoded as UTF-8 with a BOM.

        Args:
            target_path: Directory that was de-duplicated.
            preserved: Collection of paths that were kept.
            deleted: List of record dicts as built by ``deduplicate_files``
                (keys ``deleted_path``, ``preserved_path``, ``size``,
                ``modified``, ``hash``).
            success_count: Number of deletions that succeeded.
            total_files: Number of files examined. When omitted, the count of
                files that took part in duplicate groups
                (``len(deleted) + len(preserved)``) is reported instead.

        Returns:
            Path of the CSV file that was written.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_dir = os.path.join(os.getcwd(), "reports")
        os.makedirs(report_dir, exist_ok=True)
        # normpath strips a trailing separator, which would otherwise make
        # basename() return an empty string.
        label = os.path.basename(os.path.normpath(target_path))
        report_path = os.path.join(
            report_dir, f"DedupeReport_{label}_{timestamp}.csv")

        if total_files is None:
            total_files = len(deleted) + len(preserved)

        with open(report_path, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.writer(f)

            # Header
            writer.writerow(["企业文件管理审计报告"])
            writer.writerow([f"目标目录: {target_path}"])
            writer.writerow([f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"])
            writer.writerow([])

            # One row per file selected for deletion
            writer.writerow(["删除文件路径", "大小 (MB)", "最后修改时间", "保留文件路径", "哈希片段"])
            for record in deleted:
                writer.writerow([
                    record["deleted_path"],
                    round(record["size"] / 1048576, 2),
                    datetime.fromtimestamp(record["modified"]).strftime('%Y-%m-%d %H:%M'),
                    record["preserved_path"],
                    record["hash"][:12],
                ])

            # Summary
            writer.writerow([])
            writer.writerow(["总文件数", total_files])
            writer.writerow(["发现重复", len(deleted)])
            writer.writerow(["成功删除", success_count])
            writer.writerow(["保留文件", len(preserved)])
            writer.writerow(["错误数量", len(self.error_log)])
        return report_path
class UserInterface:
    """Directory selection and result display via Tk dialogs or the console.

    Tk dialogs are used when tkinter is importable (``HAS_GUI``) and a Tk
    root window can actually be created; otherwise the methods fall back to
    console input and output.
    """

    @staticmethod
    def _hidden_root():
        """Return a withdrawn Tk root, or None when no GUI can be opened."""
        if not HAS_GUI:
            return None
        try:
            root = tk.Tk()
        except tk.TclError:
            # tkinter imported, but the Tk root could not be created
            # (for example when no display is available).
            return None
        root.withdraw()
        return root

    @staticmethod
    def _prompt_directory():
        """Ask on the console until an existing directory or ``Q`` is entered."""
        print("\n命令行模式:")
        while True:
            path = input("请输入完整路径(或输入Q退出): ").strip()
            if path.upper() == 'Q':
                return None
            if os.path.isdir(path):
                return os.path.normpath(path)
            print("错误:无效路径")

    @staticmethod
    def select_directory(gui_title="选择目录"):
        """Let the user choose a directory.

        Args:
            gui_title: Title of the directory chooser dialog (GUI mode only).

        Returns:
            The normalised directory path, or None when the user cancels the
            dialog or enters ``Q`` at the console prompt.
        """
        root = UserInterface._hidden_root()
        if root is None:
            return UserInterface._prompt_directory()
        try:
            # Keep the chooser above other windows.
            root.attributes('-topmost', True)
            path = filedialog.askdirectory(title=gui_title)
        finally:
            root.destroy()
        # A cancelled dialog yields an empty value; report it as None so both
        # modes share one contract.
        return os.path.normpath(path) if path else None

    @classmethod
    def display_report(cls, report_path):
        """Tell the user where the audit report was written.

        Args:
            report_path: Path of the generated report file.
        """
        root = cls._hidden_root()
        if root is None:
            print(f"\n操作完成,报告路径:\n{report_path}")
            return
        try:
            messagebox.showinfo(
                "操作完成",
                f"报告已生成至:\n{report_path}"
            )
        finally:
            root.destroy()
def _confirm_deletion():
    """Ask the user to confirm permanent deletion; return True when confirmed.

    Uses a Tk yes/no dialog attached to a hidden root window when a GUI is
    available, so no stray empty window is left on screen; otherwise asks on
    the console, where only ``y``/``Y`` confirms.
    """
    if HAS_GUI:
        try:
            root = tk.Tk()
        except tk.TclError:
            # tkinter imported but no Tk root could be created; use the console.
            root = None
        if root is not None:
            try:
                root.withdraw()
                root.attributes('-topmost', True)
                return messagebox.askyesno(
                    "确认删除",
                    "将永久删除重复文件!\n请确认已备份重要数据。"
                )
            finally:
                root.destroy()
    return input("将永久删除文件,确认操作?(y/N): ").lower() == 'y'


def main():
    """Show the menu once, run the chosen operation and print a short summary."""
    fm = FileManager()
    ui = UserInterface()

    # Menu
    print("\n企业文件管理系统")
    print("1. 扫描网络共享")
    print("2. 清理重复文件")
    print("3. 退出")
    choice = input("请选择操作:").strip()

    if choice == '1':
        target = ui.select_directory("选择扫描目录")
        if target:
            report = fm.scan_network_share(target)
            print(f"扫描完成,发现 {len(report)} 个文件")
    elif choice == '2':
        target = ui.select_directory("选择清理目录")
        # Deletion is irreversible, so it needs an explicit confirmation.
        if target and _confirm_deletion():
            result = fm.deduplicate_files(target)
            ui.display_report(result['report_path'])
            print(f"删除完成:{result['deleted']}/{result['duplicates']} 成功")

    # Always finish with the three most recent log entries and errors.
    print("\n操作日志:")
    for entry in fm.operation_log[-3:]:
        print(f"[{entry['timestamp']}] {entry['type']}: {entry['path']}")
    if fm.error_log:
        print("\n最近错误:")
        for error in fm.error_log[-3:]:
            print(f"! {error}")


if __name__ == "__main__":
    main()
复制代码
|
|