设为首页收藏本站
查看: 53|回复: 3

python去重小脚本

[复制链接]
  • TA的每日心情
    奋斗
    6 小时前
  • 签到天数: 2929 天

    [LV.Master]伴坛终老

    发表于 3 天前 | 显示全部楼层 |阅读模式
    1.png (130.74 KB, 下载次数: 0)
    2.png (114.06 KB, 下载次数: 0)
    3.png (89.65 KB, 下载次数: 0)
    4.png (102.08 KB, 下载次数: 0)
    5.png (103.72 KB, 下载次数: 0)
    6.png (122.47 KB, 下载次数: 0)
    7.png (164.87 KB, 下载次数: 0)
    8.png (106.19 KB, 下载次数: 0)
    9.png (198.37 KB, 下载次数: 0)
    10.png (98.63 KB, 下载次数: 0)

    1. # -*- coding: utf-8 -*-
    2. """
    3. 企业文件管理系统 v3.0
    4. 功能:1. 网络共享扫描  2. 智能文件去重  3. 操作审计报告
    5. """

    6. import os
    7. import csv
    8. import hashlib
    9. import time
    10. import sys
    11. from datetime import datetime
    12. from collections import defaultdict

    13. # 尝试加载GUI模块
    14. try:
    15.     import tkinter as tk
    16.     from tkinter import filedialog, messagebox
    17.     HAS_GUI = True
    18. except ImportError:
    19.     HAS_GUI = False

    20. # 配置参数
    21. DEFAULT_CHUNK_SIZE = 65536  # 64KB分块哈希计算
    22. MAX_LOG_ENTRIES = 1000      # 最大日志记录数
    23. SAFE_DIRECTORIES = {'Windows', 'Program Files'}  # 受保护目录

    24. class FileManager:
    25.     def __init__(self):
    26.         self.operation_log = []
    27.         self.error_log = []

    28.     def log_operation(self, event_type, path, metadata=None):
    29.         """记录操作日志"""
    30.         entry = {
    31.             "timestamp": datetime.now(),
    32.             "type": event_type,
    33.             "path": path,
    34.             "metadata": metadata or {}
    35.         }
    36.         self.operation_log.append(entry)
    37.         if len(self.operation_log) > MAX_LOG_ENTRIES:
    38.             self.operation_log.pop(0)

    39.     def get_hash(self, file_path):
    40.         """计算文件哈希值(SHA-256)"""
    41.         hasher = hashlib.sha256()
    42.         try:
    43.             with open(file_path, 'rb') as f:
    44.                 while chunk := f.read(DEFAULT_CHUNK_SIZE):
    45.                     hasher.update(chunk)
    46.             return hasher.hexdigest()
    47.         except Exception as e:
    48.             self.error_log.append(f"哈希计算失败 [{file_path}]: {str(e)}")
    49.             return None

    50.     def scan_network_share(self, target_path):
    51.         """深度扫描网络共享"""
    52.         scan_report = []
    53.         start_time = time.time()

    54.         def recursive_scan(current_path):
    55.             try:
    56.                 with os.scandir(current_path) as entries:
    57.                     for entry in entries:
    58.                         try:
    59.                             if entry.is_dir(follow_symlinks=False):
    60.                                 if not entry.name.startswith('$'):
    61.                                     recursive_scan(entry.path)
    62.                             else:
    63.                                 stat = entry.stat()
    64.                                 scan_report.append({
    65.                                     "path": entry.path,
    66.                                     "size": stat.st_size,
    67.                                     "modified": stat.st_mtime,
    68.                                     "hash": self.get_hash(entry.path)
    69.                                 })
    70.                         except PermissionError:
    71.                             self.error_log.append(f"权限不足: {entry.path}")
    72.                         except Exception as e:
    73.                             self.error_log.append(f"扫描错误: {entry.path} ({str(e)})")
    74.             except Exception as e:
    75.                 self.error_log.append(f"扫描中止: {current_path} ({str(e)})")

    76.         recursive_scan(target_path)
    77.         self.log_operation("NETSCAN", target_path, {
    78.             "duration": time.time() - start_time,
    79.             "files_scanned": len(scan_report)
    80.         })
    81.         return scan_report

    82.     def deduplicate_files(self, target_path):
    83.         """智能文件去重引擎"""
    84.         hash_registry = defaultdict(list)
    85.         deleted_records = []
    86.         preserved_files = set()

    87.         # 第一阶段:构建哈希索引
    88.         for root, dirs, files in os.walk(target_path):
    89.             # 跳过系统目录
    90.             dirs[:] = [d for d in dirs if d not in SAFE_DIRECTORIES]
    91.             
    92.             for file in files:
    93.                 file_path = os.path.join(root, file)
    94.                 file_hash = self.get_hash(file_path)
    95.                 if file_hash:
    96.                     hash_registry[file_hash].append(file_path)

    97.         # 第二阶段:处理重复文件
    98.         for file_hash, files in hash_registry.items():
    99.             if len(files) > 1:
    100.                 # 按修改时间排序(保留最新)
    101.                 sorted_files = sorted(
    102.                     files,
    103.                     key=lambda x: os.path.getmtime(x),
    104.                     reverse=True
    105.                 )
    106.                 preserved_file = sorted_files[0]
    107.                 preserved_files.add(preserved_file)

    108.                 # 记录删除操作
    109.                 for duplicate in sorted_files[1:]:
    110.                     try:
    111.                         file_stat = os.stat(duplicate)
    112.                         deleted_records.append({
    113.                             "deleted_path": duplicate,
    114.                             "preserved_path": preserved_file,
    115.                             "size": file_stat.st_size,
    116.                             "modified": file_stat.st_mtime,
    117.                             "hash": file_hash
    118.                         })
    119.                     except Exception as e:
    120.                         self.error_log.append(f"记录失败: {duplicate} ({str(e)})")

    121.         # 第三阶段:执行删除
    122.         success_count = 0
    123.         for record in deleted_records:
    124.             try:
    125.                 os.remove(record["deleted_path"])
    126.                 success_count += 1
    127.                 self.log_operation("DELETION", record["deleted_path"], {
    128.                     "preserved": record["preserved_path"],
    129.                     "hash": record["hash"][:12]
    130.                 })
    131.             except Exception as e:
    132.                 self.error_log.append(f"删除失败: {record['deleted_path']} ({str(e)})")

    133.         # 生成报告
    134.         report_path = self.generate_report(
    135.             target_path,
    136.             preserved_files,
    137.             deleted_records,
    138.             success_count
    139.         )

    140.         return {
    141.             "total_files": sum(len(v) for v in hash_registry.values()),
    142.             "duplicates": len(deleted_records),
    143.             "deleted": success_count,
    144.             "report_path": report_path
    145.         }

    146.     def generate_report(self, target_path, preserved, deleted, success_count):
    147.         """生成审计报告"""
    148.         timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    149.         report_dir = os.path.join(os.getcwd(), "reports")
    150.         os.makedirs(report_dir, exist_ok=True)
    151.         report_path = os.path.join(report_dir,
    152.             f"DedupeReport_{os.path.basename(target_path)}_{timestamp}.csv")

    153.         with open(report_path, 'w', newline='', encoding='utf-8-sig') as f:
    154.             writer = csv.writer(f)
    155.             
    156.             # Header
    157.             writer.writerow(["企业文件管理审计报告"])
    158.             writer.writerow([f"目标目录: {target_path}"])
    159.             writer.writerow([f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"])
    160.             writer.writerow([])
    161.             
    162.             # 删除记录
    163.             writer.writerow(["删除文件路径", "大小 (MB)", "最后修改时间", "保留文件路径", "哈希片段"])
    164.             for record in deleted:
    165.                 writer.writerow([
    166.                     record["deleted_path"],
    167.                     round(record["size"] / 1048576, 2),
    168.                     datetime.fromtimestamp(record["modified"]).strftime('%Y-%m-%d %H:%M'),
    169.                     record["preserved_path"],
    170.                     record["hash"][:12]
    171.                 ])
    172.             
    173.             # 统计摘要
    174.             writer.writerow([])
    175.             writer.writerow(["总文件数", sum(len(v) for v in deleted) + len(preserved)])
    176.             writer.writerow(["发现重复", len(deleted)])
    177.             writer.writerow(["成功删除", success_count])
    178.             writer.writerow(["保留文件", len(preserved)])
    179.             writer.writerow(["错误数量", len(self.error_log)])

    180.         return report_path

    181. class UserInterface:
    182.     @staticmethod
    183.     def select_directory(gui_title="选择目录"):
    184.         """跨平台目录选择"""
    185.         if HAS_GUI:
    186.             root = tk.Tk()
    187.             root.withdraw()
    188.             root.attributes('-topmost', True)
    189.             path = filedialog.askdirectory(title=gui_title)
    190.             root.destroy()
    191.             return path
    192.         else:
    193.             print("\n命令行模式:")
    194.             while True:
    195.                 path = input("请输入完整路径(或输入Q退出): ").strip()
    196.                 if path.upper() == 'Q':
    197.                     return None
    198.                 if os.path.isdir(path):
    199.                     return os.path.normpath(path)
    200.                 print("错误:无效路径")

    201.     @classmethod
    202.     def display_report(cls, report_path):
    203.         """报告展示"""
    204.         if HAS_GUI:
    205.             root = tk.Tk()
    206.             root.withdraw()
    207.             messagebox.showinfo(
    208.                 "操作完成",
    209.                 f"报告已生成至:\n{report_path}"
    210.             )
    211.             root.destroy()
    212.         else:
    213.             print(f"\n操作完成,报告路径:\n{report_path}")

    214. if __name__ == "__main__":
    215.     fm = FileManager()
    216.     ui = UserInterface()

    217.     # 操作菜单
    218.     print("\n企业文件管理系统")
    219.     print("1. 扫描网络共享")
    220.     print("2. 清理重复文件")
    221.     print("3. 退出")

    222.     choice = input("请选择操作:").strip()

    223.     if choice == '1':
    224.         target = ui.select_directory("选择扫描目录")
    225.         if target:
    226.             report = fm.scan_network_share(target)
    227.             print(f"扫描完成,发现 {len(report)} 个文件")

    228.     elif choice == '2':
    229.         target = ui.select_directory("选择清理目录")
    230.         if target:
    231.             # 安全确认
    232.             if HAS_GUI:
    233.                 confirm = messagebox.askyesno(
    234.                     "确认删除",
    235.                     "将永久删除重复文件!\n请确认已备份重要数据。"
    236.                 )
    237.             else:
    238.                 confirm = input("将永久删除文件,确认操作?(y/N): ").lower() == 'y'

    239.             if confirm:
    240.                 result = fm.deduplicate_files(target)
    241.                 ui.display_report(result['report_path'])
    242.                 print(f"删除完成:{result['deleted']}/{result['duplicates']} 成功")

    243.     print("\n操作日志:")
    244.     for entry in fm.operation_log[-3:]:
    245.         print(f"[{entry['timestamp']}] {entry['type']}: {entry['path']}")

    246.     if fm.error_log:
    247.         print("\n最近错误:")
    248.         for error in fm.error_log[-3:]:
    249.             print(f"! {error}")
    复制代码

    1. python python去重小脚本.py
    复制代码

    点评

    666 :)  发表于 前天 20:00
  • TA的每日心情
    奋斗
    6 小时前
  • 签到天数: 2929 天

    [LV.Master]伴坛终老

     楼主| 发表于 前天 13:28 | 显示全部楼层


    python扫描重复文件
    1. import os
    2. import hashlib
    3. import tkinter as tk
    4. from tkinter import filedialog
    5. import xlwt  # 改用xlwt生成兼容性更好的xls格式
    6. import time

    7. def select_directory():
    8.     """选择文件夹对话框"""
    9.     root = tk.Tk()
    10.     root.withdraw()
    11.     return filedialog.askdirectory()

    12. def file_hash(filepath):
    13.     """计算文件的MD5哈希值(兼容旧系统)"""
    14.     hash_md5 = hashlib.md5()
    15.     try:
    16.         with open(filepath, "rb") as f:
    17.             while True:
    18.                 chunk = f.read(4096)
    19.                 if not chunk:
    20.                     break
    21.                 hash_md5.update(chunk)
    22.         return hash_md5.hexdigest()
    23.     except Exception as e:
    24.         print("[Error] 无法读取文件 {}: {}".format(filepath, str(e)))
    25.         return None

    26. def scan_files(directory, max_depth=None):
    27.     """扫描目录及子目录文件(兼容旧路径格式)"""
    28.     file_dict = {}
    29.     directory = os.path.normpath(directory)  # 规范化路径格式

    30.     for root, dirs, files in os.walk(directory):
    31.         # 计算当前深度
    32.         rel_path = os.path.relpath(root, directory)
    33.         current_depth = rel_path.count(os.sep) if rel_path != '.' else 0
    34.         
    35.         if max_depth is not None and current_depth >= max_depth:
    36.             del dirs[:]
    37.             continue

    38.         for file in files:
    39.             file_path = os.path.join(root, file)
    40.             try:
    41.                 file_size = os.path.getsize(file_path)
    42.                 mtime = os.path.getmtime(file_path)
    43.                 modified_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime))
    44.                 file_hash_value = file_hash(file_path)
    45.                
    46.                 if file_hash_value:
    47.                     key = (file_hash_value, file_size)
    48.                     if key not in file_dict:
    49.                         file_dict[key] = []
    50.                     file_dict[key].append({
    51.                         "path": file_path,
    52.                         "size": file_size,
    53.                         "modified": modified_time
    54.                     })
    55.             except Exception as e:
    56.                 print("[Error] 跳过文件 {}: {}".format(file_path, str(e)))
    57.     return file_dict

    58. def export_to_excel(duplicates, output_file="duplicates.xls"):
    59.     """导出重复文件到Excel 97-2003格式"""
    60.     wb = xlwt.Workbook(encoding='utf-8')
    61.     ws = wb.add_sheet('重复文件报告')

    62.     # 设置样式
    63.     header_style = xlwt.easyxf(
    64.         'font: bold on; align: wrap on, vert centre, horiz center;'
    65.         'borders: left thin, right thin, top thin, bottom thin;'
    66.     )
    67.     group_style = xlwt.easyxf(
    68.         'pattern: pattern solid, fore_colour pale_blue;'
    69.         'font: bold on;'
    70.     )

    71.     # 写入表头
    72.     ws.write(0, 0, "重复文件组", header_style)
    73.     ws.write(0, 1, "文件路径", header_style)
    74.     ws.write(0, 2, "文件大小 (Bytes)", header_style)
    75.     ws.write(0, 3, "最后修改时间", header_style)
    76.    
    77.     row = 1
    78.     group_num = 1
    79.     for key, files in duplicates.items():
    80.         if len(files) > 1:
    81.             # 合并单元格显示组信息
    82.             ws.write(row, 0, "重复文件组 {} (共 {} 个)".format(group_num, len(files)), group_style)
    83.             ws.write(row, 1, "MD5: {}".format(key[0]), group_style)
    84.             ws.row(row).set_style(xlwt.easyxf('pattern: pattern solid, fore_colour gray25;'))
    85.             
    86.             row +=1
    87.             for file in files:
    88.                 ws.write(row, 0, group_num)
    89.                 ws.write(row, 1, file["path"])
    90.                 ws.write(row, 2, file["size"])
    91.                 ws.write(row, 3, file["modified"])
    92.                 row +=1
    93.             group_num +=1
    94.    
    95.     # 调整列宽
    96.     ws.col(0).width = 3000
    97.     ws.col(1).width = 12000
    98.     ws.col(2).width = 4000
    99.     ws.col(3).width = 4000

    100.     wb.save(output_file)
    101.     print("报告已生成:{}".format(os.path.abspath(output_file)))

    102. def main():
    103.     """主程序"""
    104.     target_dir = select_directory()
    105.     if not target_dir:
    106.         print("操作已取消")
    107.         return

    108.     try:
    109.         depth_input = input("请输入扫描深度(0=仅当前目录,回车=不限深度): ").strip()
    110.         max_depth = int(depth_input) if depth_input else None
    111.     except ValueError:
    112.         max_depth = None

    113.     print("扫描中...(可能需要较长时间)")
    114.     all_files = scan_files(target_dir, max_depth)
    115.     duplicates = {k: v for k, v in all_files.items() if len(v) > 1}

    116.     if duplicates:
    117.         print("发现 {} 组重复文件".format(len(duplicates)))
    118.         export_to_excel(duplicates)
    119.     else:
    120.         print("未发现重复文件")

    121. if __name__ == "__main__":
    122.     # 配置适用于旧系统的编码环境
    123.     import sys
    124.     if sys.version_info < (3,):
    125.         reload(sys)
    126.         sys.setdefaultencoding('utf-8')
    127.     main()
    复制代码
    回复 支持 1 反对 0

    使用道具 举报

  • TA的每日心情
    奋斗
    4 小时前
  • 签到天数: 341 天

    [LV.8]以坛为家I

    发表于 前天 19:59 | 显示全部楼层
    刺客老哥牛逼
    回复 支持 反对

    使用道具 举报

    您需要登录后才可以回帖 登录 | 注册

    本版积分规则

    红盟社区--红客联盟 

    Processed in 0.066789 second(s), 28 queries.

    站点统计| 举报| Archiver| 手机版| 黑屋 |   

    备案号:冀ICP备20006029号-1 Powered by HUC © 2001-2021 Comsenz Inc.

    手机扫我进入移动触屏客户端

    关注我们可获取更多热点资讯

    Honor accompaniments. theme macfee

    快速回复 返回顶部 返回列表