#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 高性能 OpenAI API 密钥验证脚本 支持并发验证、去重、多模型测试 """ import asyncio import aiohttp import json import time import sys import os from typing import List, Dict, Set, Tuple from dataclasses import dataclass from concurrent.futures import ThreadPoolExecutor import argparse # 配置 DEFAULT_KEYS_FILE = "keys.txt" DEFAULT_BASE_URL = "https://api.openai.com" DEFAULT_CONCURRENCY = 50 DEFAULT_TIMEOUT = 30 # 测试模型列表 TEST_MODELS = [ "gpt-4o-mini", "gpt-4.1-mini", "gpt-4.1-nano" ] @dataclass class KeyValidationResult: """密钥验证结果""" key: str key_preview: str is_valid: bool model_results: Dict[str, bool] error_message: str = "" class KeyValidator: """高性能密钥验证器""" def __init__(self, base_url: str = DEFAULT_BASE_URL, timeout: int = DEFAULT_TIMEOUT, concurrency: int = DEFAULT_CONCURRENCY): self.base_url = base_url.rstrip('/') self.timeout = timeout self.concurrency = concurrency self.session = None async def __aenter__(self): """异步上下文管理器入口""" connector = aiohttp.TCPConnector( limit=self.concurrency * 2, limit_per_host=self.concurrency, ttl_dns_cache=300, use_dns_cache=True, ) timeout = aiohttp.ClientTimeout(total=self.timeout) self.session = aiohttp.ClientSession( connector=connector, timeout=timeout, headers={ 'User-Agent': 'GPT-Load-KeyValidator/1.0' } ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器退出""" if self.session: await self.session.close() def get_key_preview(self, key: str) -> str: """获取密钥预览(脱敏显示)""" if len(key) < 20: return key[:4] + "***" + key[-4:] return key[:8] + "***" + key[-8:] async def test_model(self, key: str, model: str) -> bool: """测试单个模型是否可用""" url = f"{self.base_url}/v1/chat/completions" headers = { "Authorization": f"Bearer {key}", "Content-Type": "application/json" } payload = { "model": model, "messages": [ {"role": "user", "content": "Hi"} ], "max_tokens": 1, "temperature": 0 } try: async with self.session.post(url, headers=headers, json=payload) as response: if response.status == 200: return True elif response.status == 401: # 认证失败,密钥无效 return False elif response.status == 404: # 模型不存在或无权限 return False elif response.status == 429: # 速率限制,但密钥可能有效 return True else: # 其他错误,认为模型不可用 return False except asyncio.TimeoutError: return False except Exception: return False async def validate_key(self, key: str) -> KeyValidationResult: """验证单个密钥""" key_preview = self.get_key_preview(key) model_results = {} # 并发测试所有模型 tasks = [] for model in TEST_MODELS: task = asyncio.create_task(self.test_model(key, model)) tasks.append((model, task)) # 等待所有测试完成 for model, task in tasks: try: result = await task model_results[model] = result except Exception as e: model_results[model] = False # 判断密钥是否有效(至少一个模型可用) is_valid = any(model_results.values()) return KeyValidationResult( key=key, key_preview=key_preview, is_valid=is_valid, model_results=model_results ) def load_keys(file_path: str) -> List[str]: """加载密钥文件""" if not os.path.exists(file_path): print(f"❌ 密钥文件不存在: {file_path}") sys.exit(1) keys = [] with open(file_path, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): line = line.strip() if line and not line.startswith('#'): keys.append(line) return keys def deduplicate_keys(keys: List[str]) -> List[str]: """去重密钥""" seen = set() unique_keys = [] for key in keys: if key not in seen: seen.add(key) unique_keys.append(key) return unique_keys def format_model_status(model_results: Dict[str, bool]) -> str: """格式化模型状态显示""" status_parts = [] for model in TEST_MODELS: if model in model_results: emoji = "✅" if model_results[model] else "❌" status_parts.append(f"{emoji} {model}") else: status_parts.append(f"❓ {model}") return " | ".join(status_parts) async def validate_keys_batch(keys: List[str], base_url: str, timeout: int, concurrency: int) -> List[KeyValidationResult]: """批量验证密钥""" results = [] async with KeyValidator(base_url, timeout, concurrency) as validator: # 创建信号量限制并发数 semaphore = asyncio.Semaphore(concurrency) async def validate_with_semaphore(key: str) -> KeyValidationResult: async with semaphore: return await validator.validate_key(key) # 创建所有验证任务 tasks = [validate_with_semaphore(key) for key in keys] # 使用 as_completed 来实时显示进度 completed = 0 total = len(tasks) print(f"\n🚀 开始验证 {total} 个密钥...") print("=" * 120) print(f"{'序号':<6} {'密钥预览':<20} {'状态':<6} {'模型测试结果':<80}") print("=" * 120) for coro in asyncio.as_completed(tasks): result = await coro completed += 1 # 实时输出结果 status_emoji = "✅ 有效" if result.is_valid else "❌ 无效" model_status = format_model_status(result.model_results) print(f"{completed:<6} {result.key_preview:<20} {status_emoji:<6} {model_status}") results.append(result) return results def save_results(results: List[KeyValidationResult], output_dir: str = "."): """保存验证结果到文件""" valid_keys = [] invalid_keys = [] for result in results: if result.is_valid: valid_keys.append(result.key) else: invalid_keys.append(result.key) # 保存有效密钥 valid_file = os.path.join(output_dir, "valid_keys.txt") with open(valid_file, 'w', encoding='utf-8') as f: for key in valid_keys: f.write(f"{key}\n") # 保存无效密钥 invalid_file = os.path.join(output_dir, "invalid_keys.txt") with open(invalid_file, 'w', encoding='utf-8') as f: for key in invalid_keys: f.write(f"{key}\n") return valid_file, invalid_file, len(valid_keys), len(invalid_keys) def print_summary(results: List[KeyValidationResult], valid_count: int, invalid_count: int, valid_file: str, invalid_file: str, duration: float): """打印验证总结""" total = len(results) print("\n" + "=" * 120) print("📊 验证结果总结") print("=" * 120) print(f"总密钥数量: {total}") print(f"有效密钥数: {valid_count} ({valid_count/total*100:.1f}%)") print(f"无效密钥数: {invalid_count} ({invalid_count/total*100:.1f}%)") print(f"验证耗时: {duration:.2f} 秒") print(f"平均速度: {total/duration:.1f} 密钥/秒") print() print(f"📁 结果文件:") print(f" 有效密钥: {valid_file}") print(f" 无效密钥: {invalid_file}") # 模型统计 print(f"\n📈 模型可用性统计:") model_stats = {model: 0 for model in TEST_MODELS} for result in results: if result.is_valid: for model, available in result.model_results.items(): if available: model_stats[model] += 1 for model in TEST_MODELS: count = model_stats[model] percentage = count / valid_count * 100 if valid_count > 0 else 0 print(f" {model}: {count}/{valid_count} ({percentage:.1f}%)") async def main(): """主函数""" parser = argparse.ArgumentParser(description="OpenAI API 密钥验证工具") parser.add_argument("-f", "--file", default=DEFAULT_KEYS_FILE, help="密钥文件路径") parser.add_argument("-u", "--url", default=DEFAULT_BASE_URL, help="API 基础URL") parser.add_argument("-c", "--concurrency", type=int, default=DEFAULT_CONCURRENCY, help="并发数") parser.add_argument("-t", "--timeout", type=int, default=DEFAULT_TIMEOUT, help="超时时间(秒)") parser.add_argument("-o", "--output", default=".", help="输出目录") args = parser.parse_args() print("🔑 OpenAI API 密钥验证工具") print(f"📁 密钥文件: {args.file}") print(f"🌐 API地址: {args.url}") print(f"⚡ 并发数: {args.concurrency}") print(f"⏱️ 超时时间: {args.timeout}秒") print(f"🧪 测试模型: {', '.join(TEST_MODELS)}") # 加载和去重密钥 print(f"\n📖 加载密钥文件...") raw_keys = load_keys(args.file) print(f" 原始密钥数量: {len(raw_keys)}") unique_keys = deduplicate_keys(raw_keys) duplicates = len(raw_keys) - len(unique_keys) print(f" 去重后数量: {len(unique_keys)}") if duplicates > 0: print(f" 发现重复: {duplicates} 个") if not unique_keys: print("❌ 没有找到有效的密钥") sys.exit(1) # 开始验证 start_time = time.time() results = await validate_keys_batch(unique_keys, args.url, args.timeout, args.concurrency) duration = time.time() - start_time # 保存结果 valid_file, invalid_file, valid_count, invalid_count = save_results(results, args.output) # 打印总结 print_summary(results, valid_count, invalid_count, valid_file, invalid_file, duration) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n\n⚠️ 用户中断验证过程") sys.exit(1) except Exception as e: print(f"\n❌ 验证过程中发生错误: {e}") sys.exit(1)