#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
imap_flags_inspector.py

Inspect FLAGS for messages on an IMAP server (e.g., Gmail),
to compare what the client recognizes as read/unread/flagged etc.

Usage examples:

    # 1) List FLAGS for the last 200 messages in INBOX
    python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --search ALL --limit 200

    # 2) Only UNSEEN messages, show mailbox meta and server capabilities
    python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --search UNSEEN --show-mailbox-meta --show-capabilities

    # 3) Specify UIDs explicitly (comma-separated)
    python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --uids 12345,12346

Env vars (fallbacks for args):
    IMAP_HOST, IMAP_PORT, IMAP_SSL, EMAIL_USER, EMAIL_PASS, MAILBOX
"""
  17. import os
  18. import sys
  19. import imaplib
  20. import argparse
  21. from email import message_from_bytes
  22. from email.header import decode_header, make_header
  23. def env_default(name, default=None, cast=str):
  24. v = os.environ.get(name, None)
  25. if v is None:
  26. return default
  27. try:
  28. return cast(v)
  29. except Exception:
  30. return default
  31. def decode_field(raw):
  32. if not raw:
  33. return ""
  34. try:
  35. # raw is str already?
  36. if isinstance(raw, str):
  37. return str(make_header(decode_header(raw)))
  38. # bytes
  39. return str(make_header(decode_header(raw.decode("utf-8", errors="replace"))))
  40. except Exception:
  41. try:
  42. return raw.decode("utf-8", errors="replace")
  43. except Exception:
  44. return str(raw)
  45. def fetch_headers(M, uid):
  46. # Only fetch small subset of headers for readability
  47. typ, data = M.uid("FETCH", uid, '(BODY.PEEK[HEADER.FIELDS (SUBJECT FROM DATE)])')
  48. subject, from_, date_ = "", "", ""
  49. if typ == "OK" and data and isinstance(data[0], tuple):
  50. msg = message_from_bytes(data[0][1])
  51. subject = decode_field(msg.get("Subject", ""))
  52. from_ = decode_field(msg.get("From", ""))
  53. date_ = decode_field(msg.get("Date", ""))
  54. replyto = decode_field(msg.get("Reply-To", ""))
  55. references = decode_field(msg.get("References", ""))
  56. return subject, from_, date_, replyto, references
  57. def fetch_flags(M, uid):
  58. typ, data = M.uid("FETCH", uid, "(FLAGS)")
  59. flags = ""
  60. if typ == "OK" and data:
  61. # data could be [(b'123 (FLAGS (\Seen \Flagged))', b'')]
  62. blob = b" ".join(part if isinstance(part, (bytes, bytearray)) else b""
  63. for part in data)
  64. flags = blob.decode("utf-8", errors="replace")
  65. # extract inside FLAGS (...) if present
  66. # not strictly required; printing the blob helps debugging servers
  67. return flags
  68. def main():
  69. ap = argparse.ArgumentParser(description="Inspect IMAP FLAGS for messages.")
  70. ap.add_argument("--host", default=env_default("IMAP_HOST", "imap.gmail.com"))
  71. ap.add_argument("--port", type=int, default=env_default("IMAP_PORT", 993, int))
  72. ap.add_argument("--ssl", type=str, default=env_default("IMAP_SSL", "1"), help="1/true/on = IMAP over SSL; 0/false = plain + STARTTLS if --starttls")
  73. ap.add_argument("--starttls", action="store_true", help="Use STARTTLS if not using SSL")
  74. ap.add_argument("--user", default=env_default("EMAIL_USER", ""))
  75. ap.add_argument("--password", default=env_default("EMAIL_PASS", ""))
  76. ap.add_argument("--mailbox", default=env_default("MAILBOX", "INBOX"))
  77. ap.add_argument("--readonly", action="store_true", help="Select mailbox read-only (safe for inspection).")
  78. ap.add_argument("--search", action="append", help="IMAP SEARCH key(s), e.g. ALL / UNSEEN / SINCE 01-Oct-2025. Can be repeated.")
  79. ap.add_argument("--uids", help="Comma-separated UID list to fetch (skips SEARCH).")
  80. ap.add_argument("--limit", type=int, default=200, help="Max messages to print (after search).")
  81. ap.add_argument("--show-capabilities", action="store_true")
  82. ap.add_argument("--show-mailbox-meta", action="store_true", help="Show PERMANENTFLAGS / UIDNEXT / UIDVALIDITY / HIGHESTMODSEQ")
  83. args = ap.parse_args()
  84. use_ssl = str(args.ssl).strip().lower() in ("1", "true", "yes", "on")
  85. # Connect
  86. if use_ssl:
  87. M = imaplib.IMAP4_SSL(args.host, args.port)
  88. else:
  89. M = imaplib.IMAP4(args.host, args.port)
  90. if args.starttls:
  91. M.starttls()
  92. # Capabilities
  93. if args.show_capabilities:
  94. try:
  95. caps = M.capabilities
  96. print("[CAPABILITIES]", caps)
  97. except Exception as e:
  98. print("[WARN] Get capabilities failed:", e, file=sys.stderr)
  99. # Login
  100. if not args.user or not args.password:
  101. print("ERROR: --user/--password (or EMAIL_USER/EMAIL_PASS) is required.", file=sys.stderr)
  102. sys.exit(2)
  103. M.login(args.user, args.password)
  104. # Select mailbox
  105. typ, data = M.select(args.mailbox, readonly=args.readonly)
  106. if typ != "OK":
  107. print(f"ERROR: Cannot select mailbox {args.mailbox}: {typ} {data}", file=sys.stderr)
  108. sys.exit(3)
  109. # Show mailbox meta
  110. if args.show_mailbox_meta:
  111. try:
  112. # UIDNEXT
  113. typ1, d1 = M.status(args.mailbox, "(UIDNEXT UIDVALIDITY UNSEEN MESSAGES)")
  114. print("[MAILBOX STATUS]", d1[0].decode() if d1 and d1[0] else d1)
  115. except Exception as e:
  116. print("[WARN] STATUS failed:", e, file=sys.stderr)
  117. try:
  118. # PERMANENTFLAGS / HIGHESTMODSEQ
  119. typ2, d2 = M.response("PERMANENTFLAGS")
  120. print("[PERMANENTFLAGS]", d2)
  121. except Exception as e:
  122. print("[WARN] PERMANENTFLAGS failed:", e, file=sys.stderr)
  123. try:
  124. typ3, d3 = M.response("HIGHESTMODSEQ")
  125. print("[HIGHESTMODSEQ]", d3)
  126. except Exception as e:
  127. print("[WARN] HIGHESTMODSEQ failed:", e, file=sys.stderr)
  128. # Build UID list
  129. uids = []
  130. if args.uids:
  131. uids = [u.strip() for u in args.uids.split(",") if u.strip()]
  132. else:
  133. # SEARCH
  134. search_criteria = args.search or ["ALL"]
  135. # imaplib expects a sequence of *strings* as separate args; join basic tokens by space if user passed multiple
  136. # For simplicity we join each provided item into a single string; you can pass things like:
  137. # --search UNSEEN
  138. # --search 'SINCE 01-Oct-2025'
  139. for crit in search_criteria:
  140. typ, data = M.uid("SEARCH", None, crit)
  141. if typ == "OK" and data and data[0]:
  142. ids = data[0].decode().split()
  143. uids.extend(ids)
  144. # de-duplicate and keep order
  145. seen = set()
  146. uids = [u for u in uids if (u not in seen and not seen.add(u))]
  147. if not uids:
  148. print("[INFO] No messages matched.")
  149. M.close(); M.logout()
  150. return
  151. # Limit from the end (usually newer messages have larger UIDs)
  152. if args.limit and len(uids) > args.limit:
  153. uids = uids[-args.limit:]
  154. print(f"[INFO] Inspecting {len(uids)} message(s) in {args.mailbox}")
  155. print("-" * 80)
  156. for uid in uids:
  157. flags = fetch_flags(M, uid)
  158. subj, from_, date_, rep, ref = fetch_headers(M, uid)
  159. print(f"UID: {uid}")
  160. print(f"FLAGS: {flags}")
  161. print(f"From: {from_}")
  162. print(f"Date: {date_}")
  163. print(f"Subj: {subj}")
  164. print(f"Reply-To: {rep}")
  165. print(f"References: {ref}")
  166. print("-" * 80)
  167. try:
  168. M.close()
  169. except Exception:
  170. pass
  171. M.logout()
  172. if __name__ == "__main__":
  173. main()