H.U.C_刺客 发表于 2025-5-6 14:58:34

python去重小脚本












# -*- coding: utf-8 -*-
"""
企业文件管理系统 v3.0
功能:1. 网络共享扫描2. 智能文件去重3. 操作审计报告
"""

import os
import csv
import hashlib
import time
import sys
from datetime import datetime
from collections import defaultdict

# 尝试加载GUI模块
try:
    import tkinter as tk
    from tkinter import filedialog, messagebox
    HAS_GUI = True
except ImportError:
    HAS_GUI = False

# 配置参数
DEFAULT_CHUNK_SIZE = 65536# 64KB分块哈希计算
MAX_LOG_ENTRIES = 1000      # 最大日志记录数
SAFE_DIRECTORIES = {'Windows', 'Program Files'}# 受保护目录

class FileManager:
    def __init__(self):
      self.operation_log = []
      self.error_log = []

    def log_operation(self, event_type, path, metadata=None):
      """记录操作日志"""
      entry = {
            "timestamp": datetime.now(),
            "type": event_type,
            "path": path,
            "metadata": metadata or {}
      }
      self.operation_log.append(entry)
      if len(self.operation_log) > MAX_LOG_ENTRIES:
            self.operation_log.pop(0)

    def get_hash(self, file_path):
      """计算文件哈希值(SHA-256)"""
      hasher = hashlib.sha256()
      try:
            with open(file_path, 'rb') as f:
                while chunk := f.read(DEFAULT_CHUNK_SIZE):
                  hasher.update(chunk)
            return hasher.hexdigest()
      except Exception as e:
            self.error_log.append(f"哈希计算失败 [{file_path}]: {str(e)}")
            return None

    def scan_network_share(self, target_path):
      """深度扫描网络共享"""
      scan_report = []
      start_time = time.time()

      def recursive_scan(current_path):
            try:
                with os.scandir(current_path) as entries:
                  for entry in entries:
                        try:
                            if entry.is_dir(follow_symlinks=False):
                              if not entry.name.startswith('$'):
                                    recursive_scan(entry.path)
                            else:
                              stat = entry.stat()
                              scan_report.append({
                                    "path": entry.path,
                                    "size": stat.st_size,
                                    "modified": stat.st_mtime,
                                    "hash": self.get_hash(entry.path)
                              })
                        except PermissionError:
                            self.error_log.append(f"权限不足: {entry.path}")
                        except Exception as e:
                            self.error_log.append(f"扫描错误: {entry.path} ({str(e)})")
            except Exception as e:
                self.error_log.append(f"扫描中止: {current_path} ({str(e)})")

      recursive_scan(target_path)
      self.log_operation("NETSCAN", target_path, {
            "duration": time.time() - start_time,
            "files_scanned": len(scan_report)
      })
      return scan_report

    def deduplicate_files(self, target_path):
      """智能文件去重引擎"""
      hash_registry = defaultdict(list)
      deleted_records = []
      preserved_files = set()

      # 第一阶段:构建哈希索引
      for root, dirs, files in os.walk(target_path):
            # 跳过系统目录
            dirs[:] =
            
            for file in files:
                file_path = os.path.join(root, file)
                file_hash = self.get_hash(file_path)
                if file_hash:
                  hash_registry.append(file_path)

      # 第二阶段:处理重复文件
      for file_hash, files in hash_registry.items():
            if len(files) > 1:
                # 按修改时间排序(保留最新)
                sorted_files = sorted(
                  files,
                  key=lambda x: os.path.getmtime(x),
                  reverse=True
                )
                preserved_file = sorted_files
                preserved_files.add(preserved_file)

                # 记录删除操作
                for duplicate in sorted_files:
                  try:
                        file_stat = os.stat(duplicate)
                        deleted_records.append({
                            "deleted_path": duplicate,
                            "preserved_path": preserved_file,
                            "size": file_stat.st_size,
                            "modified": file_stat.st_mtime,
                            "hash": file_hash
                        })
                  except Exception as e:
                        self.error_log.append(f"记录失败: {duplicate} ({str(e)})")

      # 第三阶段:执行删除
      success_count = 0
      for record in deleted_records:
            try:
                os.remove(record["deleted_path"])
                success_count += 1
                self.log_operation("DELETION", record["deleted_path"], {
                  "preserved": record["preserved_path"],
                  "hash": record["hash"][:12]
                })
            except Exception as e:
                self.error_log.append(f"删除失败: {record['deleted_path']} ({str(e)})")

      # 生成报告
      report_path = self.generate_report(
            target_path,
            preserved_files,
            deleted_records,
            success_count
      )

      return {
            "total_files": sum(len(v) for v in hash_registry.values()),
            "duplicates": len(deleted_records),
            "deleted": success_count,
            "report_path": report_path
      }

    def generate_report(self, target_path, preserved, deleted, success_count):
      """生成审计报告"""
      timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
      report_dir = os.path.join(os.getcwd(), "reports")
      os.makedirs(report_dir, exist_ok=True)
      report_path = os.path.join(report_dir,
            f"DedupeReport_{os.path.basename(target_path)}_{timestamp}.csv")

      with open(report_path, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.writer(f)
            
            # Header
            writer.writerow(["企业文件管理审计报告"])
            writer.writerow()
            writer.writerow()
            writer.writerow([])
            
            # 删除记录
            writer.writerow(["删除文件路径", "大小 (MB)", "最后修改时间", "保留文件路径", "哈希片段"])
            for record in deleted:
                writer.writerow([
                  record["deleted_path"],
                  round(record["size"] / 1048576, 2),
                  datetime.fromtimestamp(record["modified"]).strftime('%Y-%m-%d %H:%M'),
                  record["preserved_path"],
                  record["hash"][:12]
                ])
            
            # 统计摘要
            writer.writerow([])
            writer.writerow(["总文件数", sum(len(v) for v in deleted) + len(preserved)])
            writer.writerow(["发现重复", len(deleted)])
            writer.writerow(["成功删除", success_count])
            writer.writerow(["保留文件", len(preserved)])
            writer.writerow(["错误数量", len(self.error_log)])

      return report_path

class UserInterface:
    @staticmethod
    def select_directory(gui_title="选择目录"):
      """跨平台目录选择"""
      if HAS_GUI:
            root = tk.Tk()
            root.withdraw()
            root.attributes('-topmost', True)
            path = filedialog.askdirectory(title=gui_title)
            root.destroy()
            return path
      else:
            print("\n命令行模式:")
            while True:
                path = input("请输入完整路径(或输入Q退出): ").strip()
                if path.upper() == 'Q':
                  return None
                if os.path.isdir(path):
                  return os.path.normpath(path)
                print("错误:无效路径")

    @classmethod
    def display_report(cls, report_path):
      """报告展示"""
      if HAS_GUI:
            root = tk.Tk()
            root.withdraw()
            messagebox.showinfo(
                "操作完成",
                f"报告已生成至:\n{report_path}"
            )
            root.destroy()
      else:
            print(f"\n操作完成,报告路径:\n{report_path}")

if __name__ == "__main__":
    fm = FileManager()
    ui = UserInterface()

    # 操作菜单
    print("\n企业文件管理系统")
    print("1. 扫描网络共享")
    print("2. 清理重复文件")
    print("3. 退出")

    choice = input("请选择操作:").strip()

    if choice == '1':
      target = ui.select_directory("选择扫描目录")
      if target:
            report = fm.scan_network_share(target)
            print(f"扫描完成,发现 {len(report)} 个文件")

    elif choice == '2':
      target = ui.select_directory("选择清理目录")
      if target:
            # 安全确认
            if HAS_GUI:
                confirm = messagebox.askyesno(
                  "确认删除",
                  "将永久删除重复文件!\n请确认已备份重要数据。"
                )
            else:
                confirm = input("将永久删除文件,确认操作?(y/N): ").lower() == 'y'

            if confirm:
                result = fm.deduplicate_files(target)
                ui.display_report(result['report_path'])
                print(f"删除完成:{result['deleted']}/{result['duplicates']} 成功")

    print("\n操作日志:")
    for entry in fm.operation_log[-3:]:
      print(f"[{entry['timestamp']}] {entry['type']}: {entry['path']}")

    if fm.error_log:
      print("\n最近错误:")
      for error in fm.error_log[-3:]:
            print(f"! {error}")

python python去重小脚本.py

H.U.C_刺客 发表于 2025-5-7 13:28:03



python扫描重复文件
import os
import hashlib
import tkinter as tk
from tkinter import filedialog
import xlwt# 改用xlwt生成兼容性更好的xls格式
import time

def select_directory():
    """选择文件夹对话框"""
    root = tk.Tk()
    root.withdraw()
    return filedialog.askdirectory()

def file_hash(filepath):
    """计算文件的MD5哈希值(兼容旧系统)"""
    hash_md5 = hashlib.md5()
    try:
      with open(filepath, "rb") as f:
            while True:
                chunk = f.read(4096)
                if not chunk:
                  break
                hash_md5.update(chunk)
      return hash_md5.hexdigest()
    except Exception as e:
      print(" 无法读取文件 {}: {}".format(filepath, str(e)))
      return None

def scan_files(directory, max_depth=None):
    """扫描目录及子目录文件(兼容旧路径格式)"""
    file_dict = {}
    directory = os.path.normpath(directory)# 规范化路径格式

    for root, dirs, files in os.walk(directory):
      # 计算当前深度
      rel_path = os.path.relpath(root, directory)
      current_depth = rel_path.count(os.sep) if rel_path != '.' else 0
      
      if max_depth is not None and current_depth >= max_depth:
            del dirs[:]
            continue

      for file in files:
            file_path = os.path.join(root, file)
            try:
                file_size = os.path.getsize(file_path)
                mtime = os.path.getmtime(file_path)
                modified_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime))
                file_hash_value = file_hash(file_path)
               
                if file_hash_value:
                  key = (file_hash_value, file_size)
                  if key not in file_dict:
                        file_dict = []
                  file_dict.append({
                        "path": file_path,
                        "size": file_size,
                        "modified": modified_time
                  })
            except Exception as e:
                print(" 跳过文件 {}: {}".format(file_path, str(e)))
    return file_dict

def export_to_excel(duplicates, output_file="duplicates.xls"):
    """导出重复文件到Excel 97-2003格式"""
    wb = xlwt.Workbook(encoding='utf-8')
    ws = wb.add_sheet('重复文件报告')

    # 设置样式
    header_style = xlwt.easyxf(
      'font: bold on; align: wrap on, vert centre, horiz center;'
      'borders: left thin, right thin, top thin, bottom thin;'
    )
    group_style = xlwt.easyxf(
      'pattern: pattern solid, fore_colour pale_blue;'
      'font: bold on;'
    )

    # 写入表头
    ws.write(0, 0, "重复文件组", header_style)
    ws.write(0, 1, "文件路径", header_style)
    ws.write(0, 2, "文件大小 (Bytes)", header_style)
    ws.write(0, 3, "最后修改时间", header_style)
   
    row = 1
    group_num = 1
    for key, files in duplicates.items():
      if len(files) > 1:
            # 合并单元格显示组信息
            ws.write(row, 0, "重复文件组 {} (共 {} 个)".format(group_num, len(files)), group_style)
            ws.write(row, 1, "MD5: {}".format(key), group_style)
            ws.row(row).set_style(xlwt.easyxf('pattern: pattern solid, fore_colour gray25;'))
            
            row +=1
            for file in files:
                ws.write(row, 0, group_num)
                ws.write(row, 1, file["path"])
                ws.write(row, 2, file["size"])
                ws.write(row, 3, file["modified"])
                row +=1
            group_num +=1
   
    # 调整列宽
    ws.col(0).width = 3000
    ws.col(1).width = 12000
    ws.col(2).width = 4000
    ws.col(3).width = 4000

    wb.save(output_file)
    print("报告已生成:{}".format(os.path.abspath(output_file)))

def main():
    """主程序"""
    target_dir = select_directory()
    if not target_dir:
      print("操作已取消")
      return

    try:
      depth_input = input("请输入扫描深度(0=仅当前目录,回车=不限深度): ").strip()
      max_depth = int(depth_input) if depth_input else None
    except ValueError:
      max_depth = None

    print("扫描中...(可能需要较长时间)")
    all_files = scan_files(target_dir, max_depth)
    duplicates = {k: v for k, v in all_files.items() if len(v) > 1}

    if duplicates:
      print("发现 {} 组重复文件".format(len(duplicates)))
      export_to_excel(duplicates)
    else:
      print("未发现重复文件")

if __name__ == "__main__":
    # 配置适用于旧系统的编码环境
    import sys
    if sys.version_info < (3,):
      reload(sys)
      sys.setdefaultencoding('utf-8')
    main()

alglsf666 发表于 2025-5-7 19:59:45

刺客老哥牛逼:)

H.U.C清风 发表于 2025-5-21 07:41:39

谢谢分享,已回复。
页: [1]
查看完整版本: python去重小脚本