#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
send_keys.py
------------
Read an Excel exported by fetch_requests.py, allocate Steam keys from a TXT pool
(one per line), and send HTML emails to curators using a fixed template.

This version:
- Logs ONE ROW PER KEY (not per email).
- Log column order (front):
    1) No.
    2) Send Date (Beijing, YYYY-MM-DD)
    3) Channel (auto from social link; default "Steam")
    4) Curator/Name
    5) Purpose (fixed "评测")
    6) Social Link
- Other columns kept (and adjusted for per-key row):
    Key, Mailbox Key, To, Requested Key Count, Subject, Status, Sent At,
    Test Mode, Actual Recipient
- Subject uses: --subject + " RE: " + original email subject (if present)
- Test mode does NOT consume keys; dry-run does not send/consume.
- Optional external HTML template via --template (fallback to built-in).

Usage examples:
  python send_keys.py --excel curator_requests.xlsx --keys key_pool.txt --out send_log.xlsx \
      --subject "Steam Keys for Such A Guy" --dry-run --limit 2

  python send_keys.py --excel curator_requests.xlsx --keys key_pool.txt --out send_log.xlsx \
      --subject "Steam Keys for Such A Guy" --test --test-email krystic@such-one.com --limit 1
"""

import os
import sys
import json
import re
import argparse
import smtplib
import ssl
import imaplib
from datetime import datetime
from email.message import EmailMessage
from html import escape as _html_escape
from typing import List, Dict, Any, Optional, Tuple
from zoneinfo import ZoneInfo

import pandas as pd
from openpyxl import load_workbook

DEFAULT_CONFIG_PATH = os.path.join("configs", "settings.json")

# Number of keys granted when the request count is missing, unparsable or <= 0.
DEFAULT_KEY_COUNT = 2

# Column order of the per-key log sheet.
LOG_COLUMNS = [
    "No.", "Send Date", "Channel", "Curator/Name", "Purpose", "Social Link",
    "Key", "Mailbox Key", "To", "Requested Key Count", "Subject", "Status",
    "Sent At", "Test Mode", "Actual Recipient",
]


def load_settings(config_path: Optional[str]) -> Dict[str, Any]:
    """Load settings.json and make every entry under "files" an absolute path.

    Args:
        config_path: Path to the settings file; DEFAULT_CONFIG_PATH when falsy.

    Returns:
        The parsed settings dict with data["files"] normalized to absolute paths.

    Exits with status 2 (after printing to stderr) when the file is missing,
    unreadable, not valid JSON, or not shaped as expected.
    """
    path = config_path or DEFAULT_CONFIG_PATH
    if not os.path.isabs(path):
        path = os.path.abspath(path)
    if not os.path.exists(path):
        print(f"ERROR: settings file not found: {path}", file=sys.stderr)
        sys.exit(2)
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"ERROR: failed to read settings.json: {e}", file=sys.stderr)
        sys.exit(2)
    if not isinstance(data, dict):
        print("ERROR: failed to read settings.json: top-level value must be an object", file=sys.stderr)
        sys.exit(2)

    files = data.get("files", {})
    if not isinstance(files, dict):
        print("ERROR: failed to read settings.json: \"files\" must be an object", file=sys.stderr)
        sys.exit(2)

    normalized = {}
    for key, val in files.items():
        if not isinstance(val, str):
            print(f"ERROR: failed to read settings.json: files.{key} must be a string path", file=sys.stderr)
            sys.exit(2)
        normalized[key] = val if os.path.isabs(val) else os.path.abspath(val)
    data["files"] = normalized
    return data


def resolve_path(arg_value: Optional[str], default_path: str) -> str:
    """Resolve a CLI path argument against its configured default.

    - No argument (None or ""): use default_path.
    - Absolute path, or a relative path that has a directory part: use it as given.
    - Bare file name: place it in the directory of default_path.

    Returns the result as an absolute path.
    """
    if arg_value is None or arg_value == "":
        target = default_path
    elif os.path.isabs(arg_value) or os.path.dirname(arg_value):
        target = arg_value
    else:
        base = os.path.dirname(default_path) or "."
        target = os.path.join(base, arg_value)
    return os.path.abspath(target)


def ensure_parent_dir(path: str) -> None:
    """Create the parent directory of path if it has one and it does not exist."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)


# Built-in fallback template (can be overridden by --template file)
# NOTE(review): this text contains no HTML markup although it is sent as the
# HTML part of the message; the tags/links may have been lost — confirm and
# restore them from the original template if so.
FALLBACK_TEMPLATE = """\

Hi {curator_name},

Thank you for your interest in Such A Guy! We’re happy to provide you with {key_num} Steam key(s):

{keys_block}

Included materials:
Press kit (capsules, screenshots, short trailer):
Google Drive Press Kit
Review notes (features, estimated length, content warnings):
Google Docs Review Notes

We’d really appreciate your honest impressions after you’ve tried the game — both on your Steam Curator page and as a store review on Steam. If you enjoyed the experience, even a short recommendation would help more players discover it. 💫

Best,
Krystic
SUCH ONE STUDIO

"""


def load_template(path: Optional[str]) -> str:
    """Return the contents of the template file, or FALLBACK_TEMPLATE if it does not exist."""
    if path and os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    return FALLBACK_TEMPLATE


def load_keys(path: str) -> List[str]:
    """Read the key pool: one key per line, surrounding whitespace and blank lines dropped."""
    with open(path, "r", encoding="utf-8") as f:
        return [line.strip() for line in f if line.strip()]


def save_remaining_keys(path: str, keys: List[str]) -> None:
    """Rewrite the key pool file with the given keys.

    Writes to "<path>.tmp" first and then os.replace()s it over the target so
    an interrupted write does not leave a truncated pool behind.
    """
    ensure_parent_dir(path)
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.writelines(k + "\n" for k in keys)
    os.replace(tmp, path)


def render_email_html(template_html: str, curator_name: str, key_num: int, keys: List[str]) -> str:
    """Fill the template placeholders {curator_name}, {key_num} and {keys_block}.

    Keys are joined with "<br>" so each one is displayed on its own line in the
    HTML part. The name and keys are HTML-escaped because the name originates
    from inbound mail. An empty name is rendered as "there".
    """
    keys_block = "<br>\n".join(_html_escape(k) for k in keys)
    return template_html.format(
        curator_name=_html_escape(curator_name or "there"),
        key_num=key_num,
        keys_block=keys_block,
    )


def send_email(smtp_host: str, smtp_port: int, smtp_user: str, smtp_pass: str,
               from_name: str, from_email: str,
               original_message_id: Optional[str],
               to_email: str, subject: str, html: str,
               dry_run: bool = False) -> None:
    """Send one HTML email (with a short plain-text alternative) over SMTP.

    Args:
        original_message_id: When given, set as In-Reply-To and References so
            the message threads under the curator's original request.
        dry_run: When true, print the rendered message and return without
            opening any connection.

    Port 465 uses implicit TLS (SMTP_SSL). Any other port connects in plain
    text and upgrades with STARTTLS when the port is 587 or the server
    advertises the extension.

    Raises:
        smtplib.SMTPException / OSError: on connection, TLS, login or send failure.
    """
    if dry_run:
        print(f"\n--- DRY RUN (no send) ---\nTO: {to_email}\nSUBJECT: {subject}\nHTML:\n{html}\n-------------------------\n")
        return

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = f"{from_name} <{from_email}>" if from_name else from_email
    msg["To"] = to_email
    if original_message_id:
        msg["In-Reply-To"] = original_message_id
        msg["References"] = original_message_id
    msg.set_content("HTML email - please view in a mail client that supports HTML.")
    msg.add_alternative(html, subtype="html")

    context = ssl.create_default_context()
    if smtp_port == 465:
        # Implicit TLS: the socket must be wrapped before the SMTP greeting.
        with smtplib.SMTP_SSL(smtp_host, smtp_port, context=context) as server:
            server.login(smtp_user, smtp_pass)
            server.send_message(msg)
        return

    with smtplib.SMTP(smtp_host, smtp_port) as server:
        server.ehlo()
        if smtp_port == 587 or server.has_extn("starttls"):
            server.starttls(context=context)
            server.ehlo()
        server.login(smtp_user, smtp_pass)
        server.send_message(msg)


# (domain-or-URL-prefix, channel label), checked in order.
_CHANNEL_NEEDLES = [
    ("store.steampowered.com/curator", "Steam"),
    ("steamcommunity.com", "Steam"),
    ("youtu.be", "YouTube"),
    ("youtube.com", "YouTube"),
    ("x.com", "Twitter"),
    ("twitter.com", "Twitter"),
    ("twitch.tv", "Twitch"),
    ("discord.gg", "Discord"),
    ("discord.com", "Discord"),
    ("facebook.com", "Facebook"),
    ("instagram.com", "Instagram"),
    ("tiktok.com", "TikTok"),
    ("bilibili.com", "Bilibili"),
    ("weibo.com", "Weibo"),
]

# The look-around assertions require the needle to start and end on a
# host-name boundary, so "x.com" matches "x.com" and "mobile.x.com" but not
# "dropbox.com" or "netflix.com".
_CHANNEL_PATTERNS = [
    (re.compile(r"(?<![a-z0-9-])" + re.escape(needle) + r"(?![a-z0-9-])"), label)
    for needle, label in _CHANNEL_NEEDLES
]


def parse_channel_and_link(links_str: str) -> Tuple[str, str]:
    """Return (channel, link) based on first recognizable URL; default ('Steam','').

    Only the first comma-separated entry of links_str is examined. Non-string
    or blank input yields ("Steam", ""); an unrecognized link yields
    ("Steam", <link>).
    """
    if not isinstance(links_str, str) or not links_str.strip():
        return ("Steam", "")
    # Take first URL if comma-separated
    first = links_str.split(",")[0].strip()
    low = first.lower()
    for pattern, label in _CHANNEL_PATTERNS:
        if pattern.search(low):
            return (label, first)
    return ("Steam", first)  # default channel name


def load_global_state(path: str) -> Dict[str, Any]:
    """Load the shared state JSON, tolerating a missing or damaged file.

    Always returns a dict in which "mailboxes" is a dict and "sent_emails" is a
    list; anything missing, unreadable or of the wrong type is replaced by an
    empty value.
    """
    empty: Dict[str, Any] = {"mailboxes": {}, "sent_emails": []}
    if not os.path.exists(path):
        return empty
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        return empty
    if not isinstance(data, dict):
        return empty
    if not isinstance(data.get("mailboxes"), dict):
        data["mailboxes"] = {}
    if not isinstance(data.get("sent_emails"), list):
        data["sent_emails"] = []
    return data


def save_global_state(path: str, data: Dict[str, Any]) -> None:
    """Write the state dict as pretty-printed UTF-8 JSON via a temp file + os.replace."""
    ensure_parent_dir(path)
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp, path)


def norm_email(addr: str) -> str:
    """Return the address stripped and lower-cased ("" for None/empty)."""
    return (addr or "").strip().lower()


def cleanup_log_sheets(log_path: str, keep_only_real: bool = False) -> None:
    """Remove DRYRUN/TEST sheets from the log workbook when keep_only_real is true.

    Does nothing when the file does not exist, cannot be opened as a workbook,
    or keep_only_real is false. The workbook is saved only if a sheet was removed.
    """
    if not keep_only_real or not os.path.exists(log_path):
        return
    try:
        wb = load_workbook(log_path)
    except Exception:
        # Best effort: an unreadable log must not block sending.
        return
    try:
        removed = False
        for name in list(wb.sheetnames):
            upper = name.upper()
            if "DRYRUN" in upper or "TEST" in upper:
                wb.remove(wb[name])
                removed = True
        if removed:
            wb.save(log_path)
    finally:
        wb.close()


def _norm_uid(u) -> str:
    """Return a UID as a stripped str; bytes are decoded as UTF-8 (errors ignored)."""
    if isinstance(u, (bytes, bytearray)):
        return u.decode("utf-8", errors="ignore").strip()
    return str(u).strip()


def _chunked(seq, n):
    """Yield consecutive slices of seq of length n (the last one may be shorter)."""
    for i in range(0, len(seq), n):
        yield seq[i:i + n]


def _store_answered(conn, uid_set: str) -> bool:
    """Add the \\Answered flag to uid_set; try the .SILENT form first, then the plain one."""
    for command in ("+FLAGS.SILENT", "+FLAGS"):
        typ, _ = conn.uid("STORE", uid_set, command, r"(\Answered)")
        if typ == "OK":
            return True
    return False


def imap_mark_answered_batch(host: str, port: int, user: str, pwd: str, mailbox: str,
                             uids: list[str], batch_size: int = 500) -> int:
    """Mark a batch of UIDs as \\Answered and return how many were accepted.

    UIDs are sent in comma-joined groups of batch_size. When a whole group is
    rejected, each UID of that group is retried on its own. Returns 0 when
    there are no usable UIDs or the mailbox cannot be selected.

    Raises:
        imaplib.IMAP4.error / OSError: on connection or login failure. The
            connection is closed before the exception propagates.
    """
    clean_uids = [u for u in (_norm_uid(raw) for raw in uids) if u]
    if not clean_uids:
        return 0
    if batch_size <= 0:
        batch_size = 500  # range() in _chunked rejects a zero step

    conn = imaplib.IMAP4_SSL(host, port)
    try:
        conn.login(user, pwd)
        typ, _ = conn.select(mailbox, readonly=False)
        if typ != "OK":
            return 0

        total_ok = 0
        for batch in _chunked(clean_uids, batch_size):
            if _store_answered(conn, ",".join(batch)):  # e.g. "444,445,446"
                total_ok += len(batch)
            else:
                # Fall back to one message at a time
                total_ok += sum(1 for uid in batch if _store_answered(conn, uid))
        return total_ok
    finally:
        # close() is only legal once a mailbox is selected; both calls are best effort.
        for closer in (conn.close, conn.logout):
            try:
                closer()
            except (imaplib.IMAP4.error, OSError):
                pass


def _cell_str(value: Any) -> str:
    """Return an Excel cell as a stripped string; None/NaN/NaT become ""."""
    if value is None:
        return ""
    try:
        if pd.isna(value):
            return ""
    except (TypeError, ValueError):
        pass
    return str(value).strip()


def main():
    """CLI entry point: read requests, allocate keys, send emails, write the log.

    Exit codes: 2 = bad/missing configuration, 3 = Excel missing a required
    column, 4 = --test without a test recipient.
    """
    parser = argparse.ArgumentParser(description="Bulk send Steam keys to curators from Excel (one row per key).")
    parser.add_argument("--config", help="Path to settings.json (default: configs/settings.json).")
    parser.add_argument("--excel", help="Path to input Excel (default from settings).")
    parser.add_argument("--keys", help="Path to key pool TXT (default from settings).")
    parser.add_argument("--out", help="Path to output Excel log (default from settings).")
    parser.add_argument("--subject", help="Base email subject (default from settings).")
    parser.add_argument("--sheet", default=None, help="Worksheet name to read within the Excel file (default: first sheet).")
    parser.add_argument("--template", help="Path to HTML email template file (default from settings).")
    parser.add_argument("--limit", type=int, default=None, help="Max rows to process from Excel.")
    parser.add_argument("--dry-run", action="store_true", help="Render emails only; do not send.")
    parser.add_argument("--test", action="store_true", help="Send to test address instead of recipients.")
    parser.add_argument("--test-email", default=os.environ.get("TEST_EMAIL", ""), help="Test recipient (with --test).")
    parser.add_argument("--skip-sent", action="store_true", help="Skip rows already present (by UID) in the output log.")
    parser.add_argument("--no-sentemail", action="store_true", help="Do not record sent emails into STATE_FILE (useful for testing).")
    parser.add_argument("--no-consume", action="store_true", help="Do not modify key pool file (do not remove used keys).")

    # SMTP config (env or CLI)
    parser.add_argument("--smtp-host")
    parser.add_argument("--smtp-port", type=int)
    parser.add_argument("--smtp-user")
    parser.add_argument("--smtp-pass")
    parser.add_argument("--from-name")
    parser.add_argument("--from-email")

    # IMAP config for optional marking as answered
    parser.add_argument("--mark-answered", action="store_true", help="After real sends, mark original messages as \\Answered via IMAP.")
    parser.add_argument("--imap-host")
    parser.add_argument("--imap-port", type=int)
    parser.add_argument("--imap-user")
    parser.add_argument("--imap-pass")
    parser.add_argument("--imap-mailbox")

    args = parser.parse_args()

    config_path = args.config or os.environ.get("SETTINGS_FILE") or DEFAULT_CONFIG_PATH
    settings = load_settings(config_path)
    files_conf = settings.get("files", {})
    required_files = ["excel", "keys", "log", "template", "state"]
    missing_files = [k for k in required_files if k not in files_conf]
    if missing_files:
        print(f"ERROR: settings.json missing file paths for: {', '.join(missing_files)}", file=sys.stderr)
        sys.exit(2)

    args.excel = resolve_path(args.excel, files_conf["excel"])
    args.keys = resolve_path(args.keys, files_conf["keys"])
    args.out = resolve_path(args.out, files_conf["log"])
    args.template = resolve_path(args.template, files_conf["template"])
    state_file = os.path.abspath(files_conf["state"])

    subject_default = settings.get("subject", "")
    args.subject = args.subject if args.subject is not None else subject_default

    # Precedence for every connection setting: CLI flag, environment, settings.json.
    smtp_conf = settings.get("smtp", {})
    args.smtp_host = args.smtp_host or os.environ.get("SMTP_HOST") or smtp_conf.get("host", "")
    if args.smtp_port is None:
        env_port = os.environ.get("SMTP_PORT")
        if env_port:
            try:
                args.smtp_port = int(env_port)
            except ValueError:
                pass
    if args.smtp_port is None:
        args.smtp_port = int(smtp_conf.get("port", 587))
    args.smtp_user = args.smtp_user or os.environ.get("SMTP_USER") or smtp_conf.get("user", "")
    args.smtp_pass = args.smtp_pass or os.environ.get("SMTP_PASS") or smtp_conf.get("pass", "")
    args.from_name = args.from_name or os.environ.get("FROM_NAME") or smtp_conf.get("from_name", "Krystic")
    args.from_email = args.from_email or os.environ.get("FROM_EMAIL") or smtp_conf.get("from_email", args.smtp_user)

    imap_conf = settings.get("imap", {})
    args.imap_host = args.imap_host or os.environ.get("IMAP_HOST") or imap_conf.get("host", "")
    if args.imap_port is None:
        env_imap_port = os.environ.get("IMAP_PORT")
        if env_imap_port:
            try:
                args.imap_port = int(env_imap_port)
            except ValueError:
                pass
    if args.imap_port is None:
        args.imap_port = int(imap_conf.get("port", 993))
    args.imap_user = args.imap_user or os.environ.get("EMAIL_USER") or imap_conf.get("user", args.smtp_user)
    args.imap_pass = args.imap_pass or os.environ.get("EMAIL_PASS") or imap_conf.get("pass", args.smtp_pass)
    args.imap_mailbox = args.imap_mailbox or os.environ.get("MAILBOX") or imap_conf.get("mailbox", "INBOX")

    global_state = load_global_state(state_file)
    sent_emails_global = set()
    for existing_email in global_state.get("sent_emails", []):
        normalized = norm_email(existing_email)
        if normalized:
            sent_emails_global.add(normalized)
    run_sent_emails: set[str] = set()    # addresses handled in this run (any mode)
    state_emails_run: set[str] = set()   # addresses to persist into the state file

    if not args.dry_run and not args.test:
        cleanup_log_sheets(args.out, keep_only_real=True)

    # Validate SMTP when not dry-run
    if not args.dry_run:
        for vname in ["smtp_host", "smtp_user", "smtp_pass", "from_email"]:
            if not getattr(args, vname):
                print(f"ERROR: Missing SMTP config --{vname.replace('_','-')} (or env var). Use --dry-run to preview.", file=sys.stderr)
                sys.exit(2)

    # The test recipient does not depend on the row, so validate it once up
    # front instead of aborting in the middle of the send loop.
    test_recipient = ""
    if args.test:
        test_recipient = args.test_email or os.environ.get("TEST_EMAIL") or ""
        if not test_recipient:
            print("ERROR: --test specified but no test email provided. Use --test-email or TEST_EMAIL env.", file=sys.stderr)
            sys.exit(4)

    # Load template (external file if exists, otherwise fallback)
    template_html = load_template(args.template)

    # Load Excel
    sheet_to_read: Optional[str | int] = args.sheet if args.sheet else 0
    df = pd.read_excel(args.excel, sheet_name=sheet_to_read)

    required_cols = ["Mailbox Key", "Email", "Curator/Name", "Requested Key Count", "Subject", "Curator/Social Links"]
    for c in required_cols:
        if c not in df.columns:
            print(f"ERROR: Excel missing column: {c}", file=sys.stderr)
            sys.exit(3)

    # Load existing log to support --skip-sent (by UID)
    sent_uids = set()
    if args.skip_sent and os.path.exists(args.out):
        try:
            logdfs = pd.read_excel(args.out, sheet_name=None)
            for _, logdf in logdfs.items():
                if "Mailbox Key" in logdf.columns:
                    sent_uids.update(str(x) for x in logdf["Mailbox Key"].astype(str).tolist())
        except Exception as e:
            # Best effort, but say so: without the log nothing will be skipped.
            print(f"WARNING: could not read existing log for --skip-sent: {e}", file=sys.stderr)

    # Load key pool
    pool = load_keys(args.keys)

    # Prepare per-key logging
    log_rows: List[Dict[str, Any]] = []
    row_no = 1
    processed = 0

    # ===== Progress and summary statistics =====
    # Estimate how many email rows will be handled (taking --limit and
    # --skip-sent into account). This is only used for the progress display,
    # so being slightly off is acceptable.
    total_target = len(df) if args.limit is None else min(len(df), args.limit)
    if args.skip_sent and os.path.exists(args.out):
        total_target = max(0, total_target - len(sent_uids))

    attempt_rows = 0          # email rows for which keys were allocated
    emails_ok = 0             # sent successfully (SENT or SENT_TEST)
    emails_fail = 0           # failed or skipped (ERROR, SKIPPED_* ...)
    keys_assigned_total = 0   # keys written into emails (dry-run/test included)
    duplicate_skipped = 0     # rows skipped because the address was already served
    uids_to_mark_answered: list[str] = []
    interrupted = False

    tz_bj = ZoneInfo("Asia/Shanghai")

    # Iterate over Excel rows (one email row)
    try:
        for idx, row in df.iterrows():
            try:
                uid = _cell_str(row.get("Mailbox Key", ""))
                email_to = _cell_str(row.get("Email", ""))
                email_norm = norm_email(email_to)
                original_message_id = _cell_str(row.get("Original Message-ID", ""))

                curator_name = _cell_str(row.get("Curator/Name", "")) or "Curator"
                if args.test:
                    curator_name = curator_name + "(" + email_to + ")"

                req_num = row.get("Requested Key Count")
                try:
                    key_num = int(req_num) if pd.notna(req_num) else DEFAULT_KEY_COUNT
                except (TypeError, ValueError, OverflowError):
                    key_num = DEFAULT_KEY_COUNT
                if key_num <= 0:
                    key_num = DEFAULT_KEY_COUNT

                if args.skip_sent and uid and uid in sent_uids:
                    continue
                if args.limit is not None and processed >= args.limit:
                    break

                # Channel & link detection (robust against NaN)
                links_str = _cell_str(row.get("Curator/Social Links", ""))
                channel, chosen_link = parse_channel_and_link(links_str)
                # Normalize None/blank to ""
                safe_link = _cell_str(chosen_link)

                orig_subject = _cell_str(row.get("Subject", ""))
                final_subject = args.subject

                assigned: List[str] = []
                to_addr = ""
                duplicate_email = bool(email_norm and (email_norm in sent_emails_global or email_norm in run_sent_emails))

                if not email_to and not args.test:
                    print(f"WARNING: No recipient address for UID {uid}. Skipping.", file=sys.stderr)
                    status = "SKIPPED_NO_EMAIL"
                elif duplicate_email:
                    status = "SKIPPED_DUP_EMAIL"
                    duplicate_skipped += 1
                elif len(pool) < key_num:
                    print(f"WARNING: Not enough keys left for UID {uid}. Needed {key_num}, have {len(pool)}. Skipping.", file=sys.stderr)
                    status = "SKIPPED_NO_KEYS"
                else:
                    assigned = pool[:key_num]
                    if orig_subject:
                        final_subject = f"{args.subject} RE: {orig_subject}"
                    html = render_email_html(template_html, curator_name, key_num, assigned)

                    # Decide recipient
                    to_addr = test_recipient if args.test else email_to

                    # Send (or dry-run)
                    send_email(
                        smtp_host=args.smtp_host,
                        smtp_port=args.smtp_port,
                        smtp_user=args.smtp_user,
                        smtp_pass=args.smtp_pass,
                        from_name=args.from_name,
                        from_email=args.from_email,
                        original_message_id=original_message_id or None,
                        to_email=to_addr,
                        subject=final_subject,
                        html=html,
                        dry_run=args.dry_run
                    )
                    # Dry-run wins over --test: nothing was sent in that case.
                    status = "DRY_RUN" if args.dry_run else ("SENT_TEST" if args.test else "SENT")
                    if email_norm:
                        run_sent_emails.add(email_norm)
                        if status in ("SENT", "SENT_TEST"):
                            state_emails_run.add(email_norm)
                    if status == "SENT":
                        uids_to_mark_answered.append(uid)

                if assigned:
                    attempt_rows += 1
                    if status in ("SENT", "SENT_TEST"):
                        emails_ok += 1
                    keys_assigned_total += len(assigned)
                else:
                    emails_fail += 1

                if not args.dry_run:
                    done_rows = processed + 1
                    pct = (done_rows / total_target * 100) if total_target else 0
                    print(f"[{done_rows}/{total_target} | {pct:.1f}%] {status} UID={uid} to={to_addr or email_to} keys={len(assigned)}")

                # Advance the in-memory pool only for keys that were actually
                # handed out; skipped rows must not eat keys. With --no-consume
                # the pool is left untouched, so every row reuses the same keys.
                if assigned and not args.no_consume:
                    pool = pool[len(assigned):]

                # Prepare BJ timestamps
                now_bj = datetime.now(tz_bj)
                base_row = {
                    "Send Date": now_bj.strftime("%Y-%m-%d"),  # YYYY-MM-DD
                    "Channel": channel or "Steam",
                    "Curator/Name": curator_name,
                    "Purpose": "评测",
                    "Social Link": safe_link,
                    "Mailbox Key": uid,
                    "To": email_to,
                    "Requested Key Count": key_num,
                    "Subject": final_subject,
                    "Status": status,
                    "Sent At": now_bj.strftime("%Y-%m-%d %H:%M:%S"),  # detailed
                    "Test Mode": bool(args.test),
                    "Actual Recipient": to_addr if (args.test or args.dry_run) else email_to,
                }

                # Log ONE ROW PER KEY. A skipped row still gets a single entry
                # (with an empty Key) for traceability.
                for k in (assigned or [""]):
                    log_rows.append({**base_row, "No.": row_no, "Key": k})
                    row_no += 1

                processed += 1

            except Exception as e:
                # Per-row boundary: record the failure and carry on with the next row.
                now_bj = datetime.now(tz_bj)
                log_rows.append({
                    "No.": row_no,
                    "Send Date": now_bj.strftime("%Y-%m-%d"),
                    "Channel": "Steam",
                    "Curator/Name": _cell_str(row.get("Curator/Name", "")),
                    "Purpose": "评测",
                    "Social Link": _cell_str(row.get("Curator/Social Links", "")),
                    "Key": "",
                    "Mailbox Key": _cell_str(row.get("Mailbox Key", "")),
                    "To": _cell_str(row.get("Email", "")),
                    "Requested Key Count": row.get("Requested Key Count"),
                    "Subject": _cell_str(row.get("Subject", "")),
                    "Status": f"ERROR:{e}",
                    "Sent At": now_bj.strftime("%Y-%m-%d %H:%M:%S"),
                    "Test Mode": bool(args.test),
                    "Actual Recipient": "",
                })
                row_no += 1
                emails_fail += 1
                print(f"ERROR processing row {idx}: {e}", file=sys.stderr)
                continue
    except KeyboardInterrupt:
        interrupted = True
        print("\n[INTERRUPTED] Sending loop stopped by user. Finalizing current progress.")

    if args.mark_answered and not args.dry_run and not args.test and uids_to_mark_answered:
        # Marking is optional; a failure here must not prevent the log, state
        # and key pool from being saved for mail that has already gone out.
        try:
            done = imap_mark_answered_batch(
                host=args.imap_host,
                port=args.imap_port,
                user=args.imap_user,
                pwd=args.imap_pass,
                mailbox=args.imap_mailbox,
                uids=uids_to_mark_answered,
                batch_size=500
            )
            print(f"[INFO] Marked {done} message(s) as \\Answered.")
        except (imaplib.IMAP4.error, OSError) as e:
            print(f"WARNING: could not mark messages as \\Answered: {e}", file=sys.stderr)

    # Build DataFrame with desired column order
    log_df = pd.DataFrame(log_rows, columns=LOG_COLUMNS)

    log_path = os.path.abspath(args.out)
    ensure_parent_dir(log_path)
    date_tag = datetime.now(tz_bj).strftime("%Y%m%d")
    sheet_prefix = f"sendlog_{date_tag}"
    if args.dry_run:
        sheet_prefix += "_DRYRUN"
    elif args.test:
        sheet_prefix += "_TEST"

    existing_sheets = set()
    if os.path.exists(log_path):
        try:
            wb = load_workbook(log_path, read_only=True)
            existing_sheets = set(wb.sheetnames)
            wb.close()
        except Exception:
            existing_sheets = set()

    # First free two-digit suffix for today's sheet name.
    suffix = 1
    while True:
        sheet_name = f"{sheet_prefix}{suffix:02d}"
        if sheet_name not in existing_sheets:
            break
        suffix += 1

    writer_args: Dict[str, Any] = {"engine": "openpyxl"}
    if os.path.exists(log_path):
        writer_args["mode"] = "a"

    with pd.ExcelWriter(log_path, **writer_args) as writer:
        log_df.to_excel(writer, sheet_name=sheet_name, index=False)
        book = writer.book
        # Move the new sheet to the front so the latest run opens first.
        if sheet_name in book.sheetnames and book.sheetnames[0] != sheet_name:
            ws = book[sheet_name]
            sheets = book._sheets  # type: ignore[attr-defined]
            sheets.insert(0, sheets.pop(sheets.index(ws)))

    # ===== Summary report =====
    mode = "REAL"
    if args.dry_run:
        mode = "DRY-RUN"
    elif args.test:
        mode = "TEST"

    print("\n=== RUN SUMMARY ===")
    print(f"Mode: {mode}")
    print(f"Email rows processed: {processed}")
    print(f"Attempted sends: {attempt_rows} (rows that had keys assigned)")
    print(f"Successful sends: {emails_ok}")
    print(f"Failed/Skipped: {emails_fail}")
    if duplicate_skipped:
        print(f"Skipped duplicates: {duplicate_skipped}")
    print(f"Keys assigned total: {keys_assigned_total}")
    print(f"Log file: {log_path}")
    if interrupted:
        print("[INTERRUPTED] Run stopped early; review the log for partial results.")

    if (not args.dry_run) and state_emails_run and (not args.no_sentemail):
        merged_emails = sorted(sent_emails_global.union(state_emails_run))
        global_state["sent_emails"] = merged_emails
        save_global_state(state_file, global_state)

    # Save remaining keys if consuming (not dry-run/test)
    if not args.no_consume and not args.dry_run and not args.test:
        save_remaining_keys(args.keys, pool)

    print(f"Done. Processed {processed} email row(s). Logged {len(log_df)} key row(s). File: {log_path}")
    if not args.no_consume:
        print(f"Current in-memory remaining keys (not saved if test/dry-run): {len(pool)}")


if __name__ == "__main__":
    main()