#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
One-off script: read the IMAP configuration from configs/settings.json, fetch
every message in the configured Sent mailbox, and write every recipient
address found there into the ``sent_emails`` field of
configs/curator_status.json.

Usage (from the repository root):
    python backfill_sent_emails.py
"""

import os
import json
import sys
import imaplib
import email
from email.utils import getaddresses

SETTINGS_PATH = os.path.join("configs", "settings.json")

# Headers whose values are scanned for recipient addresses.
_RECIPIENT_HEADERS = (
    "To", "Cc", "Bcc",
    "Resent-To", "Resent-Cc", "Resent-Bcc",
    "X-Original-To", "Delivered-To", "X-Real-To", "X-Forwarded-To",
)


def load_settings(path: str) -> dict:
    """Load the JSON settings file and make its ``files`` paths absolute.

    Args:
        path: Location of the settings JSON file.

    Returns:
        The parsed settings, with every relative string value under
        ``files`` replaced by its absolute form (resolved against the
        current working directory).

    Exits with status 2 when the file does not exist.
    """
    if not os.path.exists(path):
        print(f"ERROR: settings file not found: {path}", file=sys.stderr)
        sys.exit(2)

    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # A missing or null "files" section is treated as empty.
    files = data.get("files") or {}
    for key, val in list(files.items()):
        # Only strings can be paths; leave null/number entries untouched
        # instead of crashing inside os.path.isabs().
        if isinstance(val, str) and not os.path.isabs(val):
            files[key] = os.path.abspath(val)
    data["files"] = files
    return data


def load_state(path: str) -> dict:
    """Load the state file, guaranteeing a list under ``sent_emails``.

    Args:
        path: Location of the state JSON file.

    Returns:
        The parsed state with an extra ``_path`` key recording where it was
        loaded from. When the file does not exist, a fresh state with empty
        ``mailboxes`` and ``sent_emails`` is returned instead.
    """
    if not os.path.exists(path):
        return {"_path": path, "mailboxes": {}, "sent_emails": []}

    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Reset a missing or malformed field so callers can rely on a list.
    if not isinstance(data.get("sent_emails"), list):
        data["sent_emails"] = []
    data["_path"] = path
    return data


def save_state(state: dict) -> None:
    """Write *state* to the file named by its ``_path`` key.

    The bookkeeping ``_path`` key is not written. The data goes to a
    ``.tmp`` sibling first and is then moved over the target with
    ``os.replace``. Does nothing when ``_path`` is missing or empty.
    """
    path = state.get("_path")
    if not path:
        return

    # A bare filename has an empty dirname, and os.makedirs("") raises.
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    payload = {k: v for k, v in state.items() if k != "_path"}
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    os.replace(tmp, path)


def _quote_mailbox(name: str) -> str:
    """Return *name* as an IMAP quoted string.

    imaplib sends mailbox arguments verbatim, so a name containing spaces
    (for example "[Gmail]/Sent Mail") must be quoted by the caller. A name
    that is already wrapped in double quotes is returned unchanged.
    """
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name
    escaped = name.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _recipients_from_headers(raw: bytes) -> set[str]:
    """Extract lower-cased recipient addresses from a raw header block."""
    msg = email.message_from_bytes(raw)
    fields = []
    for header in _RECIPIENT_HEADERS:
        fields.extend(msg.get_all(header, []))

    found = set()
    for _, addr in getaddresses(fields):
        if addr:
            found.add(addr.strip().lower())
    return found


def _collect_recipients(conn: imaplib.IMAP4) -> set[str]:
    """Scan every message of the currently selected mailbox for recipients."""
    typ, data = conn.search(None, "ALL")
    if typ != "OK" or not data or not data[0]:
        return set()

    addresses: set[str] = set()
    # SEARCH (as opposed to UID SEARCH) yields message sequence numbers.
    msg_nums = data[0].split()
    total = len(msg_nums)
    for idx, num in enumerate(msg_nums, 1):
        typ, msg_data = conn.fetch(num, "(RFC822.HEADER)")
        if typ != "OK" or not msg_data:
            continue
        # FETCH responses mix (envelope, payload) tuples with bare bytes or
        # None items; take the payload of the first tuple.
        raw = next(
            (part[1] for part in msg_data
             if isinstance(part, tuple) and len(part) >= 2),
            None,
        )
        if not raw:
            continue
        addresses |= _recipients_from_headers(raw)
        if idx % 200 == 0:
            print(f"[INFO] processed {idx}/{total} messages ...")
    return addresses


def fetch_sent_addresses(imap_conf: dict) -> set[str]:
    """Collect every recipient address found in the configured Sent mailbox.

    Args:
        imap_conf: IMAP settings. Reads ``host``, ``port`` (default 993),
            ``user``, ``pass`` and the mailbox name from ``sent_mailbox``,
            falling back to ``mailbox`` and then to "Sent".

    Returns:
        The set of lower-cased addresses found in the recipient headers of
        all messages; empty when the search fails or matches nothing.

    Exits with status 2 when host/user/pass are missing and with status 3
    when the mailbox cannot be selected.
    """
    host = imap_conf.get("host")
    user = imap_conf.get("user")
    pwd = imap_conf.get("pass")
    mailbox = imap_conf.get("sent_mailbox") or imap_conf.get("mailbox") or "Sent"

    if not (host and user and pwd):
        print("ERROR: settings.json 缺少 IMAP 登录配置", file=sys.stderr)
        sys.exit(2)

    # `or 993` also covers an explicit null port in the settings file.
    port = int(imap_conf.get("port") or 993)

    # The context manager logs out on every exit path, including exceptions
    # and the sys.exit() below.
    with imaplib.IMAP4_SSL(host, port) as conn:
        conn.login(user, pwd)
        typ, _ = conn.select(_quote_mailbox(mailbox), readonly=True)
        if typ != "OK":
            print(f"ERROR: 无法选择邮箱 {mailbox}", file=sys.stderr)
            sys.exit(3)
        try:
            return _collect_recipients(conn)
        finally:
            # Best-effort: a failing CLOSE must not hide the result or an
            # exception that is already propagating.
            try:
                conn.close()
            except (imaplib.IMAP4.error, OSError):
                pass


def main() -> None:
    """Merge the addresses found in the Sent mailbox into the state file."""
    settings = load_settings(SETTINGS_PATH)
    files = settings.get("files", {})
    state_path = os.path.abspath(
        files.get("state", os.path.join("configs", "curator_status.json"))
    )
    imap_conf = settings.get("imap", {})

    new_addresses = fetch_sent_addresses(imap_conf)
    print(f"[INFO] collected {len(new_addresses)} address(es) from Sent mailbox.")

    state = load_state(state_path)
    # Normalise existing entries the same way as the fetched ones; skip
    # non-string values and entries that are empty after stripping.
    existing = {
        addr.strip().lower()
        for addr in state.get("sent_emails", [])
        if isinstance(addr, str) and addr.strip()
    }
    merged = sorted(existing | new_addresses)
    state["sent_emails"] = merged
    save_state(state)
    print(f"[DONE] sent_emails updated to {len(merged)} unique entries in {state_path}.")


if __name__ == "__main__":
    main()