From 1feeb2c8ac6863174f8c0666fcb3fa2cb9cb3e5f Mon Sep 17 00:00:00 2001 From: tbphp Date: Sat, 7 Jun 2025 13:42:31 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0key=E9=AA=8C=E8=AF=81?= =?UTF-8?q?=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + Makefile | 9 ++ README.md | 14 ++ scripts/validate-keys.py | 335 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 359 insertions(+) create mode 100755 scripts/validate-keys.py diff --git a/.gitignore b/.gitignore index 269dbfd..9f2eb7b 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ # Test binary, built with `go test -c` *.test +*.txt # Output of the go coverage tool, specifically when used with LiteIDE *.out diff --git a/Makefile b/Makefile index f2bdeae..8f95467 100644 --- a/Makefile +++ b/Makefile @@ -180,6 +180,12 @@ compose-down: docker-compose down docker-compose -f docker-compose.dev.yml down 2>/dev/null || true +# 密钥验证 +.PHONY: validate-keys +validate-keys: + @echo "🐍 使用 Python 版本验证密钥..." + python3 scripts/validate-keys.py -c 300 -t 15 + # 健康检查 .PHONY: health health: @@ -245,3 +251,6 @@ help: @echo " stats - 查看统计信息" @echo " reset-keys - 重置密钥状态" @echo " blacklist - 查看黑名单" + @echo "" + @echo "密钥验证:" + @echo " validate-keys - 验证 API 密钥" diff --git a/README.md b/README.md index b083d6f..0147c79 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,20 @@ cp .env.example .env | CORS | `ENABLE_CORS` | true | 是否启用 CORS | | 连接池 | `MAX_SOCKETS` | 50 | HTTP 连接池最大连接数 | +## 🔑 密钥验证工具 + +项目提供了高性能的 API 密钥验证工具,支持批量验证、去重和多模型测试: + +### 快速使用 + +```bash +# 自动选择最佳验证方式 +make validate-keys + +# 或直接运行 +./scripts/validate-keys.py +``` + ## 📊 监控端点 | 端点 | 方法 | 说明 | diff --git a/scripts/validate-keys.py b/scripts/validate-keys.py new file mode 100755 index 0000000..fa4917b --- /dev/null +++ b/scripts/validate-keys.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +高性能 OpenAI API 密钥验证脚本 +支持并发验证、去重、多模型测试 +""" + +import asyncio +import aiohttp +import json +import time +import sys +import os +from typing import List, Dict, Set, Tuple +from dataclasses import dataclass +from concurrent.futures import ThreadPoolExecutor +import argparse + +# 配置 +DEFAULT_KEYS_FILE = "keys.txt" +DEFAULT_BASE_URL = "https://api.openai.com" +DEFAULT_CONCURRENCY = 50 +DEFAULT_TIMEOUT = 30 + +# 测试模型列表 +TEST_MODELS = [ + "gpt-4o-mini", + "gpt-4.1-mini", + "gpt-4.1-nano" +] + +@dataclass +class KeyValidationResult: + """密钥验证结果""" + key: str + key_preview: str + is_valid: bool + model_results: Dict[str, bool] + error_message: str = "" + +class KeyValidator: + """高性能密钥验证器""" + + def __init__(self, base_url: str = DEFAULT_BASE_URL, timeout: int = DEFAULT_TIMEOUT, concurrency: int = DEFAULT_CONCURRENCY): + self.base_url = base_url.rstrip('/') + self.timeout = timeout + self.concurrency = concurrency + self.session = None + + async def __aenter__(self): + """异步上下文管理器入口""" + connector = aiohttp.TCPConnector( + limit=self.concurrency * 2, + limit_per_host=self.concurrency, + ttl_dns_cache=300, + use_dns_cache=True, + ) + + timeout = aiohttp.ClientTimeout(total=self.timeout) + self.session = aiohttp.ClientSession( + connector=connector, + timeout=timeout, + headers={ + 'User-Agent': 'GPT-Load-KeyValidator/1.0' + } + ) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """异步上下文管理器退出""" + if self.session: + await self.session.close() + + def get_key_preview(self, key: str) -> str: + """获取密钥预览(脱敏显示)""" + if len(key) < 20: + return key[:4] + "***" + key[-4:] + return key[:8] + "***" + key[-8:] + + async def test_model(self, key: str, model: str) -> bool: + """测试单个模型是否可用""" + url = f"{self.base_url}/v1/chat/completions" + headers = { + "Authorization": f"Bearer {key}", + "Content-Type": "application/json" + } + + payload = { + "model": model, + "messages": [ + {"role": "user", "content": "Hi"} + ], + "max_tokens": 1, + "temperature": 0 + } + + try: + async with self.session.post(url, headers=headers, json=payload) as response: + if response.status == 200: + return True + elif response.status == 401: + # 认证失败,密钥无效 + return False + elif response.status == 404: + # 模型不存在或无权限 + return False + elif response.status == 429: + # 速率限制,但密钥可能有效 + return True + else: + # 其他错误,认为模型不可用 + return False + + except asyncio.TimeoutError: + return False + except Exception: + return False + + async def validate_key(self, key: str) -> KeyValidationResult: + """验证单个密钥""" + key_preview = self.get_key_preview(key) + model_results = {} + + # 并发测试所有模型 + tasks = [] + for model in TEST_MODELS: + task = asyncio.create_task(self.test_model(key, model)) + tasks.append((model, task)) + + # 等待所有测试完成 + for model, task in tasks: + try: + result = await task + model_results[model] = result + except Exception as e: + model_results[model] = False + + # 判断密钥是否有效(至少一个模型可用) + is_valid = any(model_results.values()) + + return KeyValidationResult( + key=key, + key_preview=key_preview, + is_valid=is_valid, + model_results=model_results + ) + +def load_keys(file_path: str) -> List[str]: + """加载密钥文件""" + if not os.path.exists(file_path): + print(f"❌ 密钥文件不存在: {file_path}") + sys.exit(1) + + keys = [] + with open(file_path, 'r', encoding='utf-8') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if line and not line.startswith('#'): + keys.append(line) + + return keys + +def deduplicate_keys(keys: List[str]) -> List[str]: + """去重密钥""" + seen = set() + unique_keys = [] + + for key in keys: + if key not in seen: + seen.add(key) + unique_keys.append(key) + + return unique_keys + +def format_model_status(model_results: Dict[str, bool]) -> str: + """格式化模型状态显示""" + status_parts = [] + for model in TEST_MODELS: + if model in model_results: + emoji = "✅" if model_results[model] else "❌" + status_parts.append(f"{emoji} {model}") + else: + status_parts.append(f"❓ {model}") + + return " | ".join(status_parts) + +async def validate_keys_batch(keys: List[str], base_url: str, timeout: int, concurrency: int) -> List[KeyValidationResult]: + """批量验证密钥""" + results = [] + + async with KeyValidator(base_url, timeout, concurrency) as validator: + # 创建信号量限制并发数 + semaphore = asyncio.Semaphore(concurrency) + + async def validate_with_semaphore(key: str) -> KeyValidationResult: + async with semaphore: + return await validator.validate_key(key) + + # 创建所有验证任务 + tasks = [validate_with_semaphore(key) for key in keys] + + # 使用 as_completed 来实时显示进度 + completed = 0 + total = len(tasks) + + print(f"\n🚀 开始验证 {total} 个密钥...") + print("=" * 120) + print(f"{'序号':<6} {'密钥预览':<20} {'状态':<6} {'模型测试结果':<80}") + print("=" * 120) + + for coro in asyncio.as_completed(tasks): + result = await coro + completed += 1 + + # 实时输出结果 + status_emoji = "✅ 有效" if result.is_valid else "❌ 无效" + model_status = format_model_status(result.model_results) + + print(f"{completed:<6} {result.key_preview:<20} {status_emoji:<6} {model_status}") + + results.append(result) + + return results + +def save_results(results: List[KeyValidationResult], output_dir: str = "."): + """保存验证结果到文件""" + valid_keys = [] + invalid_keys = [] + + for result in results: + if result.is_valid: + valid_keys.append(result.key) + else: + invalid_keys.append(result.key) + + # 保存有效密钥 + valid_file = os.path.join(output_dir, "valid_keys.txt") + with open(valid_file, 'w', encoding='utf-8') as f: + for key in valid_keys: + f.write(f"{key}\n") + + # 保存无效密钥 + invalid_file = os.path.join(output_dir, "invalid_keys.txt") + with open(invalid_file, 'w', encoding='utf-8') as f: + for key in invalid_keys: + f.write(f"{key}\n") + + return valid_file, invalid_file, len(valid_keys), len(invalid_keys) + +def print_summary(results: List[KeyValidationResult], valid_count: int, invalid_count: int, + valid_file: str, invalid_file: str, duration: float): + """打印验证总结""" + total = len(results) + + print("\n" + "=" * 120) + print("📊 验证结果总结") + print("=" * 120) + print(f"总密钥数量: {total}") + print(f"有效密钥数: {valid_count} ({valid_count/total*100:.1f}%)") + print(f"无效密钥数: {invalid_count} ({invalid_count/total*100:.1f}%)") + print(f"验证耗时: {duration:.2f} 秒") + print(f"平均速度: {total/duration:.1f} 密钥/秒") + print() + print(f"📁 结果文件:") + print(f" 有效密钥: {valid_file}") + print(f" 无效密钥: {invalid_file}") + + # 模型统计 + print(f"\n📈 模型可用性统计:") + model_stats = {model: 0 for model in TEST_MODELS} + + for result in results: + if result.is_valid: + for model, available in result.model_results.items(): + if available: + model_stats[model] += 1 + + for model in TEST_MODELS: + count = model_stats[model] + percentage = count / valid_count * 100 if valid_count > 0 else 0 + print(f" {model}: {count}/{valid_count} ({percentage:.1f}%)") + +async def main(): + """主函数""" + parser = argparse.ArgumentParser(description="OpenAI API 密钥验证工具") + parser.add_argument("-f", "--file", default=DEFAULT_KEYS_FILE, help="密钥文件路径") + parser.add_argument("-u", "--url", default=DEFAULT_BASE_URL, help="API 基础URL") + parser.add_argument("-c", "--concurrency", type=int, default=DEFAULT_CONCURRENCY, help="并发数") + parser.add_argument("-t", "--timeout", type=int, default=DEFAULT_TIMEOUT, help="超时时间(秒)") + parser.add_argument("-o", "--output", default=".", help="输出目录") + + args = parser.parse_args() + + print("🔑 OpenAI API 密钥验证工具") + print(f"📁 密钥文件: {args.file}") + print(f"🌐 API地址: {args.url}") + print(f"⚡ 并发数: {args.concurrency}") + print(f"⏱️ 超时时间: {args.timeout}秒") + print(f"🧪 测试模型: {', '.join(TEST_MODELS)}") + + # 加载和去重密钥 + print(f"\n📖 加载密钥文件...") + raw_keys = load_keys(args.file) + print(f" 原始密钥数量: {len(raw_keys)}") + + unique_keys = deduplicate_keys(raw_keys) + duplicates = len(raw_keys) - len(unique_keys) + print(f" 去重后数量: {len(unique_keys)}") + if duplicates > 0: + print(f" 发现重复: {duplicates} 个") + + if not unique_keys: + print("❌ 没有找到有效的密钥") + sys.exit(1) + + # 开始验证 + start_time = time.time() + results = await validate_keys_batch(unique_keys, args.url, args.timeout, args.concurrency) + duration = time.time() - start_time + + # 保存结果 + valid_file, invalid_file, valid_count, invalid_count = save_results(results, args.output) + + # 打印总结 + print_summary(results, valid_count, invalid_count, valid_file, invalid_file, duration) + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n\n⚠️ 用户中断验证过程") + sys.exit(1) + except Exception as e: + print(f"\n❌ 验证过程中发生错误: {e}") + sys.exit(1)