| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
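"""
Temporary script: read the IMAP configuration from configs/settings.json, fetch every
message in the configured Sent mailbox, and write the recipient addresses that appear
in those messages to the sent_emails field of configs/curator_status.json.

Usage (from the repository root):
    python backfill_sent_emails.py
"""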
- import os
- import json
- import sys
- import imaplib
- import email
- from email.utils import getaddresses
# Settings file location, relative to the directory the script is run from.
SETTINGS_PATH: str = os.path.join("configs", "settings.json")
def load_settings(path: str) -> dict:
    """Load the JSON settings file and absolutize the paths under ``"files"``.

    Args:
        path: Location of the settings JSON file.

    Returns:
        The parsed settings. The ``"files"`` entry is always present (an
        empty dict when the file defines none) and every relative value in
        it has been resolved against the current working directory.

    Raises:
        SystemExit: With status 2, after printing an error to stderr, when
            ``path`` does not exist.
    """
    if not os.path.exists(path):
        print(f"ERROR: settings file not found: {path}", file=sys.stderr)
        sys.exit(2)

    with open(path, "r", encoding="utf-8") as handle:
        settings = json.load(handle)

    # Absolute entries are kept verbatim; only relative ones go through
    # abspath, which would otherwise also normalise them.
    settings["files"] = {
        name: location if os.path.isabs(location) else os.path.abspath(location)
        for name, location in settings.get("files", {}).items()
    }
    return settings
def load_state(path: str) -> dict:
    """Read the state JSON file, or build an empty state if it is missing.

    Args:
        path: Location of the state file.

    Returns:
        The state dict. ``"sent_emails"`` is guaranteed to be a list (it is
        reset to ``[]`` when absent or of another type) and ``"_path"``
        records ``path`` so the state can later be written back.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            state = json.load(handle)
        has_email_list = "sent_emails" in state and isinstance(state["sent_emails"], list)
        if not has_email_list:
            state["sent_emails"] = []
        state["_path"] = path
        return state

    # No state file yet: start from an empty skeleton.
    return {"_path": path, "mailboxes": {}, "sent_emails": []}
def save_state(state: dict) -> None:
    """Write ``state`` back to the file named by its ``"_path"`` entry.

    The data is first written to ``<path>.tmp`` and then moved over the
    target with ``os.replace``, so the target is never left half-written.
    The bookkeeping key ``"_path"`` itself is not serialized.

    Args:
        state: State dict. Nothing is written when it has no (or an empty)
            ``"_path"`` entry.

    Raises:
        OSError: If the directory or file cannot be created or replaced.
        TypeError: If ``state`` holds values that are not JSON serializable.
    """
    path = state.get("_path")
    if not path:
        return

    # dirname() is "" for a bare filename, and os.makedirs("") raises, so
    # only create the parent directory when there is one.
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    payload = {key: value for key, value in state.items() if key != "_path"}
    tmp = path + ".tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)
    except Exception:
        # Do not leave a stale temp file behind; the original error is
        # what the caller needs to see, so cleanup failures are ignored.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
# Headers that may name a recipient of a message.
_RECIPIENT_HEADERS = (
    "To", "Cc", "Bcc", "Resent-To", "Resent-Cc", "Resent-Bcc",
    "X-Original-To", "Delivered-To", "X-Real-To", "X-Forwarded-To",
)


def _extract_recipients(raw_headers: bytes) -> set[str]:
    """Return the stripped, lower-cased addresses in one message's recipient headers."""
    msg = email.message_from_bytes(raw_headers)
    values: list[str] = []
    for name in _RECIPIENT_HEADERS:
        # str() because undecodable headers can come back as Header objects.
        values.extend(str(value) for value in msg.get_all(name, []))

    found: set[str] = set()
    for _, addr in getaddresses(values):
        cleaned = addr.strip().lower()
        if cleaned:
            found.add(cleaned)
    return found


def fetch_sent_addresses(imap_conf: dict) -> set[str]:
    """Collect every recipient address found in the configured Sent mailbox.

    Connects over IMAP-SSL, selects the mailbox read-only, downloads the
    headers of each message and gathers the addresses from the recipient
    headers listed in ``_RECIPIENT_HEADERS``.

    Args:
        imap_conf: IMAP settings. Reads ``host``, ``port`` (default 993),
            ``user``, ``pass`` and the mailbox name from ``sent_mailbox``,
            falling back to ``mailbox`` and then ``"Sent"``.

    Returns:
        The set of stripped, lower-cased addresses. Empty when the search
        fails or the mailbox holds no messages.

    Raises:
        SystemExit: Status 2 when host, user or password is missing;
            status 3 when the mailbox cannot be selected.
    """
    host = imap_conf.get("host")
    # ``or`` also covers an explicit null port, which int() would reject.
    port = int(imap_conf.get("port") or 993)
    user = imap_conf.get("user")
    pwd = imap_conf.get("pass")
    mailbox = imap_conf.get("sent_mailbox") or imap_conf.get("mailbox") or "Sent"
    if not (host and user and pwd):
        print("ERROR: settings.json 缺少 IMAP 登录配置", file=sys.stderr)
        sys.exit(2)

    conn = imaplib.IMAP4_SSL(host, port)
    selected = False
    try:
        conn.login(user, pwd)
        typ, _ = conn.select(mailbox, readonly=True)
        if typ != "OK":
            print(f"ERROR: 无法选择邮箱 {mailbox}", file=sys.stderr)
            sys.exit(3)
        selected = True

        typ, data = conn.search(None, "ALL")
        if typ != "OK" or not data or not data[0]:
            return set()

        addresses: set[str] = set()
        # SEARCH yields message sequence numbers, which FETCH accepts as-is.
        msg_nums = data[0].split()
        total = len(msg_nums)
        for idx, num in enumerate(msg_nums, 1):
            typ, msg_data = conn.fetch(num, "(RFC822.HEADER)")
            if typ == "OK" and msg_data:
                # The literal payload is the second element of a tuple item;
                # other items (None, closing b")") carry no header data.
                raw = next(
                    (part[1] for part in msg_data
                     if isinstance(part, tuple) and len(part) > 1),
                    None,
                )
                if isinstance(raw, (bytes, bytearray)) and raw:
                    addresses |= _extract_recipients(bytes(raw))
            if idx % 200 == 0:
                print(f"[INFO] processed {idx}/{total} messages ...")
        return addresses
    finally:
        # Best-effort teardown on every exit path (including errors and
        # sys.exit) so the connection is never leaked; cleanup failures
        # must not mask the real outcome.
        if selected:
            try:
                conn.close()
            except (imaplib.IMAP4.error, OSError):
                pass
        try:
            conn.logout()
        except (imaplib.IMAP4.error, OSError):
            pass
def main() -> None:
    """Merge the Sent-mailbox recipients into the state file's ``sent_emails``.

    Loads the settings, fetches the recipient addresses over IMAP, unions
    them with the normalized addresses already stored in the state file and
    saves the sorted result back.
    """
    settings = load_settings(SETTINGS_PATH)

    default_state = os.path.join("configs", "curator_status.json")
    configured_state = settings.get("files", {}).get("state", default_state)
    state_path = os.path.abspath(configured_state)

    new_addresses = fetch_sent_addresses(settings.get("imap", {}))
    print(f"[INFO] collected {len(new_addresses)} address(es) from Sent mailbox.")

    state = load_state(state_path)
    # Normalize stored entries the same way as fetched ones so they dedupe.
    known = set()
    for entry in state.get("sent_emails", []):
        if entry:
            known.add(entry.strip().lower())

    merged = sorted(known | new_addresses)
    state["sent_emails"] = merged
    save_state(state)
    print(f"[DONE] sent_emails updated to {len(merged)} unique entries in {state_path}.")
# Script entry point: run the backfill only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    main()
|