#!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse import csv import time import re import requests from pathlib import Path # =========================== # 配置:文件路径 # =========================== COOKIE_FILE = Path("cookie.txt") # 你的 cookie 存在这里(不建议动这个) # =========================== # Cookie 解析函数 # =========================== def load_cookie_from_file() -> dict: if not COOKIE_FILE.exists(): raise RuntimeError( "未找到 cookie.txt,请在脚本同目录创建 cookie.txt,内容为浏览器抓到的 Cookie 字符串" ) cookie_str = COOKIE_FILE.read_text().strip() if not cookie_str: raise RuntimeError("cookie.txt 内容为空,请把 curl -b 'xxx' 里的内容贴进去") cookies = {} for part in cookie_str.split(";"): part = part.strip() if not part: continue if "=" in part: k, v = part.split("=", 1) cookies[k.strip()] = v.strip() return cookies # =========================== # HTTP 相关 # =========================== BASE_URL = "https://partner.steamgames.com/querycdkey/cdkey" HEADERS = { "User-Agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/142.0.0.0 Safari/537.36" ), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", } def query_key(key: str, cookies: dict) -> str: """查询单个 KEY,返回 HTML 文本""" params = {"cdkey": key, "method": "query"} resp = requests.get( BASE_URL, params=params, headers=HEADERS, cookies=cookies, timeout=15, ) resp.raise_for_status() return resp.text # =========================== # HTML 解析:提取 Package 信息 # =========================== def extract_package_info(html: str): """ 从 HTML 中提取 KEY 所属程序包信息 返回 (包名, PackageID) 或 (None, None) """ m = re.search( r']+href="https://partner\.steamgames\.com/store/packagelanding/(\d+)"[^>]*>(.*?)', html, re.S, ) if not m: return None, None package_id = m.group(1) link_text = m.group(2).strip() package_name = re.sub(r"\s+", " ", link_text) return package_name, package_id # =========================== # 状态解析(颜色优先,文字兜底) # =========================== def parse_status(html: str): """ 状态判断优先级: 1)优先用颜色(语言无关): - 检测到 #67c1f5 → 已激活 - 检测到 #e24044 → 未激活(再用包信息区分是否有效) 2)如果颜色都没检测到,再用文字兜底: - “已激活” → 已激活(文字判断) - “未激活” + 有包信息 → 未激活(有效 KEY) - “未激活” + 无包信息 → 未激活(无效或不存在) 3)最后用英文文案兜底: - has not been redeemed / not yet registered → 未激活(英文提示) - already been redeemed / already registered → 已激活(英文提示) - invalid cd key → 无效 KEY(英文提示) """ package_name, package_id = extract_package_info(html) html_lower = html.lower() status = "未知" note = "" # ===== 1. 颜色优先(语言无关) ===== if "#67c1f5" in html_lower: # 蓝色提示:已激活 status = "已激活" elif "#e24044" in html_lower: # 红色提示:未激活,但区分有效/无效 if package_name: status = "未激活(有效 KEY)" else: status = "未激活(无效或不存在)" else: # ===== 2. 没检测到颜色,用文字兜底 ===== # 中文 UI if "已激活" in html: status = "已激活(文字判断)" elif "未激活" in html: if package_name: status = "未激活(有效 KEY,文字判断)" else: status = "未激活(无效或不存在,文字判断)" # 英文 UI 兜底 elif "has not been redeemed" in html_lower or "not yet registered" in html_lower: status = "未激活(英文提示)" elif "already been redeemed" in html_lower or "already registered" in html_lower: status = "已激活(英文提示)" elif "invalid cd key" in html_lower: status = "无效 KEY(英文提示)" if package_name and status.startswith("未知"): note = "检测到包信息,但状态未能匹配" return status, package_name, package_id, note # =========================== # 状态分类辅助 # =========================== def classify_status(status: str, counters: dict): """根据状态字符串更新计数器""" if status.startswith("已激活"): counters["activated"] += 1 elif status.startswith("未激活(有效"): counters["unactivated_valid"] += 1 elif "无效" in status: counters["unactivated_invalid"] += 1 else: counters["unknown"] += 1 def is_done_status(status: str) -> bool: """ 判定一个状态是否“已完成”,用于决定是否跳过重新查询: - 所有“已激活*” - 所有包含“无效”的状态(无效 KEY / 无效或不存在 等) """ if not status: return False return status.startswith("已激活") or ("无效" in status) # =========================== # 从已有结果中加载“已完成”的 KEY # =========================== def load_existing_results(output_file: Path): """ 如果结果文件存在,读取它: - 返回 prev_results: key -> row(dict) - 返回 done_keys: 已完成的 key(已激活 或 “无效”) """ prev_results = {} done_keys = set() if not output_file.exists(): return prev_results, done_keys with output_file.open("r", encoding="utf-8", newline="") as f: reader = csv.DictReader(f) for row in reader: key = (row.get("KEY") or "").strip() if not key: continue prev_results[key] = row status = (row.get("状态") or "").strip() if is_done_status(status): done_keys.add(key) return prev_results, done_keys # =========================== # 主流程 # =========================== def main(): # 参数解析 parser = argparse.ArgumentParser(description="批量查询 Steam CD Key 状态(支持断点续扫)") parser.add_argument( "-k", "--keys", default="keys.txt", help="待查询 key 文件路径(默认:keys.txt)" ) parser.add_argument( "-o", "--output", default="key_status.csv", help="结果输出文件路径(默认:key_status.csv)" ) args = parser.parse_args() keys_file = Path(args.keys) output_file = Path(args.output) # 1. 载入 cookie cookies = load_cookie_from_file() print("已成功载入 cookie.txt 的登录信息\n") # 2. 检查 key 文件 if not keys_file.exists(): print(f"未找到 key 文件:{keys_file},请创建后重试。") return # 3. 如果已有结果文件,先加载旧结果(用于跳过已完成的 key) prev_results, done_keys = load_existing_results(output_file) if prev_results: print(f"检测到已有结果文件:{output_file}") print(f"其中已完成(已激活/无效)的 key 数量:{len(done_keys)}\n") # 记录之前已激活的 key(用于统计“之前已激活”和“新激活”) prev_activated_keys = { key for key, row in prev_results.items() if (row.get("状态") or "").startswith("已激活") } # 4. 读取 key 列表 keys = [] with keys_file.open("r", encoding="utf-8") as f_in: for line in f_in: key = line.strip() if key: keys.append(key) if not keys: print(f"{keys_file} 中没有找到任何 key") return total = len(keys) # 统计变量 counters = { "activated": 0, "unactivated_valid": 0, "unactivated_invalid": 0, "unknown": 0, "previous_activated": 0, # 之前已激活 "new_activated": 0, # 本次新激活 } # 5. 重新写结果文件(避免重复行) with output_file.open("w", newline="", encoding="utf-8") as f_out: writer = csv.writer(f_out) writer.writerow(["KEY", "状态", "程序包名称", "PackageID", "备注"]) for idx, key in enumerate(keys, start=1): progress = f"[{idx}/{total} {idx / total * 100:5.1f}%]" # 如果 key 在旧结果里,并且已完成(已激活 / 无效),直接复用 if key in prev_results and key in done_keys: row = prev_results[key] status = row.get("状态", "") package_name = row.get("程序包名称", "") package_id = row.get("PackageID", "") note = row.get("备注", "") writer.writerow([key, status, package_name, package_id, note]) classify_status(status, counters) # 统计:之前已激活 if key in prev_activated_keys: counters["previous_activated"] += 1 print(f"{progress} 跳过(已完成):{key} -> {status}") continue # 否则重新查询 try: html = query_key(key, cookies) status, package_name, package_id, note = parse_status(html) writer.writerow([key, status, package_name or "", package_id or "", note]) classify_status(status, counters) # 本次新激活(之前没激活,现在是已激活) if status.startswith("已激活") and key not in prev_activated_keys: counters["new_activated"] += 1 if package_id: print(f"{progress} 查询:{key} -> {status} {package_name} ({package_id})") else: print(f"{progress} 查询:{key} -> {status}") except Exception as e: err = f"请求失败: {e}" writer.writerow([key, "请求失败", "", "", err]) counters["unknown"] += 1 print(f"{progress} 查询:{key} -> {err}") time.sleep(1) # 防止被风控 # ===== 统计结果展示 ===== total_count = ( counters["activated"] + counters["unactivated_valid"] + counters["unactivated_invalid"] + counters["unknown"] ) print("\n========== 查询统计 ==========") print(f"已激活: {counters['activated']}") print(f" ├─ 之前已激活: {counters['previous_activated']}") print(f" └─ 本次新激活: {counters['new_activated']}") print(f"未激活(有效KEY): {counters['unactivated_valid']}") print(f"未激活(无效KEY): {counters['unactivated_invalid']}") print(f"未知 / 请求失败: {counters['unknown']}") print(f"总计: {total_count}") print(f"\n结果已写入:{output_file}") if __name__ == "__main__": main()