diff --git a/Makefile b/Makefile index 8f95467..e9604c9 100644 --- a/Makefile +++ b/Makefile @@ -184,7 +184,7 @@ compose-down: .PHONY: validate-keys validate-keys: @echo "🐍 使用 Python 版本验证密钥..." - python3 scripts/validate-keys.py -c 300 -t 15 + python3 scripts/validate-keys.py -c 100 -t 15 # 健康检查 .PHONY: health diff --git a/scripts/validate-keys.py b/scripts/validate-keys.py index fa4917b..b2f90a1 100755 --- a/scripts/validate-keys.py +++ b/scripts/validate-keys.py @@ -7,13 +7,11 @@ import asyncio import aiohttp -import json import time import sys import os -from typing import List, Dict, Set, Tuple +from typing import List, Dict, Tuple, Optional from dataclasses import dataclass -from concurrent.futures import ThreadPoolExecutor import argparse # 配置 @@ -132,7 +130,7 @@ class KeyValidator: try: result = await task model_results[model] = result - except Exception as e: + except Exception: model_results[model] = False # 判断密钥是否有效(至少一个模型可用) @@ -160,17 +158,20 @@ def load_keys(file_path: str) -> List[str]: return keys -def deduplicate_keys(keys: List[str]) -> List[str]: - """去重密钥""" +def deduplicate_keys(keys: List[str]) -> Tuple[List[str], List[str]]: + """去重密钥,返回(唯一密钥列表, 重复密钥列表)""" seen = set() unique_keys = [] - + duplicate_keys = [] + for key in keys: if key not in seen: seen.add(key) unique_keys.append(key) - - return unique_keys + else: + duplicate_keys.append(key) + + return unique_keys, duplicate_keys def format_model_status(model_results: Dict[str, bool]) -> str: """格式化模型状态显示""" @@ -222,59 +223,75 @@ async def validate_keys_batch(keys: List[str], base_url: str, timeout: int, conc return results -def save_results(results: List[KeyValidationResult], output_dir: str = "."): +def save_results(results: List[KeyValidationResult], duplicate_keys: Optional[List[str]] = None, output_dir: str = "."): """保存验证结果到文件""" valid_keys = [] invalid_keys = [] - + for result in results: if result.is_valid: valid_keys.append(result.key) else: invalid_keys.append(result.key) - + # 保存有效密钥 valid_file = os.path.join(output_dir, "valid_keys.txt") with open(valid_file, 'w', encoding='utf-8') as f: for key in valid_keys: f.write(f"{key}\n") - + # 保存无效密钥 invalid_file = os.path.join(output_dir, "invalid_keys.txt") with open(invalid_file, 'w', encoding='utf-8') as f: for key in invalid_keys: f.write(f"{key}\n") - - return valid_file, invalid_file, len(valid_keys), len(invalid_keys) -def print_summary(results: List[KeyValidationResult], valid_count: int, invalid_count: int, - valid_file: str, invalid_file: str, duration: float): + # 保存重复密钥 + duplicate_file = None + if duplicate_keys: + duplicate_file = os.path.join(output_dir, "duplicate_keys.txt") + with open(duplicate_file, 'w', encoding='utf-8') as f: + f.write("# 重复的密钥(已去重处理)\n") + f.write(f"# 发现 {len(duplicate_keys)} 个重复密钥\n") + f.write("# 这些密钥在验证过程中被自动去重\n\n") + for key in duplicate_keys: + f.write(f"{key}\n") + + return valid_file, invalid_file, duplicate_file, len(valid_keys), len(invalid_keys) + +def print_summary(results: List[KeyValidationResult], valid_count: int, invalid_count: int, + valid_file: str, invalid_file: str, duplicate_file: Optional[str], + duplicate_count: int, duration: float): """打印验证总结""" total = len(results) - + print("\n" + "=" * 120) print("📊 验证结果总结") print("=" * 120) print(f"总密钥数量: {total}") print(f"有效密钥数: {valid_count} ({valid_count/total*100:.1f}%)") print(f"无效密钥数: {invalid_count} ({invalid_count/total*100:.1f}%)") + if duplicate_count > 0: + print(f"重复密钥数: {duplicate_count}") print(f"验证耗时: {duration:.2f} 秒") print(f"平均速度: {total/duration:.1f} 密钥/秒") print() - print(f"📁 结果文件:") + print("📁 结果文件:") print(f" 有效密钥: {valid_file}") print(f" 无效密钥: {invalid_file}") - + if duplicate_file: + print(f" 重复密钥: {duplicate_file}") + # 模型统计 - print(f"\n📈 模型可用性统计:") + print("\n📈 模型可用性统计:") model_stats = {model: 0 for model in TEST_MODELS} - + for result in results: if result.is_valid: for model, available in result.model_results.items(): if available: model_stats[model] += 1 - + for model in TEST_MODELS: count = model_stats[model] percentage = count / valid_count * 100 if valid_count > 0 else 0 @@ -299,30 +316,33 @@ async def main(): print(f"🧪 测试模型: {', '.join(TEST_MODELS)}") # 加载和去重密钥 - print(f"\n📖 加载密钥文件...") + print("\n📖 加载密钥文件...") raw_keys = load_keys(args.file) print(f" 原始密钥数量: {len(raw_keys)}") - - unique_keys = deduplicate_keys(raw_keys) - duplicates = len(raw_keys) - len(unique_keys) + + unique_keys, duplicate_keys = deduplicate_keys(raw_keys) + duplicate_count = len(duplicate_keys) print(f" 去重后数量: {len(unique_keys)}") - if duplicates > 0: - print(f" 发现重复: {duplicates} 个") - + if duplicate_count > 0: + print(f" 发现重复: {duplicate_count} 个") + if not unique_keys: print("❌ 没有找到有效的密钥") sys.exit(1) - + # 开始验证 start_time = time.time() results = await validate_keys_batch(unique_keys, args.url, args.timeout, args.concurrency) duration = time.time() - start_time - + # 保存结果 - valid_file, invalid_file, valid_count, invalid_count = save_results(results, args.output) - + valid_file, invalid_file, duplicate_file, valid_count, invalid_count = save_results( + results, duplicate_keys if duplicate_keys else None, args.output + ) + # 打印总结 - print_summary(results, valid_count, invalid_count, valid_file, invalid_file, duration) + print_summary(results, valid_count, invalid_count, valid_file, invalid_file, + duplicate_file, duplicate_count, duration) if __name__ == "__main__": try: