query_keys.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import argparse
  4. import csv
  5. import time
  6. import re
  7. import requests
  8. from pathlib import Path
  9. # ===========================
  10. # 配置:文件路径
  11. # ===========================
  12. COOKIE_FILE = Path("cookie.txt") # 你的 cookie 存在这里(不建议动这个)
  13. # ===========================
  14. # Cookie 解析函数
  15. # ===========================
  16. def load_cookie_from_file() -> dict:
  17. if not COOKIE_FILE.exists():
  18. raise RuntimeError(
  19. "未找到 cookie.txt,请在脚本同目录创建 cookie.txt,内容为浏览器抓到的 Cookie 字符串"
  20. )
  21. cookie_str = COOKIE_FILE.read_text().strip()
  22. if not cookie_str:
  23. raise RuntimeError("cookie.txt 内容为空,请把 curl -b 'xxx' 里的内容贴进去")
  24. cookies = {}
  25. for part in cookie_str.split(";"):
  26. part = part.strip()
  27. if not part:
  28. continue
  29. if "=" in part:
  30. k, v = part.split("=", 1)
  31. cookies[k.strip()] = v.strip()
  32. return cookies
  33. # ===========================
  34. # HTTP 相关
  35. # ===========================
  36. BASE_URL = "https://partner.steamgames.com/querycdkey/cdkey"
  37. HEADERS = {
  38. "User-Agent": (
  39. "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
  40. "AppleWebKit/537.36 (KHTML, like Gecko) "
  41. "Chrome/142.0.0.0 Safari/537.36"
  42. ),
  43. "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
  44. }
  45. def query_key(key: str, cookies: dict) -> str:
  46. """查询单个 KEY,返回 HTML 文本"""
  47. params = {"cdkey": key, "method": "query"}
  48. resp = requests.get(
  49. BASE_URL,
  50. params=params,
  51. headers=HEADERS,
  52. cookies=cookies,
  53. timeout=15,
  54. )
  55. resp.raise_for_status()
  56. return resp.text
  57. # ===========================
  58. # HTML 解析:提取 Package 信息
  59. # ===========================
  60. def extract_package_info(html: str):
  61. """
  62. 从 HTML 中提取 KEY 所属程序包信息
  63. 返回 (包名, PackageID) 或 (None, None)
  64. """
  65. m = re.search(
  66. r'<a[^>]+href="https://partner\.steamgames\.com/store/packagelanding/(\d+)"[^>]*>(.*?)</a>',
  67. html,
  68. re.S,
  69. )
  70. if not m:
  71. return None, None
  72. package_id = m.group(1)
  73. link_text = m.group(2).strip()
  74. package_name = re.sub(r"\s+", " ", link_text)
  75. return package_name, package_id
  76. # ===========================
  77. # 状态解析(颜色优先,文字兜底)
  78. # ===========================
  79. def parse_status(html: str):
  80. """
  81. 状态判断优先级:
  82. 1)优先用颜色(语言无关):
  83. - 检测到 #67c1f5 → 已激活
  84. - 检测到 #e24044 → 未激活(再用包信息区分是否有效)
  85. 2)如果颜色都没检测到,再用文字兜底:
  86. - “已激活” → 已激活(文字判断)
  87. - “未激活” + 有包信息 → 未激活(有效 KEY)
  88. - “未激活” + 无包信息 → 未激活(无效或不存在)
  89. 3)最后用英文文案兜底:
  90. - has not been redeemed / not yet registered → 未激活(英文提示)
  91. - already been redeemed / already registered → 已激活(英文提示)
  92. - invalid cd key → 无效 KEY(英文提示)
  93. """
  94. package_name, package_id = extract_package_info(html)
  95. html_lower = html.lower()
  96. status = "未知"
  97. note = ""
  98. # ===== 1. 颜色优先(语言无关) =====
  99. if "#67c1f5" in html_lower:
  100. # 蓝色提示:已激活
  101. status = "已激活"
  102. elif "#e24044" in html_lower:
  103. # 红色提示:未激活,但区分有效/无效
  104. if package_name:
  105. status = "未激活(有效 KEY)"
  106. else:
  107. status = "未激活(无效或不存在)"
  108. else:
  109. # ===== 2. 没检测到颜色,用文字兜底 =====
  110. # 中文 UI
  111. if "已激活" in html:
  112. status = "已激活(文字判断)"
  113. elif "未激活" in html:
  114. if package_name:
  115. status = "未激活(有效 KEY,文字判断)"
  116. else:
  117. status = "未激活(无效或不存在,文字判断)"
  118. # 英文 UI 兜底
  119. elif "has not been redeemed" in html_lower or "not yet registered" in html_lower:
  120. status = "未激活(英文提示)"
  121. elif "already been redeemed" in html_lower or "already registered" in html_lower:
  122. status = "已激活(英文提示)"
  123. elif "invalid cd key" in html_lower:
  124. status = "无效 KEY(英文提示)"
  125. if package_name and status.startswith("未知"):
  126. note = "检测到包信息,但状态未能匹配"
  127. return status, package_name, package_id, note
  128. # ===========================
  129. # 状态分类辅助
  130. # ===========================
  131. def classify_status(status: str, counters: dict):
  132. """根据状态字符串更新计数器"""
  133. if status.startswith("已激活"):
  134. counters["activated"] += 1
  135. elif status.startswith("未激活(有效"):
  136. counters["unactivated_valid"] += 1
  137. elif "无效" in status:
  138. counters["unactivated_invalid"] += 1
  139. else:
  140. counters["unknown"] += 1
  141. def is_done_status(status: str) -> bool:
  142. """
  143. 判定一个状态是否“已完成”,用于决定是否跳过重新查询:
  144. - 所有“已激活*”
  145. - 所有包含“无效”的状态(无效 KEY / 无效或不存在 等)
  146. """
  147. if not status:
  148. return False
  149. return status.startswith("已激活") or ("无效" in status)
  150. # ===========================
  151. # 从已有结果中加载“已完成”的 KEY
  152. # ===========================
  153. def load_existing_results(output_file: Path):
  154. """
  155. 如果结果文件存在,读取它:
  156. - 返回 prev_results: key -> row(dict)
  157. - 返回 done_keys: 已完成的 key(已激活 或 “无效”)
  158. """
  159. prev_results = {}
  160. done_keys = set()
  161. if not output_file.exists():
  162. return prev_results, done_keys
  163. with output_file.open("r", encoding="utf-8", newline="") as f:
  164. reader = csv.DictReader(f)
  165. for row in reader:
  166. key = (row.get("KEY") or "").strip()
  167. if not key:
  168. continue
  169. prev_results[key] = row
  170. status = (row.get("状态") or "").strip()
  171. if is_done_status(status):
  172. done_keys.add(key)
  173. return prev_results, done_keys
  174. # ===========================
  175. # 主流程
  176. # ===========================
  177. def main():
  178. # 参数解析
  179. parser = argparse.ArgumentParser(description="批量查询 Steam CD Key 状态(支持断点续扫)")
  180. parser.add_argument(
  181. "-k", "--keys",
  182. default="keys.txt",
  183. help="待查询 key 文件路径(默认:keys.txt)"
  184. )
  185. parser.add_argument(
  186. "-o", "--output",
  187. default="key_status.csv",
  188. help="结果输出文件路径(默认:key_status.csv)"
  189. )
  190. args = parser.parse_args()
  191. keys_file = Path(args.keys)
  192. output_file = Path(args.output)
  193. # 1. 载入 cookie
  194. cookies = load_cookie_from_file()
  195. print("已成功载入 cookie.txt 的登录信息\n")
  196. # 2. 检查 key 文件
  197. if not keys_file.exists():
  198. print(f"未找到 key 文件:{keys_file},请创建后重试。")
  199. return
  200. # 3. 如果已有结果文件,先加载旧结果(用于跳过已完成的 key)
  201. prev_results, done_keys = load_existing_results(output_file)
  202. if prev_results:
  203. print(f"检测到已有结果文件:{output_file}")
  204. print(f"其中已完成(已激活/无效)的 key 数量:{len(done_keys)}\n")
  205. # 记录之前已激活的 key(用于统计“之前已激活”和“新激活”)
  206. prev_activated_keys = {
  207. key for key, row in prev_results.items()
  208. if (row.get("状态") or "").startswith("已激活")
  209. }
  210. # 4. 读取 key 列表
  211. keys = []
  212. with keys_file.open("r", encoding="utf-8") as f_in:
  213. for line in f_in:
  214. key = line.strip()
  215. if key:
  216. keys.append(key)
  217. if not keys:
  218. print(f"{keys_file} 中没有找到任何 key")
  219. return
  220. total = len(keys)
  221. # 统计变量
  222. counters = {
  223. "activated": 0,
  224. "unactivated_valid": 0,
  225. "unactivated_invalid": 0,
  226. "unknown": 0,
  227. "previous_activated": 0, # 之前已激活
  228. "new_activated": 0, # 本次新激活
  229. }
  230. # 5. 重新写结果文件(避免重复行)
  231. with output_file.open("w", newline="", encoding="utf-8") as f_out:
  232. writer = csv.writer(f_out)
  233. writer.writerow(["KEY", "状态", "程序包名称", "PackageID", "备注"])
  234. for idx, key in enumerate(keys, start=1):
  235. progress = f"[{idx}/{total} {idx / total * 100:5.1f}%]"
  236. # 如果 key 在旧结果里,并且已完成(已激活 / 无效),直接复用
  237. if key in prev_results and key in done_keys:
  238. row = prev_results[key]
  239. status = row.get("状态", "")
  240. package_name = row.get("程序包名称", "")
  241. package_id = row.get("PackageID", "")
  242. note = row.get("备注", "")
  243. writer.writerow([key, status, package_name, package_id, note])
  244. classify_status(status, counters)
  245. # 统计:之前已激活
  246. if key in prev_activated_keys:
  247. counters["previous_activated"] += 1
  248. print(f"{progress} 跳过(已完成):{key} -> {status}")
  249. continue
  250. # 否则重新查询
  251. try:
  252. html = query_key(key, cookies)
  253. status, package_name, package_id, note = parse_status(html)
  254. writer.writerow([key, status, package_name or "", package_id or "", note])
  255. classify_status(status, counters)
  256. # 本次新激活(之前没激活,现在是已激活)
  257. if status.startswith("已激活") and key not in prev_activated_keys:
  258. counters["new_activated"] += 1
  259. if package_id:
  260. print(f"{progress} 查询:{key} -> {status} {package_name} ({package_id})")
  261. else:
  262. print(f"{progress} 查询:{key} -> {status}")
  263. except Exception as e:
  264. err = f"请求失败: {e}"
  265. writer.writerow([key, "请求失败", "", "", err])
  266. counters["unknown"] += 1
  267. print(f"{progress} 查询:{key} -> {err}")
  268. time.sleep(1) # 防止被风控
  269. # ===== 统计结果展示 =====
  270. total_count = (
  271. counters["activated"]
  272. + counters["unactivated_valid"]
  273. + counters["unactivated_invalid"]
  274. + counters["unknown"]
  275. )
  276. print("\n========== 查询统计 ==========")
  277. print(f"已激活: {counters['activated']}")
  278. print(f" ├─ 之前已激活: {counters['previous_activated']}")
  279. print(f" └─ 本次新激活: {counters['new_activated']}")
  280. print(f"未激活(有效KEY): {counters['unactivated_valid']}")
  281. print(f"未激活(无效KEY): {counters['unactivated_invalid']}")
  282. print(f"未知 / 请求失败: {counters['unknown']}")
  283. print(f"总计: {total_count}")
  284. print(f"\n结果已写入:{output_file}")
  285. if __name__ == "__main__":
  286. main()