| 
 
TA的每日心情|  | 奋斗 2 小时前
 | 
|---|
 签到天数: 3033 天 [LV.Master]伴坛终老 | 
 
|  1.png
(130.74 KB, 下载次数: 0) 
  2.png
(114.06 KB, 下载次数: 0) 
  3.png
(89.65 KB, 下载次数: 0) 
  4.png
(102.08 KB, 下载次数: 0) 
  5.png
(103.72 KB, 下载次数: 0) 
  6.png
(122.47 KB, 下载次数: 0) 
  7.png
(164.87 KB, 下载次数: 0) 
  8.png
(106.19 KB, 下载次数: 0) 
  9.png
(198.37 KB, 下载次数: 0) 
  10.png
(98.63 KB, 下载次数: 0) 
 
 复制代码# -*- coding: utf-8 -*-
"""
企业文件管理系统 v3.0
功能:1. 网络共享扫描  2. 智能文件去重  3. 操作审计报告
"""
import os
import csv
import hashlib
import time
import sys
from datetime import datetime
from collections import defaultdict
# 尝试加载GUI模块
try:
    import tkinter as tk
    from tkinter import filedialog, messagebox
    HAS_GUI = True
except ImportError:
    HAS_GUI = False
# 配置参数
DEFAULT_CHUNK_SIZE = 65536  # 64KB分块哈希计算
MAX_LOG_ENTRIES = 1000      # 最大日志记录数
SAFE_DIRECTORIES = {'Windows', 'Program Files'}  # 受保护目录
class FileManager:
    def __init__(self):
        self.operation_log = []
        self.error_log = []
    def log_operation(self, event_type, path, metadata=None):
        """记录操作日志"""
        entry = {
            "timestamp": datetime.now(),
            "type": event_type,
            "path": path,
            "metadata": metadata or {}
        }
        self.operation_log.append(entry)
        if len(self.operation_log) > MAX_LOG_ENTRIES:
            self.operation_log.pop(0)
    def get_hash(self, file_path):
        """计算文件哈希值(SHA-256)"""
        hasher = hashlib.sha256()
        try:
            with open(file_path, 'rb') as f:
                while chunk := f.read(DEFAULT_CHUNK_SIZE):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except Exception as e:
            self.error_log.append(f"哈希计算失败 [{file_path}]: {str(e)}")
            return None
    def scan_network_share(self, target_path):
        """深度扫描网络共享"""
        scan_report = []
        start_time = time.time()
        def recursive_scan(current_path):
            try:
                with os.scandir(current_path) as entries:
                    for entry in entries:
                        try:
                            if entry.is_dir(follow_symlinks=False):
                                if not entry.name.startswith('$'):
                                    recursive_scan(entry.path)
                            else:
                                stat = entry.stat()
                                scan_report.append({
                                    "path": entry.path,
                                    "size": stat.st_size,
                                    "modified": stat.st_mtime,
                                    "hash": self.get_hash(entry.path)
                                })
                        except PermissionError:
                            self.error_log.append(f"权限不足: {entry.path}")
                        except Exception as e:
                            self.error_log.append(f"扫描错误: {entry.path} ({str(e)})")
            except Exception as e:
                self.error_log.append(f"扫描中止: {current_path} ({str(e)})")
        recursive_scan(target_path)
        self.log_operation("NETSCAN", target_path, {
            "duration": time.time() - start_time,
            "files_scanned": len(scan_report)
        })
        return scan_report
    def deduplicate_files(self, target_path):
        """智能文件去重引擎"""
        hash_registry = defaultdict(list)
        deleted_records = []
        preserved_files = set()
        # 第一阶段:构建哈希索引
        for root, dirs, files in os.walk(target_path):
            # 跳过系统目录
            dirs[:] = [d for d in dirs if d not in SAFE_DIRECTORIES]
            
            for file in files:
                file_path = os.path.join(root, file)
                file_hash = self.get_hash(file_path)
                if file_hash:
                    hash_registry[file_hash].append(file_path)
        # 第二阶段:处理重复文件
        for file_hash, files in hash_registry.items():
            if len(files) > 1:
                # 按修改时间排序(保留最新)
                sorted_files = sorted(
                    files,
                    key=lambda x: os.path.getmtime(x),
                    reverse=True
                )
                preserved_file = sorted_files[0]
                preserved_files.add(preserved_file)
                # 记录删除操作
                for duplicate in sorted_files[1:]:
                    try:
                        file_stat = os.stat(duplicate)
                        deleted_records.append({
                            "deleted_path": duplicate,
                            "preserved_path": preserved_file,
                            "size": file_stat.st_size,
                            "modified": file_stat.st_mtime,
                            "hash": file_hash
                        })
                    except Exception as e:
                        self.error_log.append(f"记录失败: {duplicate} ({str(e)})")
        # 第三阶段:执行删除
        success_count = 0
        for record in deleted_records:
            try:
                os.remove(record["deleted_path"])
                success_count += 1
                self.log_operation("DELETION", record["deleted_path"], {
                    "preserved": record["preserved_path"],
                    "hash": record["hash"][:12]
                })
            except Exception as e:
                self.error_log.append(f"删除失败: {record['deleted_path']} ({str(e)})")
        # 生成报告
        report_path = self.generate_report(
            target_path,
            preserved_files,
            deleted_records,
            success_count
        )
        return {
            "total_files": sum(len(v) for v in hash_registry.values()),
            "duplicates": len(deleted_records),
            "deleted": success_count,
            "report_path": report_path
        }
    def generate_report(self, target_path, preserved, deleted, success_count):
        """生成审计报告"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_dir = os.path.join(os.getcwd(), "reports")
        os.makedirs(report_dir, exist_ok=True)
        report_path = os.path.join(report_dir, 
            f"DedupeReport_{os.path.basename(target_path)}_{timestamp}.csv")
        with open(report_path, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.writer(f)
            
            # Header
            writer.writerow(["企业文件管理审计报告"])
            writer.writerow([f"目标目录: {target_path}"])
            writer.writerow([f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"])
            writer.writerow([])
            
            # 删除记录
            writer.writerow(["删除文件路径", "大小 (MB)", "最后修改时间", "保留文件路径", "哈希片段"])
            for record in deleted:
                writer.writerow([
                    record["deleted_path"],
                    round(record["size"] / 1048576, 2),
                    datetime.fromtimestamp(record["modified"]).strftime('%Y-%m-%d %H:%M'),
                    record["preserved_path"],
                    record["hash"][:12]
                ])
            
            # 统计摘要
            writer.writerow([])
            writer.writerow(["总文件数", sum(len(v) for v in deleted) + len(preserved)])
            writer.writerow(["发现重复", len(deleted)])
            writer.writerow(["成功删除", success_count])
            writer.writerow(["保留文件", len(preserved)])
            writer.writerow(["错误数量", len(self.error_log)])
        return report_path
class UserInterface:
    @staticmethod
    def select_directory(gui_title="选择目录"):
        """跨平台目录选择"""
        if HAS_GUI:
            root = tk.Tk()
            root.withdraw()
            root.attributes('-topmost', True)
            path = filedialog.askdirectory(title=gui_title)
            root.destroy()
            return path
        else:
            print("\n命令行模式:")
            while True:
                path = input("请输入完整路径(或输入Q退出): ").strip()
                if path.upper() == 'Q':
                    return None
                if os.path.isdir(path):
                    return os.path.normpath(path)
                print("错误:无效路径")
    @classmethod
    def display_report(cls, report_path):
        """报告展示"""
        if HAS_GUI:
            root = tk.Tk()
            root.withdraw()
            messagebox.showinfo(
                "操作完成",
                f"报告已生成至:\n{report_path}"
            )
            root.destroy()
        else:
            print(f"\n操作完成,报告路径:\n{report_path}")
if __name__ == "__main__":
    fm = FileManager()
    ui = UserInterface()
    # 操作菜单
    print("\n企业文件管理系统")
    print("1. 扫描网络共享")
    print("2. 清理重复文件")
    print("3. 退出")
    choice = input("请选择操作:").strip()
    if choice == '1':
        target = ui.select_directory("选择扫描目录")
        if target:
            report = fm.scan_network_share(target)
            print(f"扫描完成,发现 {len(report)} 个文件")
    elif choice == '2':
        target = ui.select_directory("选择清理目录")
        if target:
            # 安全确认
            if HAS_GUI:
                confirm = messagebox.askyesno(
                    "确认删除",
                    "将永久删除重复文件!\n请确认已备份重要数据。"
                )
            else:
                confirm = input("将永久删除文件,确认操作?(y/N): ").lower() == 'y'
            if confirm:
                result = fm.deduplicate_files(target)
                ui.display_report(result['report_path'])
                print(f"删除完成:{result['deleted']}/{result['duplicates']} 成功")
    print("\n操作日志:")
    for entry in fm.operation_log[-3:]:
        print(f"[{entry['timestamp']}] {entry['type']}: {entry['path']}")
    if fm.error_log:
        print("\n最近错误:")
        for error in fm.error_log[-3:]:
            print(f"! {error}")
 | 
 |