query_keys.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import argparse
  4. import csv
  5. import time
  6. import re
  7. import requests
  8. from pathlib import Path
  9. # ===========================
  10. # 配置:文件路径
  11. # ===========================
  12. COOKIE_FILE = Path("cookie.txt") # 你的 cookie 存在这里(不建议动这个)
  13. # ===========================
  14. # Cookie 解析函数
  15. # ===========================
  16. def load_cookie_from_file() -> dict:
  17. if not COOKIE_FILE.exists():
  18. raise RuntimeError(
  19. "未找到 cookie.txt,请在脚本同目录创建 cookie.txt,内容为浏览器抓到的 Cookie 字符串"
  20. )
  21. cookie_str = COOKIE_FILE.read_text().strip()
  22. if not cookie_str:
  23. raise RuntimeError("cookie.txt 内容为空,请把 curl -b 'xxx' 里的内容贴进去")
  24. cookies = {}
  25. for part in cookie_str.split(";"):
  26. part = part.strip()
  27. if not part:
  28. continue
  29. if "=" in part:
  30. k, v = part.split("=", 1)
  31. cookies[k.strip()] = v.strip()
  32. return cookies
  33. # ===========================
  34. # HTTP 相关
  35. # ===========================
  36. BASE_URL = "https://partner.steamgames.com/querycdkey/cdkey"
  37. HEADERS = {
  38. "User-Agent": (
  39. "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
  40. "AppleWebKit/537.36 (KHTML, like Gecko) "
  41. "Chrome/142.0.0.0 Safari/537.36"
  42. ),
  43. "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
  44. }
  45. def query_key(key: str, cookies: dict) -> str:
  46. """查询单个 KEY,返回 HTML 文本"""
  47. params = {"cdkey": key, "method": "query"}
  48. resp = requests.get(
  49. BASE_URL,
  50. params=params,
  51. headers=HEADERS,
  52. cookies=cookies,
  53. timeout=15,
  54. )
  55. resp.raise_for_status()
  56. return resp.text
  57. # ===========================
  58. # HTML 解析:提取 Package 信息
  59. # ===========================
  60. def extract_package_info(html: str):
  61. """
  62. 从 HTML 中提取 KEY 所属程序包信息
  63. 返回 (包名, PackageID) 或 (None, None)
  64. """
  65. m = re.search(
  66. r'<a[^>]+href="https://partner\.steamgames\.com/store/packagelanding/(\d+)"[^>]*>(.*?)</a>',
  67. html,
  68. re.S,
  69. )
  70. if not m:
  71. return None, None
  72. package_id = m.group(1)
  73. link_text = m.group(2).strip()
  74. package_name = re.sub(r"\s+", " ", link_text)
  75. return package_name, package_id
  76. # ===========================
  77. # 状态解析(颜色优先,文字兜底)
  78. # ===========================
  79. def parse_status(html: str):
  80. """
  81. 状态判断优先级:
  82. 1)优先用颜色(语言无关):
  83. - 检测到 #67c1f5 → 已激活
  84. - 检测到 #e24044 → 未激活(再用包信息区分是否有效)
  85. 2)如果颜色都没检测到,再用文字兜底:
  86. - “已激活” → 已激活(文字判断)
  87. - “未激活” + 有包信息 → 未激活(有效 KEY)
  88. - “未激活” + 无包信息 → 未激活(无效或不存在)
  89. 3)最后用英文文案兜底:
  90. - has not been redeemed / not yet registered → 未激活(英文提示)
  91. - already been redeemed / already registered → 已激活(英文提示)
  92. - invalid cd key → 无效 KEY(英文提示)
  93. """
  94. package_name, package_id = extract_package_info(html)
  95. html_lower = html.lower()
  96. status = "未知"
  97. note = ""
  98. # ===== 1. 颜色优先(语言无关) =====
  99. if "#67c1f5" in html_lower:
  100. # 蓝色提示:已激活
  101. status = "已激活"
  102. elif "#e24044" in html_lower:
  103. # 红色提示:未激活,但区分有效/无效
  104. if package_name:
  105. status = "未激活(有效 KEY)"
  106. else:
  107. status = "未激活(无效或不存在)"
  108. else:
  109. # ===== 2. 没检测到颜色,用文字兜底 =====
  110. # 中文 UI
  111. if "已激活" in html:
  112. status = "已激活(文字判断)"
  113. elif "未激活" in html:
  114. if package_name:
  115. status = "未激活(有效 KEY,文字判断)"
  116. else:
  117. status = "未激活(无效或不存在,文字判断)"
  118. # 英文 UI 兜底
  119. elif "has not been redeemed" in html_lower or "not yet registered" in html_lower:
  120. status = "未激活(英文提示)"
  121. elif "already been redeemed" in html_lower or "already registered" in html_lower:
  122. status = "已激活(英文提示)"
  123. elif "invalid cd key" in html_lower:
  124. status = "无效 KEY(英文提示)"
  125. if package_name and status.startswith("未知"):
  126. note = "检测到包信息,但状态未能匹配"
  127. return status, package_name, package_id, note
  128. # ===========================
  129. # 状态分类辅助
  130. # ===========================
  131. def classify_status(status: str, counters: dict):
  132. """根据状态字符串更新计数器"""
  133. if status.startswith("已激活"):
  134. counters["activated"] += 1
  135. elif status.startswith("未激活(有效"):
  136. counters["unactivated_valid"] += 1
  137. elif "无效" in status:
  138. counters["unactivated_invalid"] += 1
  139. else:
  140. counters["unknown"] += 1
  141. def is_done_status(status: str) -> bool:
  142. """
  143. 判定一个状态是否“已完成”,用于决定是否跳过重新查询:
  144. - 所有“已激活*”
  145. - 所有包含“无效”的状态(无效 KEY / 无效或不存在 等)
  146. """
  147. if not status:
  148. return False
  149. return status.startswith("已激活") or ("无效" in status)
  150. # ===========================
  151. # 从已有结果中加载“已完成”的 KEY
  152. # ===========================
  153. def load_existing_results(output_file: Path):
  154. """
  155. 如果结果文件存在,读取它:
  156. - 返回 prev_results: key -> row(dict)
  157. - 返回 done_keys: 已完成的 key(已激活 或 “无效”)
  158. """
  159. prev_results = {}
  160. done_keys = set()
  161. if not output_file.exists():
  162. return prev_results, done_keys
  163. with output_file.open("r", encoding="utf-8", newline="") as f:
  164. reader = csv.DictReader(f)
  165. for row in reader:
  166. key = (row.get("KEY") or "").strip()
  167. if not key:
  168. continue
  169. prev_results[key] = row
  170. status = (row.get("状态") or "").strip()
  171. if is_done_status(status):
  172. done_keys.add(key)
  173. return prev_results, done_keys
  174. def print_statistics(counters: dict):
  175. """统一打印统计信息"""
  176. total_count = (
  177. counters["activated"]
  178. + counters["unactivated_valid"]
  179. + counters["unactivated_invalid"]
  180. + counters["unknown"]
  181. )
  182. print("\n========== 查询统计 ==========")
  183. print(f"已激活: {counters['activated']}")
  184. print(f" ├─ 之前已激活: {counters['previous_activated']}")
  185. print(f" └─ 本次新激活: {counters['new_activated']}")
  186. print(f"未激活(有效KEY): {counters['unactivated_valid']}")
  187. print(f"未激活(无效KEY): {counters['unactivated_invalid']}")
  188. print(f"未知 / 请求失败: {counters['unknown']}")
  189. print(f"总计: {total_count}")
  190. # ===========================
  191. # 主流程
  192. # ===========================
  193. def main():
  194. # 参数解析
  195. parser = argparse.ArgumentParser(description="批量查询 Steam CD Key 状态(支持断点续扫)")
  196. parser.add_argument(
  197. "-k", "--keys",
  198. default="keys.txt",
  199. help="待查询 key 文件路径(默认:keys.txt)"
  200. )
  201. parser.add_argument(
  202. "-o", "--output",
  203. default="key_status.csv",
  204. help="结果输出文件路径(默认:key_status.csv)"
  205. )
  206. args = parser.parse_args()
  207. keys_file = Path(args.keys)
  208. output_file = Path(args.output)
  209. # 1. 载入 cookie(对缺失/空内容给出友好提示)
  210. try:
  211. cookies = load_cookie_from_file()
  212. except RuntimeError as exc:
  213. print(f"载入 cookie.txt 失败:{exc}")
  214. print("请在 cookie.txt 中粘贴完整的 Cookie 字符串后重试。")
  215. return
  216. print("已成功载入 cookie.txt 的登录信息\n")
  217. # 2. 检查 key 文件
  218. if not keys_file.exists():
  219. print(f"未找到 key 文件:{keys_file},请创建后重试。")
  220. return
  221. # 3. 如果已有结果文件,先加载旧结果(用于跳过已完成的 key)
  222. prev_results, done_keys = load_existing_results(output_file)
  223. if prev_results:
  224. print(f"检测到已有结果文件:{output_file}")
  225. print(f"其中已完成(已激活/无效)的 key 数量:{len(done_keys)}\n")
  226. # 记录之前已激活的 key(用于统计“之前已激活”和“新激活”)
  227. prev_activated_keys = {
  228. key for key, row in prev_results.items()
  229. if (row.get("状态") or "").startswith("已激活")
  230. }
  231. # 4. 读取 key 列表
  232. keys = []
  233. with keys_file.open("r", encoding="utf-8") as f_in:
  234. for line in f_in:
  235. key = line.strip()
  236. if key:
  237. keys.append(key)
  238. if not keys:
  239. print(f"{keys_file} 中没有找到任何 key")
  240. return
  241. total = len(keys)
  242. # 统计变量
  243. counters = {
  244. "activated": 0,
  245. "unactivated_valid": 0,
  246. "unactivated_invalid": 0,
  247. "unknown": 0,
  248. "previous_activated": 0, # 之前已激活
  249. "new_activated": 0, # 本次新激活
  250. }
  251. interrupted = False
  252. processed = 0
  253. try:
  254. # 5. 重新写结果文件(避免重复行)
  255. with output_file.open("w", newline="", encoding="utf-8") as f_out:
  256. writer = csv.writer(f_out)
  257. writer.writerow(["KEY", "状态", "程序包名称", "PackageID", "备注"])
  258. f_out.flush()
  259. for idx, key in enumerate(keys, start=1):
  260. progress = f"[{idx}/{total} {idx / total * 100:5.1f}%]"
  261. # 如果 key 在旧结果里,并且已完成(已激活 / 无效),直接复用
  262. if key in prev_results and key in done_keys:
  263. row = prev_results[key]
  264. status = row.get("状态", "")
  265. package_name = row.get("程序包名称", "")
  266. package_id = row.get("PackageID", "")
  267. note = row.get("备注", "")
  268. writer.writerow([key, status, package_name, package_id, note])
  269. classify_status(status, counters)
  270. # 统计:之前已激活
  271. if key in prev_activated_keys:
  272. counters["previous_activated"] += 1
  273. print(f"{progress} 跳过(已完成):{key} -> {status}")
  274. processed = idx
  275. f_out.flush()
  276. continue
  277. # 否则重新查询
  278. try:
  279. html = query_key(key, cookies)
  280. status, package_name, package_id, note = parse_status(html)
  281. writer.writerow([key, status, package_name or "", package_id or "", note])
  282. classify_status(status, counters)
  283. # 本次新激活(之前没激活,现在是已激活)
  284. if status.startswith("已激活") and key not in prev_activated_keys:
  285. counters["new_activated"] += 1
  286. if package_id:
  287. print(f"{progress} 查询:{key} -> {status} {package_name} ({package_id})")
  288. else:
  289. print(f"{progress} 查询:{key} -> {status}")
  290. except Exception as e:
  291. err = f"请求失败: {e}"
  292. writer.writerow([key, "请求失败", "", "", err])
  293. counters["unknown"] += 1
  294. print(f"{progress} 查询:{key} -> {err}")
  295. processed = idx
  296. f_out.flush()
  297. time.sleep(1) # 防止被风控
  298. except KeyboardInterrupt:
  299. interrupted = True
  300. print(f"\n检测到用户中断(Ctrl+C)。已处理 {processed} / {total} 个 KEY,正在整理统计信息...\n")
  301. # ===== 统计结果展示 =====
  302. print_statistics(counters)
  303. if interrupted:
  304. print(f"\n查询已被用户中断,当前结果已写入:{output_file}")
  305. else:
  306. print(f"\n结果已写入:{output_file}")
  307. if __name__ == "__main__":
  308. main()