| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- imap_curator_export.py
- ----------------------
- Fetch emails via IMAP, find Steam curator key requests since a chosen date,
- and export results to an Excel file.
- Features:
- - Connectivity test (--test) that does NOT read/write state.
- - Stateful resume via UID bookmarking (JSON). Next run scans only newer UIDs.
- - Extracts curator/social links and requested key counts (defaults to 2 if not found).
- - Filters OUT reply/forward messages (replies to you), detected via headers and subject prefixes.
- - Outputs Excel with requested column order:
- 1) No. (auto-increment)
- 2) Mailbox Key (UID of the message)
- 3) Requested Key Count
- 4) Date (Beijing time, "YYYY-MM-DD HH:MM")
- 5) Curator/Name
- 6) Email
- 7) Subject
- 8) Curator/Social Links
- 9) Body (preview)
- - Prints a summary after export:
- * Total keys requested
- * Keys with social links present
- * Keys without social links
- USAGE
- -----
- 1) Install deps (ideally in a virtualenv):
- pip install -r requirements.txt
- 2) Set environment variables (examples for macOS/Linux bash/zsh):
- export IMAP_HOST="imap.gmail.com" # e.g. imap.gmail.com, outlook.office365.com
- export IMAP_PORT="993" # usually 993
- export EMAIL_USER="your_email@example.com"
- export EMAIL_PASS="your_app_password" # For Gmail/Outlook, use an App Password or OAuth token string
- export MAILBOX="INBOX" # optional, default INBOX
- # Start date used ONLY for the first run (no state yet). IMAP format DD-Mon-YYYY.
- export START_DATE="18-Oct-2025"
- # Optional: where to store/read bookmark state per mailbox
- export STATE_FILE="curator_state.json"
- 3) Run a connectivity test (no state read/write):
- python imap_curator_export.py --test
- 4) Run extraction:
- python imap_curator_export.py
- 5) Reset bookmark (force full scan from START_DATE next time):
- python imap_curator_export.py --reset-state
- Output:
- ./curator_requests.xlsx
- """
- import os
- import re
- import sys
- import json
- import imaplib
- import email
- import argparse
- from email.header import decode_header, make_header
- from email.policy import default as default_policy
- from email.utils import parsedate_to_datetime
- from datetime import datetime, timezone
- from zoneinfo import ZoneInfo
- from typing import List, Dict, Any, Optional
- # Third-party
- from bs4 import BeautifulSoup # for HTML -> text
- import pandas as pd
- import re
# URL substrings that identify curator pages and social-media profiles.
CURATOR_LINK_PATTERNS = [
    r"store\.steampowered\.com/curator/",
    r"steamcommunity\.com/groups/",
    r"steamcommunity\.com/id/",
    r"x\.com/|twitter\.com/",
    r"youtube\.com/|youtu\.be/",
    r"twitch\.tv/",
    r"discord\.gg/|discord\.com/invite/",
    r"facebook\.com/",
    r"instagram\.com/",
    r"tiktok\.com/",
    r"bilibili\.com/",
    r"weibo\.com/",
]
# Heuristics for detecting a request for Steam keys.
# Each pattern captures the requested count; the last one captures a
# low-high range ("3-5 keys"). EN and ZH phrasings are both covered.
KEY_REQUEST_PATTERNS = [
    r"\b(\d{1,3})\s*(?:keys?|key codes?|steam\s*keys?)\b",
    r"(?:需要|申请|索取|来点|要)\s*(\d{1,3})\s*(?:个)?\s*(?:key|激活码|序列号|钥匙)",
    r"\bup to\s*(\d{1,3})\s*keys?\b",
    r"\brequest(?:ing)?\s*(\d{1,3})\s*keys?\b",
    r"\b(\d{1,3})\s*x\s*keys?\b",
    r"\b(\d{1,3})-(\d{1,3})\s*keys?\b",
]
# Keywords (EN/ZH) whose presence marks a message as a curator key request.
CURATOR_KEYWORDS = [
    "curator", "steam curator", "reviewer",
    "鉴赏家", "评测", "媒体", "KOL", "influencer", "press",
    "key", "keys", "激活码", "序列号", "steam key",
]
# Subject prefixes indicating replies/forwards (EN/ZH); matches both ASCII
# and fullwidth colons after the prefix.
REPLY_FWD_PREFIX = re.compile(
    r"^\s*(re(\[\d+\])?|回复|答复|答覆|转发|fw|fwd)\s*[::]\s*",
    re.IGNORECASE
)
- def extract_name_from_body(body: str) -> str:
- """
- 从正文中根据常见短语提取名字:
- 例如 "My name is Alex", "I am John", "This is Lily"。
- """
- if not body:
- return ""
- text = body.strip().replace("\r", "").replace("\n", " ")
- # 常见英语自我介绍
- patterns = [
- r"\bmy name is ([A-Z][a-z]+(?: [A-Z][a-z]+)?)",
- r"\bi am ([A-Z][a-z]+(?: [A-Z][a-z]+)?)",
- r"\bthis is ([A-Z][a-z]+(?: [A-Z][a-z]+)?)",
- ]
- for pat in patterns:
- m = re.search(pat, text, flags=re.IGNORECASE)
- if m:
- # 返回匹配到的名字(首字母大写格式化)
- name = m.group(1).strip()
- return name.title()
- # 末尾签名行的简易提取(如 “Thanks, Alex”)
- m = re.search(r"(?:regards|thanks|cheers)[,:\s]+([A-Z][a-z]+(?: [A-Z][a-z]+)?)", text, flags=re.IGNORECASE)
- if m:
- return m.group(1).strip().title()
- return ""
def norm_email(addr: str) -> str:
    """Normalize an address for dedup keys: trimmed and lowercased."""
    if not addr:
        return ""
    return addr.strip().lower()
def chunked(seq, n):
    """Yield consecutive slices of *seq*, each at most *n* items long."""
    start = 0
    while start < len(seq):
        yield seq[start:start + n]
        start += n
def mark_seen_batch(M, mailbox: str, uids: list[str], batch_size: int = 500) -> int:
    """Mark messages in *mailbox* as read (\\Seen), in UID batches.

    Returns the number of UIDs covered by STORE commands the server
    accepted; individual messages are not verified one by one.
    """
    if not uids:
        return 0
    # Re-select the mailbox writable; the scan phase opens it read-only.
    status, _ = M.select(mailbox, readonly=False)
    if status != "OK":
        print(f"[WARN] cannot select {mailbox} in write mode; skip mark-read")
        return 0
    marked = 0
    for batch in chunked(uids, batch_size):
        # Comma-joined UID sequence set; UIDs may arrive as bytes, str or int.
        seqset = ",".join(
            str(u.decode() if isinstance(u, (bytes, bytearray)) else u)
            for u in batch
        )
        # Prefer the SILENT variant to cut down on server responses.
        status, _ = M.uid("STORE", seqset, "+FLAGS.SILENT", r"(\Seen)")
        if status != "OK":
            # Some servers only accept the non-silent form.
            status, _ = M.uid("STORE", seqset, "+FLAGS", r"(\Seen)")
        if status == "OK":
            marked += len(batch)
        else:
            print(f"[WARN] batch STORE failed for {len(batch)} UIDs (len={len(seqset)})")
    return marked
def env_get(name: str, default: Optional[str] = None, required: bool = False) -> str:
    """Read environment variable *name*, falling back to *default*.

    When *required* is True and the value is missing or empty, print an
    error and exit(1) instead of returning.
    """
    value = os.environ.get(name, default)
    if required and not value:
        print(f"ERROR: Missing environment variable {name}", file=sys.stderr)
        sys.exit(1)
    return value
def imap_login() -> imaplib.IMAP4_SSL:
    """Open an SSL IMAP connection and authenticate using env vars.

    IMAP_HOST, EMAIL_USER and EMAIL_PASS are required; IMAP_PORT
    defaults to 993.
    """
    conn = imaplib.IMAP4_SSL(
        env_get("IMAP_HOST", required=True),
        int(env_get("IMAP_PORT", "993")),
    )
    conn.login(env_get("EMAIL_USER", required=True), env_get("EMAIL_PASS", required=True))
    return conn
def select_mailbox(M: imaplib.IMAP4_SSL) -> str:
    """SELECT the configured mailbox (MAILBOX env, default INBOX) read-only.

    Returns the mailbox name; raises RuntimeError if SELECT fails.
    """
    mailbox = env_get("MAILBOX", "INBOX")
    typ, data = M.select(mailbox, readonly=True)
    if typ != "OK":
        raise RuntimeError(f"Cannot select mailbox {mailbox}: {typ} {data}")
    return mailbox
def imap_date_string() -> str:
    """Return START_DATE validated as IMAP 'DD-Mon-YYYY' format.

    Falls back to '18-Oct-2025' (with a warning) when the env value
    does not parse.
    """
    configured = env_get("START_DATE", "18-Oct-2025")
    try:
        # Validation only; the parsed value itself is discarded.
        datetime.strptime(configured, "%d-%b-%Y")
        return configured
    except Exception:
        print("WARNING: START_DATE is not in 'DD-Mon-YYYY' (e.g., 18-Oct-2025). Using 18-Oct-2025 by default.", file=sys.stderr)
        return "18-Oct-2025"
def load_state() -> Dict[str, Any]:
    """Load the bookmark state JSON; always returns a usable dict.

    The returned dict carries a transient "_path" key recording where it
    was loaded from, and is guaranteed to have a dict under "mailboxes".
    Any read/parse failure yields a fresh empty state.
    """
    path = env_get("STATE_FILE", "curator_state.json")
    if not os.path.exists(path):
        return {"_path": path, "mailboxes": {}}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        data["_path"] = path
        if not isinstance(data.get("mailboxes"), dict):
            data["mailboxes"] = {}
        return data
    except Exception:
        # Corrupt or unreadable state file: start over.
        return {"_path": path, "mailboxes": {}}
def save_state(state: Dict[str, Any]) -> None:
    """Atomically persist *state* as JSON (write temp file, then rename).

    The transient "_path" key records where the state was loaded from and
    is stripped before writing.
    """
    # Only consult the environment when the state carries no path of its
    # own (the original evaluated env_get eagerly even when "_path" was set).
    path = state.get("_path") or env_get("STATE_FILE", "curator_state.json")
    tmp = path + ".tmp"
    payload = {k: v for k, v in state.items() if k != "_path"}
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    # os.replace is atomic, so readers never see a half-written state file.
    os.replace(tmp, path)
def get_mailbox_key(host: str, user: str, mailbox: str) -> str:
    """Build the composite state key: "host|user|mailbox"."""
    return "|".join((host, user, mailbox))
def uid_search(M: imaplib.IMAP4_SSL, criterion: str) -> List[bytes]:
    """Run UID SEARCH with *criterion* and return matching UIDs as bytes."""
    status, payload = M.uid("search", None, criterion)
    if status != "OK":
        raise RuntimeError(f"UID SEARCH failed: {status} {payload}")
    return payload[0].split()
def search_initial(M: imaplib.IMAP4_SSL, since_date: str) -> List[bytes]:
    """First-run scan: SEARCH by SINCE date, then map sequence numbers to UIDs.

    SEARCH returns message sequence numbers; each batch of 500 is FETCHed
    with "(UID)" to translate them into stable UIDs, which are returned
    as a list of bytes.
    """
    status, payload = M.search(None, f'(SINCE "{since_date}")')
    if status != "OK":
        raise RuntimeError(f"SEARCH failed: {status} {payload}")
    seq_ids = payload[0].split()
    uids: List[bytes] = []
    # FETCH in batches of 500 sequence numbers to keep commands short.
    for pos in range(0, len(seq_ids), 500):
        batch = seq_ids[pos:pos + 500]
        status2, payload2 = M.fetch(b",".join(batch), "(UID)")
        if status2 != "OK":
            continue
        for item in payload2:
            # Non-tuple items (closing parentheses etc.) carry no UID data.
            if not isinstance(item, tuple):
                continue
            match = re.search(r"UID\s+(\d+)", item[0].decode("utf-8", "ignore"))
            if match:
                uids.append(match.group(1).encode())
    return uids
def decode_str(s: Optional[str]) -> str:
    """Decode an RFC 2047 encoded header value to readable text.

    Empty/None input yields ""; undecodable input is returned unchanged
    rather than raising.
    """
    if not s:
        return ""
    try:
        return str(make_header(decode_header(s)))
    except Exception:
        # Hand back the raw value rather than failing on a bad header.
        return s
def get_address(msg: email.message.Message) -> tuple[str, str]:
    """Return (display name, address) parsed from the From: header.

    The display name is RFC 2047-decoded; both values are whitespace-trimmed.
    (The original annotated the return as the invalid tuple literal
    ``(str, str)`` and parsed the header twice.)
    """
    name, addr = email.utils.parseaddr(msg.get("From", ""))
    return decode_str(name).strip(), addr.strip()
def get_subject(msg: email.message.Message) -> str:
    """Return the decoded, trimmed Subject header ("" when absent)."""
    raw_subject = msg.get("Subject", "")
    return decode_str(raw_subject).strip()
def _decode_mime_text(part: email.message.Message) -> Optional[str]:
    """Decode one MIME part's payload to text, or None if unavailable.

    Uses the part's declared charset, falling back to UTF-8 with
    replacement characters on bad/unknown labels.
    """
    try:
        payload = part.get_payload(decode=True)
    except Exception:
        payload = None
    if payload is None:
        return None
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except Exception:
        return payload.decode("utf-8", errors="replace")


def get_payload_text(msg: email.message.Message) -> tuple[str, str]:
    """Collect the text/plain and text/html bodies of *msg*.

    Walks every non-attachment part of a multipart message (or the single
    body of a simple one) and returns ``(plain_text, html_text)``, each
    the stripped newline-joined concatenation of its parts. (The original
    annotated the return as the invalid tuple literal ``(str, str)`` and
    repeated the charset-decoding logic in both branches.)
    """
    plain_parts: List[str] = []
    html_parts: List[str] = []
    if msg.is_multipart():
        for part in msg.walk():
            if str(part.get_content_disposition() or "").lower() == "attachment":
                continue
            text = _decode_mime_text(part)
            if text is None:
                # Multipart containers and undecodable parts carry no text.
                continue
            ctype = part.get_content_type()
            if ctype == "text/plain":
                plain_parts.append(text)
            elif ctype == "text/html":
                html_parts.append(text)
    else:
        text = _decode_mime_text(msg) or ""
        if msg.get_content_type() == "text/html":
            html_parts.append(text)
        else:
            plain_parts.append(text)
    return ("\n".join(plain_parts).strip(), "\n".join(html_parts).strip())
def html_to_text(html: str) -> str:
    """Render HTML to plain text, dropping <script>/<style> content."""
    if not html:
        return ""
    soup = BeautifulSoup(html, "html.parser")
    # Remove non-visible content before extracting text.
    for junk in soup(["script", "style"]):
        junk.decompose()
    return soup.get_text(separator="\n").strip()
def extract_links(text: str) -> List[str]:
    """Return all http(s) URLs in *text*, de-duplicated, in first-seen order."""
    if not text:
        return []
    found = re.findall(r"https?://[^\s<>()\"\']+", text, flags=re.IGNORECASE)
    # dict.fromkeys preserves insertion order while dropping duplicates.
    return list(dict.fromkeys(found))
def filter_curator_links(urls: List[str]) -> List[str]:
    """Keep only URLs matching one of CURATOR_LINK_PATTERNS (case-insensitive)."""
    if not urls:
        return []
    matcher = re.compile("|".join(CURATOR_LINK_PATTERNS), re.IGNORECASE)
    return [url for url in urls if matcher.search(url)]
def detect_key_count(text: str) -> Optional[int]:
    """Scan *text* with KEY_REQUEST_PATTERNS for a requested key count.

    Patterns are tried in order and the first usable match decides. For a
    range match ("3-5 keys") the upper bound wins. Returns None when no
    pattern yields a number.
    """
    if not text:
        return None
    for pattern in KEY_REQUEST_PATTERNS:
        hit = re.search(pattern, text, flags=re.IGNORECASE)
        if not hit:
            continue
        # Range form: two captured numbers -> take the larger one.
        if hit.lastindex and hit.lastindex >= 2 and hit.group(1) and hit.group(2):
            try:
                return max(int(hit.group(1)), int(hit.group(2)))
            except Exception:
                continue
        # Single-number form(s): take the largest numeric group.
        digits = [int(g) for g in hit.groups() if g and g.isdigit()]
        if digits:
            return max(digits)
    return None
def looks_like_curator_request(subject: str, body_text: str) -> bool:
    """True when subject or body mentions any curator/key keyword (case-insensitive)."""
    haystack = f"{subject}\n{body_text}".lower()
    for keyword in CURATOR_KEYWORDS:
        if keyword.lower() in haystack:
            return True
    return False
def fetch_by_uid(M: imaplib.IMAP4_SSL, uid: bytes) -> email.message.Message:
    """FETCH one full message (RFC822) by UID and parse it.

    Raises RuntimeError when the server response is not a usable
    (envelope, body) tuple.
    """
    status, payload = M.uid("fetch", uid, "(RFC822)")
    if status != "OK" or not payload or not isinstance(payload[0], tuple):
        raise RuntimeError(f"UID FETCH failed for {uid!r}: {status} {payload}")
    return email.message_from_bytes(payload[0][1], policy=default_policy)
def parse_msg_date_bj(msg: email.message.Message) -> str:
    """Format the Date header as Beijing time "YYYY-MM-DD HH:MM".

    Returns "" when the header is missing or unparseable. Naive dates
    are treated as UTC before converting to Asia/Shanghai.
    """
    header = msg.get("Date") or ""
    try:
        parsed = parsedate_to_datetime(header)
        if parsed is None:
            raise ValueError("parsedate_to_datetime returned None")
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M")
    except Exception:
        return ""
def is_reply_or_forward(msg: email.message.Message, subject: str) -> bool:
    """Detect replies/forwards via threading headers or subject prefixes."""
    # Threading headers are the most reliable signal.
    has_thread_headers = bool(msg.get("In-Reply-To") or msg.get("References"))
    # Fall back to Re:/Fwd:-style subject prefixes (EN/ZH).
    return has_thread_headers or REPLY_FWD_PREFIX.search(subject) is not None
def fetch_and_parse(M: imaplib.IMAP4_SSL, uid: bytes) -> Dict[str, Any]:
    """Fetch one message by UID and condense it into a flat record dict.

    The record carries sender, subject, Beijing-time date, a body preview
    (capped at 3000 chars), curator/social links, the requested key count
    (defaulting to 2 when unstated), and a reply/forward flag.
    """
    msg = fetch_by_uid(M, uid)
    sender_name, sender_addr = get_address(msg)
    subject = get_subject(msg)
    plain, html = get_payload_text(msg)
    # Prefer the plain-text body; fall back to rendered HTML when the
    # plain part is missing or trivially short.
    body = plain.strip()
    if html and (not body or len(body) < 20):
        body = html_to_text(html)
    social = filter_curator_links(extract_links(plain + "\n" + html))
    requested = detect_key_count(body)
    if requested is None:
        requested = 2  # default when not specified
    preview = body[:3000] + ("..." if len(body) > 3000 else "")
    return {
        "uid": uid.decode() if isinstance(uid, (bytes, bytearray)) else str(uid),
        "from_name": sender_name or "",
        "from_email": sender_addr or "",
        "subject": subject,
        "date_local": parse_msg_date_bj(msg),
        "body_preview": preview,
        "curator_links": social,
        "key_count": int(requested),
        "is_reply": is_reply_or_forward(msg, subject),
    }
def connectivity_test() -> int:
    """Log in, select the mailbox, and print details of the newest message.

    Does not read or write bookmark state. Returns 0 on success, 2 on
    any IMAP failure.
    """
    try:
        M = imap_login()
        try:
            mailbox = select_mailbox(M)
            all_uids = uid_search(M, "ALL")
            if not all_uids:
                print(f"[TEST] Connected to {mailbox}, but it has no messages.")
                return 0
            newest = all_uids[-1]
            msg = fetch_by_uid(M, newest)
            sender_name, sender_addr = get_address(msg)
            print("[TEST] IMAP OK.")
            print(f"[TEST] Mailbox: {mailbox}")
            print(f"[TEST] Total messages: {len(all_uids)}")
            print(f"[TEST] Latest UID: {newest.decode()}")
            print(f"[TEST] Latest From: {sender_name} <{sender_addr}>")
            print(f"[TEST] Latest Subject: {get_subject(msg)}")
            print(f"[TEST] Latest Date (BJ): {parse_msg_date_bj(msg)}")
            return 0
        finally:
            # Best-effort logout; connection teardown errors are irrelevant here.
            try:
                M.logout()
            except Exception:
                pass
    except Exception as e:
        print(f"[TEST] IMAP failed: {e}", file=sys.stderr)
        return 2
def run_export(reset_state: bool = False, mark_read: bool = False) -> int:
    """Scan the mailbox for curator key requests and export them to Excel.

    Resumes from the stored UID bookmark when one exists, otherwise scans
    from START_DATE. Replies/forwards are excluded, requests are
    de-duplicated per sender address (keeping the newest), and a summary
    is printed after writing ./curator_requests.xlsx.

    Fixes over the previous version:
    - ``curator_name`` could be unbound (or silently carry over the
      previous iteration's value) when both the From: display name and
      body extraction were empty; it now falls back to "".
    - The bookmark is no longer overwritten with 0 on an empty initial
      scan, which used to make later runs scan every UID and ignore
      START_DATE.
    - Removed dead code (duplicate rows/row_no init, unused
      selected_uids_for_mark / dup_skipped).

    Args:
        reset_state: Drop the stored bookmark for this mailbox first.
        mark_read: Mark matched messages as \\Seen on the server.

    Returns:
        0 on success (including when nothing matched).
    """
    since_date = imap_date_string()
    host = env_get("IMAP_HOST", required=True)
    user = env_get("EMAIL_USER", required=True)
    # Messages with UIDs below this are skipped entirely (MIN_UID env).
    min_uid = int(env_get("MIN_UID", "125"))
    M = imap_login()
    state = load_state()
    try:
        mailbox = select_mailbox(M)
        mailbox_key = get_mailbox_key(host, user, mailbox)
        if reset_state and mailbox_key in state["mailboxes"]:
            del state["mailboxes"][mailbox_key]
            save_state(state)
            print(f"[STATE] Reset bookmark for {mailbox_key}")
        last_uid = state["mailboxes"].get(mailbox_key, {}).get("last_uid")
        if last_uid:
            # Resume: only UIDs strictly above the bookmark.
            uids = uid_search(M, f"(UID {int(last_uid)+1}:*)")
            print(f"[SCAN] Using bookmark last_uid={last_uid}; new UIDs found: {len(uids)}")
        else:
            uids = search_initial(M, since_date)
            print(f"[SCAN] Initial run since {since_date}; candidate UIDs: {len(uids)}")

        uids_to_mark: list[int] = []
        max_seen_uid = int(last_uid) if last_uid else 0
        # Best (newest) matching request per sender address.
        best_by_email: dict[str, dict] = {}
        for i, uid in enumerate(uids, 1):
            try:
                uid_int = int(uid)
                if uid_int < min_uid:
                    continue
                rec = fetch_and_parse(M, uid)
                if uid_int > max_seen_uid:
                    max_seen_uid = uid_int
                # Exclude replies/forwards (messages replying to us).
                if rec["is_reply"]:
                    continue
                if looks_like_curator_request(rec["subject"], rec["body_preview"]):
                    has_links = bool(rec["curator_links"])
                    curator_email = norm_email(rec["from_email"])
                    dedup_key = curator_email if curator_email else f"uid:{rec['uid']}"
                    # Prefer the From: display name, then a name extracted
                    # from the body, then "" (never a stale value from a
                    # previous loop iteration).
                    curator_name = (
                        rec["from_name"].strip()
                        or extract_name_from_body(rec["body_preview"])
                        or ""
                    )
                    candidate = {
                        "uid_int": uid_int,
                        "record": {
                            "Mailbox Key": rec["uid"],  # message UID
                            "Requested Key Count": rec["key_count"],
                            "Date": rec["date_local"],
                            "Curator/Name": curator_name,
                            "Email": rec["from_email"],
                            "Subject": rec["subject"],
                            "Curator/Social Links": ", ".join(rec["curator_links"]) if has_links else "",
                            "Body (preview)": rec["body_preview"],
                            "_has_links": has_links,
                        },
                    }
                    uids_to_mark.append(uid_int)
                    prev = best_by_email.get(dedup_key)
                    if prev is None or uid_int > prev["uid_int"]:
                        best_by_email[dedup_key] = candidate
                if i % 10 == 0:
                    pct = (i / len(uids)) * 100
                    print(f" Processed {pct:.1f}% ({i}/{len(uids)})")
            except Exception as e:
                print(f"[WARN] Failed to parse UID {uid!r}: {e}", file=sys.stderr)
                continue

        if mark_read:
            done = mark_seen_batch(M, mailbox, uids_to_mark, batch_size=500)
            print(f"[INFO] Marked {done} message(s) as read in batches.")

        rows: List[Dict[str, Any]] = []
        for row_no, choice in enumerate(best_by_email.values(), 1):
            rows.append({"No.": row_no, **choice["record"]})

        # Persist the bookmark so daily runs skip already-seen messages,
        # but never regress it to 0 (which would make the next run scan
        # from UID 1 and ignore START_DATE).
        if max_seen_uid > 0:
            state["mailboxes"][mailbox_key] = {"last_uid": str(max_seen_uid)}
            save_state(state)
            print(f"[STATE] Updated last_uid={max_seen_uid} for {mailbox_key}")

        columns = [
            "No.", "Mailbox Key", "Requested Key Count", "Date", "Curator/Name",
            "Email", "Subject", "Curator/Social Links", "Body (preview)",
        ]
        if not rows:
            print("No curator key requests matched the filters.")
            df = pd.DataFrame(columns=columns)
            total_keys = with_links = without_links = 0
        else:
            df = pd.DataFrame(rows)
            df["Requested Key Count"] = (
                pd.to_numeric(df["Requested Key Count"], errors="coerce").fillna(0).astype(int)
            )
            total_keys = int(df["Requested Key Count"].sum())
            with_links = int(df.loc[df["_has_links"], "Requested Key Count"].sum())
            without_links = total_keys - with_links
            # Drop the helper column and enforce the requested column order.
            df = df.drop(columns=["_has_links"])
            df = df[columns]
        out_path = os.path.abspath("curator_requests.xlsx")
        with pd.ExcelWriter(out_path, engine="openpyxl") as writer:
            df.to_excel(writer, sheet_name="Requests", index=False)
        print("\n=== SUMMARY ===")
        print(f"Total requested keys: {total_keys}")
        print(f"With social links: {with_links}")
        print(f"Without social links: {without_links}")
        print(f"\nExported {len(df)} row(s) to {out_path}")
        return 0
    finally:
        # Best-effort logout; teardown failures should not mask results.
        try:
            M.logout()
        except Exception:
            pass
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Define and parse the CLI flags: --test, --reset-state, --mark-read."""
    parser = argparse.ArgumentParser(
        description="Export Steam curator key requests to Excel via IMAP."
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="Run a quick IMAP connectivity test and exit (does not read/write state).",
    )
    parser.add_argument(
        "--reset-state",
        action="store_true",
        help="Reset stored UID bookmark before running.",
    )
    parser.add_argument(
        "--mark-read",
        action="store_true",
        help="After exporting, mark those emails as read on the IMAP server.",
    )
    return parser.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: dispatch to the connectivity test or the export run."""
    args = parse_args(argv)
    # --test must never read or write bookmark state.
    if args.test:
        return connectivity_test()
    return run_export(reset_state=args.reset_state, mark_read=args.mark_read)


if __name__ == "__main__":
    raise SystemExit(main())
|