fill_sent_log.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
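  4. Temporary script: reads the IMAP settings from configs/settings.json, fetches every message in the configured Sent mailbox,
  5. and writes every recipient address found in those messages into the sent_emails field of configs/curator_status.json.
  6. Usage (run from the repository root):
  7. python backfill_sent_emails.py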
  8. """
  9. import os
  10. import json
  11. import sys
  12. import imaplib
  13. import email
  14. from email.utils import getaddresses
  15. SETTINGS_PATH = os.path.join("configs", "settings.json")
  16. def load_settings(path: str) -> dict:
  17. if not os.path.exists(path):
  18. print(f"ERROR: settings file not found: {path}", file=sys.stderr)
  19. sys.exit(2)
  20. with open(path, "r", encoding="utf-8") as f:
  21. data = json.load(f)
  22. files = data.get("files", {})
  23. for key, val in list(files.items()):
  24. if not os.path.isabs(val):
  25. files[key] = os.path.abspath(val)
  26. data["files"] = files
  27. return data
  28. def load_state(path: str) -> dict:
  29. if not os.path.exists(path):
  30. return {"_path": path, "mailboxes": {}, "sent_emails": []}
  31. with open(path, "r", encoding="utf-8") as f:
  32. data = json.load(f)
  33. if "sent_emails" not in data or not isinstance(data["sent_emails"], list):
  34. data["sent_emails"] = []
  35. data["_path"] = path
  36. return data
  37. def save_state(state: dict) -> None:
  38. path = state.get("_path")
  39. if not path:
  40. return
  41. os.makedirs(os.path.dirname(path), exist_ok=True)
  42. tmp = path + ".tmp"
  43. with open(tmp, "w", encoding="utf-8") as f:
  44. json.dump({k: v for k, v in state.items() if k != "_path"}, f, ensure_ascii=False, indent=2)
  45. os.replace(tmp, path)
  46. def fetch_sent_addresses(imap_conf: dict) -> set[str]:
  47. host = imap_conf.get("host")
  48. port = int(imap_conf.get("port", 993))
  49. user = imap_conf.get("user")
  50. pwd = imap_conf.get("pass")
  51. mailbox = imap_conf.get("sent_mailbox") or imap_conf.get("mailbox") or "Sent"
  52. if not (host and user and pwd):
  53. print("ERROR: settings.json 缺少 IMAP 登录配置", file=sys.stderr)
  54. sys.exit(2)
  55. conn = imaplib.IMAP4_SSL(host, port)
  56. conn.login(user, pwd)
  57. typ, _ = conn.select(mailbox, readonly=True)
  58. if typ != "OK":
  59. print(f"ERROR: 无法选择邮箱 {mailbox}", file=sys.stderr)
  60. conn.logout()
  61. sys.exit(3)
  62. typ, data = conn.search(None, "ALL")
  63. if typ != "OK" or not data or not data[0]:
  64. conn.close(); conn.logout()
  65. return set()
  66. addresses: set[str] = set()
  67. uids = data[0].split()
  68. for idx, uid in enumerate(uids, 1):
  69. typ, msg_data = conn.fetch(uid, "(RFC822.HEADER)")
  70. if typ != "OK" or not msg_data:
  71. continue
  72. raw = msg_data[0][1]
  73. msg = email.message_from_bytes(raw)
  74. fields = []
  75. target_headers = [
  76. "To", "Cc", "Bcc", "Resent-To", "Resent-Cc", "Resent-Bcc",
  77. "X-Original-To", "Delivered-To", "X-Real-To", "X-Forwarded-To"
  78. ]
  79. for field in target_headers:
  80. values = msg.get_all(field, [])
  81. if not values:
  82. continue
  83. if isinstance(values, str):
  84. fields.append(values)
  85. else:
  86. fields.extend(values)
  87. for _, addr in getaddresses(fields):
  88. if addr:
  89. addresses.add(addr.strip().lower())
  90. if idx % 200 == 0:
  91. print(f"[INFO] processed {idx}/{len(uids)} messages ...")
  92. try:
  93. conn.close()
  94. except Exception:
  95. pass
  96. conn.logout()
  97. return addresses
  98. def main() -> None:
  99. settings = load_settings(SETTINGS_PATH)
  100. files = settings.get("files", {})
  101. state_path = os.path.abspath(files.get("state", os.path.join("configs", "curator_status.json")))
  102. imap_conf = settings.get("imap", {})
  103. new_addresses = fetch_sent_addresses(imap_conf)
  104. print(f"[INFO] collected {len(new_addresses)} address(es) from Sent mailbox.")
  105. state = load_state(state_path)
  106. existing = {addr.strip().lower() for addr in state.get("sent_emails", []) if addr}
  107. merged = sorted(existing.union(new_addresses))
  108. state["sent_emails"] = merged
  109. save_state(state)
  110. print(f"[DONE] sent_emails updated to {len(merged)} unique entries in {state_path}.")
  111. if __name__ == "__main__":
  112. main()