| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import argparse
- import csv
- import time
- import re
- import requests
- from pathlib import Path
- # ===========================
- # 配置:文件路径
- # ===========================
- COOKIE_FILE = Path("cookie.txt") # 你的 cookie 存在这里(不建议动这个)
- # ===========================
- # Cookie 解析函数
- # ===========================
- def load_cookie_from_file() -> dict:
- if not COOKIE_FILE.exists():
- raise RuntimeError(
- "未找到 cookie.txt,请在脚本同目录创建 cookie.txt,内容为浏览器抓到的 Cookie 字符串"
- )
- cookie_str = COOKIE_FILE.read_text().strip()
- if not cookie_str:
- raise RuntimeError("cookie.txt 内容为空,请把 curl -b 'xxx' 里的内容贴进去")
- cookies = {}
- for part in cookie_str.split(";"):
- part = part.strip()
- if not part:
- continue
- if "=" in part:
- k, v = part.split("=", 1)
- cookies[k.strip()] = v.strip()
- return cookies
- # ===========================
- # HTTP 相关
- # ===========================
- BASE_URL = "https://partner.steamgames.com/querycdkey/cdkey"
- HEADERS = {
- "User-Agent": (
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
- "AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/142.0.0.0 Safari/537.36"
- ),
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
- }
- def query_key(key: str, cookies: dict) -> str:
- """查询单个 KEY,返回 HTML 文本"""
- params = {"cdkey": key, "method": "query"}
- resp = requests.get(
- BASE_URL,
- params=params,
- headers=HEADERS,
- cookies=cookies,
- timeout=15,
- )
- resp.raise_for_status()
- return resp.text
- # ===========================
- # HTML 解析:提取 Package 信息
- # ===========================
- def extract_package_info(html: str):
- """
- 从 HTML 中提取 KEY 所属程序包信息
- 返回 (包名, PackageID) 或 (None, None)
- """
- m = re.search(
- r'<a[^>]+href="https://partner\.steamgames\.com/store/packagelanding/(\d+)"[^>]*>(.*?)</a>',
- html,
- re.S,
- )
- if not m:
- return None, None
- package_id = m.group(1)
- link_text = m.group(2).strip()
- package_name = re.sub(r"\s+", " ", link_text)
- return package_name, package_id
- # ===========================
- # 状态解析(颜色优先,文字兜底)
- # ===========================
- def parse_status(html: str):
- """
- 状态判断优先级:
- 1)优先用颜色(语言无关):
- - 检测到 #67c1f5 → 已激活
- - 检测到 #e24044 → 未激活(再用包信息区分是否有效)
- 2)如果颜色都没检测到,再用文字兜底:
- - “已激活” → 已激活(文字判断)
- - “未激活” + 有包信息 → 未激活(有效 KEY)
- - “未激活” + 无包信息 → 未激活(无效或不存在)
- 3)最后用英文文案兜底:
- - has not been redeemed / not yet registered → 未激活(英文提示)
- - already been redeemed / already registered → 已激活(英文提示)
- - invalid cd key → 无效 KEY(英文提示)
- """
- package_name, package_id = extract_package_info(html)
- html_lower = html.lower()
- status = "未知"
- note = ""
- # ===== 1. 颜色优先(语言无关) =====
- if "#67c1f5" in html_lower:
- # 蓝色提示:已激活
- status = "已激活"
- elif "#e24044" in html_lower:
- # 红色提示:未激活,但区分有效/无效
- if package_name:
- status = "未激活(有效 KEY)"
- else:
- status = "未激活(无效或不存在)"
- else:
- # ===== 2. 没检测到颜色,用文字兜底 =====
- # 中文 UI
- if "已激活" in html:
- status = "已激活(文字判断)"
- elif "未激活" in html:
- if package_name:
- status = "未激活(有效 KEY,文字判断)"
- else:
- status = "未激活(无效或不存在,文字判断)"
- # 英文 UI 兜底
- elif "has not been redeemed" in html_lower or "not yet registered" in html_lower:
- status = "未激活(英文提示)"
- elif "already been redeemed" in html_lower or "already registered" in html_lower:
- status = "已激活(英文提示)"
- elif "invalid cd key" in html_lower:
- status = "无效 KEY(英文提示)"
- if package_name and status.startswith("未知"):
- note = "检测到包信息,但状态未能匹配"
- return status, package_name, package_id, note
- # ===========================
- # 状态分类辅助
- # ===========================
- def classify_status(status: str, counters: dict):
- """根据状态字符串更新计数器"""
- if status.startswith("已激活"):
- counters["activated"] += 1
- elif status.startswith("未激活(有效"):
- counters["unactivated_valid"] += 1
- elif "无效" in status:
- counters["unactivated_invalid"] += 1
- else:
- counters["unknown"] += 1
- def is_done_status(status: str) -> bool:
- """
- 判定一个状态是否“已完成”,用于决定是否跳过重新查询:
- - 所有“已激活*”
- - 所有包含“无效”的状态(无效 KEY / 无效或不存在 等)
- """
- if not status:
- return False
- return status.startswith("已激活") or ("无效" in status)
- # ===========================
- # 从已有结果中加载“已完成”的 KEY
- # ===========================
- def load_existing_results(output_file: Path):
- """
- 如果结果文件存在,读取它:
- - 返回 prev_results: key -> row(dict)
- - 返回 done_keys: 已完成的 key(已激活 或 “无效”)
- """
- prev_results = {}
- done_keys = set()
- if not output_file.exists():
- return prev_results, done_keys
- with output_file.open("r", encoding="utf-8", newline="") as f:
- reader = csv.DictReader(f)
- for row in reader:
- key = (row.get("KEY") or "").strip()
- if not key:
- continue
- prev_results[key] = row
- status = (row.get("状态") or "").strip()
- if is_done_status(status):
- done_keys.add(key)
- return prev_results, done_keys
- # ===========================
- # 主流程
- # ===========================
- def main():
- # 参数解析
- parser = argparse.ArgumentParser(description="批量查询 Steam CD Key 状态(支持断点续扫)")
- parser.add_argument(
- "-k", "--keys",
- default="keys.txt",
- help="待查询 key 文件路径(默认:keys.txt)"
- )
- parser.add_argument(
- "-o", "--output",
- default="key_status.csv",
- help="结果输出文件路径(默认:key_status.csv)"
- )
- args = parser.parse_args()
- keys_file = Path(args.keys)
- output_file = Path(args.output)
- # 1. 载入 cookie
- cookies = load_cookie_from_file()
- print("已成功载入 cookie.txt 的登录信息\n")
- # 2. 检查 key 文件
- if not keys_file.exists():
- print(f"未找到 key 文件:{keys_file},请创建后重试。")
- return
- # 3. 如果已有结果文件,先加载旧结果(用于跳过已完成的 key)
- prev_results, done_keys = load_existing_results(output_file)
- if prev_results:
- print(f"检测到已有结果文件:{output_file}")
- print(f"其中已完成(已激活/无效)的 key 数量:{len(done_keys)}\n")
- # 记录之前已激活的 key(用于统计“之前已激活”和“新激活”)
- prev_activated_keys = {
- key for key, row in prev_results.items()
- if (row.get("状态") or "").startswith("已激活")
- }
- # 4. 读取 key 列表
- keys = []
- with keys_file.open("r", encoding="utf-8") as f_in:
- for line in f_in:
- key = line.strip()
- if key:
- keys.append(key)
- if not keys:
- print(f"{keys_file} 中没有找到任何 key")
- return
- total = len(keys)
- # 统计变量
- counters = {
- "activated": 0,
- "unactivated_valid": 0,
- "unactivated_invalid": 0,
- "unknown": 0,
- "previous_activated": 0, # 之前已激活
- "new_activated": 0, # 本次新激活
- }
- # 5. 重新写结果文件(避免重复行)
- with output_file.open("w", newline="", encoding="utf-8") as f_out:
- writer = csv.writer(f_out)
- writer.writerow(["KEY", "状态", "程序包名称", "PackageID", "备注"])
- for idx, key in enumerate(keys, start=1):
- progress = f"[{idx}/{total} {idx / total * 100:5.1f}%]"
- # 如果 key 在旧结果里,并且已完成(已激活 / 无效),直接复用
- if key in prev_results and key in done_keys:
- row = prev_results[key]
- status = row.get("状态", "")
- package_name = row.get("程序包名称", "")
- package_id = row.get("PackageID", "")
- note = row.get("备注", "")
- writer.writerow([key, status, package_name, package_id, note])
- classify_status(status, counters)
- # 统计:之前已激活
- if key in prev_activated_keys:
- counters["previous_activated"] += 1
- print(f"{progress} 跳过(已完成):{key} -> {status}")
- continue
- # 否则重新查询
- try:
- html = query_key(key, cookies)
- status, package_name, package_id, note = parse_status(html)
- writer.writerow([key, status, package_name or "", package_id or "", note])
- classify_status(status, counters)
- # 本次新激活(之前没激活,现在是已激活)
- if status.startswith("已激活") and key not in prev_activated_keys:
- counters["new_activated"] += 1
- if package_id:
- print(f"{progress} 查询:{key} -> {status} {package_name} ({package_id})")
- else:
- print(f"{progress} 查询:{key} -> {status}")
- except Exception as e:
- err = f"请求失败: {e}"
- writer.writerow([key, "请求失败", "", "", err])
- counters["unknown"] += 1
- print(f"{progress} 查询:{key} -> {err}")
- time.sleep(1) # 防止被风控
- # ===== 统计结果展示 =====
- total_count = (
- counters["activated"]
- + counters["unactivated_valid"]
- + counters["unactivated_invalid"]
- + counters["unknown"]
- )
- print("\n========== 查询统计 ==========")
- print(f"已激活: {counters['activated']}")
- print(f" ├─ 之前已激活: {counters['previous_activated']}")
- print(f" └─ 本次新激活: {counters['new_activated']}")
- print(f"未激活(有效KEY): {counters['unactivated_valid']}")
- print(f"未激活(无效KEY): {counters['unactivated_invalid']}")
- print(f"未知 / 请求失败: {counters['unknown']}")
- print(f"总计: {total_count}")
- print(f"\n结果已写入:{output_file}")
- if __name__ == "__main__":
- main()
|