#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ imap_curator_export.py ---------------------- Fetch emails via IMAP, find Steam curator key requests since a chosen date, and export results to an Excel file. Features: - Connectivity test (--test) that does NOT read/write state. - Stateful resume via UID bookmarking (JSON). Next run scans only newer UIDs. - Extracts curator/social links and requested key counts (defaults to 2 if not found). - Filters OUT reply/forward messages (replies to you), detected via headers and subject prefixes. - Outputs Excel with requested column order: 1) No. (auto-increment) 2) Mailbox Key (UID of the message) 3) Requested Key Count 4) Date (Beijing time, "YYYY-MM-DD HH:MM") 5) Curator/Name 6) Email 7) Subject 8) Curator/Social Links 9) Body (preview) - Prints a summary after export: * Total keys requested * Keys with social links present * Keys without social links USAGE ----- 1) Install deps (ideally in a virtualenv): pip install -r requirements.txt 2) Set environment variables (examples for macOS/Linux bash/zsh): export IMAP_HOST="imap.gmail.com" # e.g. imap.gmail.com, outlook.office365.com export IMAP_PORT="993" # usually 993 export EMAIL_USER="your_email@example.com" export EMAIL_PASS="your_app_password" # For Gmail/Outlook, use an App Password or OAuth token string export MAILBOX="INBOX" # optional, default INBOX # Start date used ONLY for the first run (no state yet). IMAP format DD-Mon-YYYY. export START_DATE="18-Oct-2025" # Optional: where to store/read bookmark state per mailbox export STATE_FILE="curator_state.json" 3) Run a connectivity test (no state read/write): python imap_curator_export.py --test 4) Run extraction: python imap_curator_export.py 5) Reset bookmark (force full scan from START_DATE next time): python imap_curator_export.py --reset-state Output: ./curator_requests.xlsx """ import os import re import sys import json import imaplib import email import argparse from email.header import decode_header, make_header from email.policy import default as default_policy from email.utils import parsedate_to_datetime from datetime import datetime, timezone from zoneinfo import ZoneInfo from typing import List, Dict, Any, Optional # Third-party from bs4 import BeautifulSoup # for HTML -> text import pandas as pd import re CURATOR_LINK_PATTERNS = [ r"store\.steampowered\.com/curator/", r"steamcommunity\.com/groups/", r"steamcommunity\.com/id/", r"x\.com/|twitter\.com/", r"youtube\.com/|youtu\.be/", r"twitch\.tv/", r"discord\.gg/|discord\.com/invite/", r"facebook\.com/", r"instagram\.com/", r"tiktok\.com/", r"bilibili\.com/", r"weibo\.com/", ] # Heuristics for detecting a request for Steam keys KEY_REQUEST_PATTERNS = [ r"\b(\d{1,3})\s*(?:keys?|key codes?|steam\s*keys?)\b", r"(?:需要|申请|索取|来点|要)\s*(\d{1,3})\s*(?:个)?\s*(?:key|激活码|序列号|钥匙)", r"\bup to\s*(\d{1,3})\s*keys?\b", r"\brequest(?:ing)?\s*(\d{1,3})\s*keys?\b", r"\b(\d{1,3})\s*x\s*keys?\b", r"\b(\d{1,3})-(\d{1,3})\s*keys?\b", ] CURATOR_KEYWORDS = [ "curator", "steam curator", "reviewer", "鉴赏家", "评测", "媒体", "KOL", "influencer", "press", "key", "keys", "激活码", "序列号", "steam key", ] # Subject prefixes indicating replies/forwards (EN/ZH) REPLY_FWD_PREFIX = re.compile( r"^\s*(re(\[\d+\])?|回复|答复|答覆|转发|fw|fwd)\s*[::]\s*", re.IGNORECASE ) def extract_name_from_body(body: str) -> str: """ 从正文中根据常见短语提取名字: 例如 "My name is Alex", "I am John", "This is Lily"。 """ if not body: return "" text = body.strip().replace("\r", "").replace("\n", " ") # 常见英语自我介绍 patterns = [ r"\bmy name is ([A-Z][a-z]+(?: [A-Z][a-z]+)?)", r"\bi am ([A-Z][a-z]+(?: [A-Z][a-z]+)?)", r"\bthis is ([A-Z][a-z]+(?: [A-Z][a-z]+)?)", ] for pat in patterns: m = re.search(pat, text, flags=re.IGNORECASE) if m: # 返回匹配到的名字(首字母大写格式化) name = m.group(1).strip() return name.title() # 末尾签名行的简易提取(如 “Thanks, Alex”) m = re.search(r"(?:regards|thanks|cheers)[,:\s]+([A-Z][a-z]+(?: [A-Z][a-z]+)?)", text, flags=re.IGNORECASE) if m: return m.group(1).strip().title() return "" def norm_email(addr: str) -> str: return (addr or "").strip().lower() def chunked(seq, n): for i in range(0, len(seq), n): yield seq[i:i+n] def mark_seen_batch(M, mailbox: str, uids: list[str], batch_size: int = 500) -> int: """ 以批为单位,把同一 mailbox 中的多封邮件标记为已读。 返回成功提交 STORE 的 UID 总数(不逐一校验)。 """ if not uids: return 0 # 切换到可写 typ, _ = M.select(mailbox, readonly=False) if typ != "OK": print(f"[WARN] cannot select {mailbox} in write mode; skip mark-read") return 0 total = 0 for batch in chunked(uids, batch_size): # UID 序列(逗号拼接) seqset = ",".join(str(u.decode() if isinstance(u, (bytes, bytearray)) else u) for u in batch) # 先静默,减少服务器返回 typ, _ = M.uid("STORE", seqset, "+FLAGS.SILENT", r"(\Seen)") if typ != "OK": # 兼容某些服务器 typ, _ = M.uid("STORE", seqset, "+FLAGS", r"(\Seen)") if typ == "OK": total += len(batch) else: print(f"[WARN] batch STORE failed for {len(batch)} UIDs (len={len(seqset)})") return total def env_get(name: str, default: Optional[str] = None, required: bool = False) -> str: v = os.environ.get(name, default) if required and not v: print(f"ERROR: Missing environment variable {name}", file=sys.stderr) sys.exit(1) return v def imap_login() -> imaplib.IMAP4_SSL: host = env_get("IMAP_HOST", required=True) port = int(env_get("IMAP_PORT", "993")) user = env_get("EMAIL_USER", required=True) pwd = env_get("EMAIL_PASS", required=True) M = imaplib.IMAP4_SSL(host, port) M.login(user, pwd) return M def select_mailbox(M: imaplib.IMAP4_SSL) -> str: mailbox = env_get("MAILBOX", "INBOX") typ, data = M.select(mailbox, readonly=True) if typ != "OK": raise RuntimeError(f"Cannot select mailbox {mailbox}: {typ} {data}") return mailbox def imap_date_string() -> str: start_date = env_get("START_DATE", "18-Oct-2025") try: datetime.strptime(start_date, "%d-%b-%Y") except Exception: print("WARNING: START_DATE is not in 'DD-Mon-YYYY' (e.g., 18-Oct-2025). Using 18-Oct-2025 by default.", file=sys.stderr) start_date = "18-Oct-2025" return start_date def load_state() -> Dict[str, Any]: path = env_get("STATE_FILE", "curator_state.json") if not os.path.exists(path): return {"_path": path, "mailboxes": {}} try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) data["_path"] = path if "mailboxes" not in data or not isinstance(data["mailboxes"], dict): data["mailboxes"] = {} return data except Exception: return {"_path": path, "mailboxes": {}} def save_state(state: Dict[str, Any]) -> None: path = state.get("_path", env_get("STATE_FILE", "curator_state.json")) tmp = path + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump({k: v for k, v in state.items() if k != "_path"}, f, ensure_ascii=False, indent=2) os.replace(tmp, path) def get_mailbox_key(host: str, user: str, mailbox: str) -> str: return f"{host}|{user}|{mailbox}" def uid_search(M: imaplib.IMAP4_SSL, criterion: str) -> List[bytes]: typ, data = M.uid("search", None, criterion) if typ != "OK": raise RuntimeError(f"UID SEARCH failed: {typ} {data}") return data[0].split() def search_initial(M: imaplib.IMAP4_SSL, since_date: str) -> List[bytes]: typ, data = M.search(None, f'(SINCE "{since_date}")') if typ != "OK": raise RuntimeError(f"SEARCH failed: {typ} {data}") ids = data[0].split() uids: List[bytes] = [] if not ids: return uids for i in range(0, len(ids), 500): batch = ids[i:i+500] typ2, data2 = M.fetch(b",".join(batch), "(UID)") if typ2 != "OK": continue for part in data2: if not isinstance(part, tuple): continue header = part[0].decode("utf-8", "ignore") m = re.search(r"UID\s+(\d+)", header) if m: uids.append(m.group(1).encode()) return uids def decode_str(s: Optional[str]) -> str: if not s: return "" try: return str(make_header(decode_header(s))) except Exception: return s def get_address(msg: email.message.Message) -> (str, str): from_raw = msg.get("From", "") name = email.utils.parseaddr(from_raw)[0] addr = email.utils.parseaddr(from_raw)[1] return decode_str(name).strip(), addr.strip() def get_subject(msg: email.message.Message) -> str: return decode_str(msg.get("Subject", "")).strip() def get_payload_text(msg: email.message.Message) -> (str, str): plain_parts = [] html_parts = [] if msg.is_multipart(): for part in msg.walk(): ctype = part.get_content_type() disp = str(part.get_content_disposition() or "").lower() if disp == "attachment": continue try: payload = part.get_payload(decode=True) except Exception: payload = None if payload is None: continue charset = part.get_content_charset() or "utf-8" try: text = payload.decode(charset, errors="replace") except Exception: text = payload.decode("utf-8", errors="replace") if ctype == "text/plain": plain_parts.append(text) elif ctype == "text/html": html_parts.append(text) else: payload = msg.get_payload(decode=True) or b"" charset = msg.get_content_charset() or "utf-8" try: text = payload.decode(charset, errors="replace") except Exception: text = payload.decode("utf-8", errors="replace") if msg.get_content_type() == "text/html": html_parts.append(text) else: plain_parts.append(text) return ("\n".join(plain_parts).strip(), "\n".join(html_parts).strip()) def html_to_text(html: str) -> str: if not html: return "" soup = BeautifulSoup(html, "html.parser") for tag in soup(["script", "style"]): tag.decompose() return soup.get_text(separator="\n").strip() def extract_links(text: str) -> List[str]: if not text: return [] urls = re.findall(r"https?://[^\s<>()\"\']+", text, flags=re.IGNORECASE) seen = set() out = [] for u in urls: if u not in seen: seen.add(u) out.append(u) return out def filter_curator_links(urls: List[str]) -> List[str]: if not urls: return [] combined = "|".join(CURATOR_LINK_PATTERNS) pat = re.compile(combined, re.IGNORECASE) return [u for u in urls if pat.search(u)] def detect_key_count(text: str) -> Optional[int]: if not text: return None best = None for pat in KEY_REQUEST_PATTERNS: m = re.search(pat, text, flags=re.IGNORECASE) if m: if m.lastindex and m.lastindex >= 2 and m.group(1) and m.group(2): try: a = int(m.group(1)) b = int(m.group(2)) best = max(a, b) break except Exception: continue else: nums = [int(g) for g in m.groups() if g and g.isdigit()] if nums: best = max(nums) break return best def looks_like_curator_request(subject: str, body_text: str) -> bool: blob = f"{subject}\n{body_text}".lower() return any(k.lower() in blob for k in CURATOR_KEYWORDS) def fetch_by_uid(M: imaplib.IMAP4_SSL, uid: bytes) -> email.message.Message: typ, data = M.uid("fetch", uid, "(RFC822)") if typ != "OK" or not data or not isinstance(data[0], tuple): raise RuntimeError(f"UID FETCH failed for {uid!r}: {typ} {data}") raw = data[0][1] msg = email.message_from_bytes(raw, policy=default_policy) return msg def parse_msg_date_bj(msg: email.message.Message) -> str: raw = msg.get("Date") or "" try: dt = parsedate_to_datetime(raw) if dt is None: raise ValueError("parsedate_to_datetime returned None") if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) dt_cst = dt.astimezone(ZoneInfo("Asia/Shanghai")) return dt_cst.strftime("%Y-%m-%d %H:%M") except Exception: return "" def is_reply_or_forward(msg: email.message.Message, subject: str) -> bool: # Header-based if msg.get("In-Reply-To") or msg.get("References"): return True # Subject-based if REPLY_FWD_PREFIX.search(subject): return True return False def fetch_and_parse(M: imaplib.IMAP4_SSL, uid: bytes) -> Dict[str, Any]: msg = fetch_by_uid(M, uid) name, addr = get_address(msg) subject = get_subject(msg) date_local = parse_msg_date_bj(msg) reply_flag = is_reply_or_forward(msg, subject) plain, html = get_payload_text(msg) merged_text = plain.strip() if html and (not merged_text or len(merged_text) < 20): merged_text = html_to_text(html) links_all = extract_links(plain + "\n" + html) curator_links = filter_curator_links(links_all) key_count = detect_key_count(merged_text) if key_count is None: key_count = 2 # default when not specified return { "uid": uid.decode() if isinstance(uid, (bytes, bytearray)) else str(uid), "from_name": name or "", "from_email": addr or "", "subject": subject, "date_local": date_local, "body_preview": (merged_text[:3000] + ("..." if len(merged_text) > 3000 else "")), "curator_links": curator_links, "key_count": int(key_count), "is_reply": reply_flag, } def connectivity_test() -> int: try: M = imap_login() try: mailbox = select_mailbox(M) uids = uid_search(M, "ALL") count = len(uids) if count == 0: print(f"[TEST] Connected to {mailbox}, but it has no messages.") return 0 latest_uid = uids[-1] msg = fetch_by_uid(M, latest_uid) subject = get_subject(msg) from_name, from_addr = get_address(msg) date_local = parse_msg_date_bj(msg) print("[TEST] IMAP OK.") print(f"[TEST] Mailbox: {mailbox}") print(f"[TEST] Total messages: {count}") print(f"[TEST] Latest UID: {latest_uid.decode()}") print(f"[TEST] Latest From: {from_name} <{from_addr}>") print(f"[TEST] Latest Subject: {subject}") print(f"[TEST] Latest Date (BJ): {date_local}") return 0 finally: try: M.logout() except Exception: pass except Exception as e: print(f"[TEST] IMAP failed: {e}", file=sys.stderr) return 2 def run_export(reset_state: bool = False, mark_read: bool = False) -> int: since_date = imap_date_string() host = env_get("IMAP_HOST", required=True) user = env_get("EMAIL_USER", required=True) min_uid = int(env_get("MIN_UID", "125")) M = imap_login() state = load_state() try: mailbox = select_mailbox(M) mailbox_key = get_mailbox_key(host, user, mailbox) if reset_state and mailbox_key in state["mailboxes"]: del state["mailboxes"][mailbox_key] save_state(state) print(f"[STATE] Reset bookmark for {mailbox_key}") last_uid = None if mailbox_key in state["mailboxes"]: last_uid = state["mailboxes"][mailbox_key].get("last_uid") if last_uid: criterion = f"(UID {int(last_uid)+1}:*)" uids = uid_search(M, criterion) print(f"[SCAN] Using bookmark last_uid={last_uid}; new UIDs found: {len(uids)}") else: uids = search_initial(M, since_date) print(f"[SCAN] Initial run since {since_date}; candidate UIDs: {len(uids)}") rows: List[Dict[str, Any]] = [] all_uids_for_mark: list[str] = [] max_seen_uid = int(last_uid) if last_uid else 0 row_no = 1 best_by_email: dict[str, dict] = {} dup_skipped = 0 for i, uid in enumerate(uids, 1): try: uid_int = int(uid) if uid_int < min_uid: continue rec = fetch_and_parse(M, uid) if uid_int > max_seen_uid: max_seen_uid = uid_int # Exclude replies/forwards if rec["is_reply"]: continue if looks_like_curator_request(rec["subject"], rec["body_preview"]): has_links = bool(rec["curator_links"]) curator_email = norm_email(rec["from_email"]) dedup_key = curator_email if curator_email else f"uid:{rec['uid']}" # 首选:邮件头中的名字 if rec["from_name"]: curator_name = rec["from_name"].strip() else: # 尝试从正文里提取名字 extracted = extract_name_from_body(rec["body_preview"]) if extracted: curator_name = extracted #else: # # 兜底用邮箱前缀 # curator_name = "Curator" candidate = { "uid_int": int(uid), "record": { "Mailbox Key": rec["uid"], # UID "Requested Key Count": rec["key_count"], "Date": rec["date_local"], "Curator/Name": curator_name, "Email": rec["from_email"], "Subject": rec["subject"], "Curator/Social Links": ", ".join(rec["curator_links"]) if has_links else "", "Body (preview)": rec["body_preview"], "_has_links": has_links, } } all_uids_for_mark.append(int(uid)) prev = best_by_email.get(dedup_key) if prev is None or candidate["uid_int"] > prev["uid_int"]: best_by_email[dedup_key] = candidate else: dup_skipped += 1 if i % 10 == 0: pct = (i / len(uids)) * 100 print(f" Processed {pct:.1f}% ({i}/{len(uids)})") except Exception as e: print(f"[WARN] Failed to parse UID {uid!r}: {e}", file=sys.stderr) continue if mark_read: done = mark_seen_batch(M, mailbox, all_uids_for_mark, batch_size=500) print(f"[INFO] Marked {done} message(s) as read in batches.") rows = [] row_no = 1 selected_uids_for_mark = [] for _, v in best_by_email.items(): rec = v["record"] rows.append({ "No.": row_no, "Mailbox Key": rec["Mailbox Key"], "Requested Key Count": rec["Requested Key Count"], "Date": rec["Date"], "Curator/Name": rec["Curator/Name"], "Email": rec["Email"], "Subject": rec["Subject"], "Curator/Social Links": rec["Curator/Social Links"], "Body (preview)": rec["Body (preview)"], "_has_links": rec["_has_links"], }) selected_uids_for_mark.append(rec["Mailbox Key"]) row_no += 1 # Save bookmark even if no rows matched, so daily runs skip already-seen messages state["mailboxes"][mailbox_key] = {"last_uid": str(max_seen_uid)} save_state(state) print(f"[STATE] Updated last_uid={max_seen_uid} for {mailbox_key}") columns = [ "No.", "Mailbox Key", "Requested Key Count", "Date", "Curator/Name", "Email", "Subject", "Curator/Social Links", "Body (preview)" ] if not rows: print("No curator key requests matched the filters.") df = pd.DataFrame(columns=columns) total_keys = with_links = without_links = 0 else: df = pd.DataFrame(rows) df["Requested Key Count"] = pd.to_numeric(df["Requested Key Count"], errors="coerce").fillna(0).astype(int) total_keys = int(df["Requested Key Count"].sum()) with_links = int(df.loc[df["_has_links"], "Requested Key Count"].sum()) if "_has_links" in df.columns else 0 without_links = total_keys - with_links # drop helper if "_has_links" in df.columns: df = df.drop(columns=["_has_links"]) df = df[columns] out_path = os.path.abspath("curator_requests.xlsx") with pd.ExcelWriter(out_path, engine="openpyxl") as writer: df.to_excel(writer, sheet_name="Requests", index=False) # Summary printout print("\n=== SUMMARY ===") print(f"Total requested keys: {total_keys}") print(f"With social links: {with_links}") print(f"Without social links: {without_links}") print(f"\nExported {len(df)} row(s) to {out_path}") return 0 finally: try: M.logout() except Exception: pass def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace: p = argparse.ArgumentParser(description="Export Steam curator key requests to Excel via IMAP.") p.add_argument("--test", action="store_true", help="Run a quick IMAP connectivity test and exit (does not read/write state).") p.add_argument("--reset-state", action="store_true", help="Reset stored UID bookmark before running.") p.add_argument("--mark-read", action="store_true", help="After exporting, mark those emails as read on the IMAP server.") return p.parse_args(argv) def main(argv: Optional[List[str]] = None) -> int: args = parse_args(argv) if args.test: return connectivity_test() # no state read/write return run_export(reset_state=args.reset_state, mark_read=args.mark_read) if __name__ == "__main__": raise SystemExit(main())