| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- imap_flags_inspector.py
- Inspect FLAGS for messages on an IMAP server (e.g., Gmail),
- to compare what the client recognizes as read/unread/flagged etc.
- Usage examples:
- # 1) List FLAGS for last 200 messages in INBOX
- python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --search ALL --limit 200
- # 2) Only UNSEEN messages, show mailbox meta and server capabilities
- python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --search UNSEEN --show-mailbox-meta --show-capabilities
- # 3) Specify UIDs explicitly (comma-separated)
- python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --uids 12345,12346
- Env vars (fallbacks for args):
- IMAP_HOST, IMAP_PORT, IMAP_SSL, EMAIL_USER, EMAIL_PASS, MAILBOX
- """
- import os
- import sys
- import imaplib
- import argparse
- from email import message_from_bytes
- from email.header import decode_header, make_header
def env_default(name, default=None, cast=str):
    """Return environment variable *name* converted by *cast*, else *default*.

    *default* is returned unchanged (``cast`` is not applied to it) both when
    the variable is not set and when ``cast`` raises any ``Exception`` for
    the stored string.
    """
    raw_value = os.environ.get(name)
    if raw_value is None:
        return default
    try:
        converted = cast(raw_value)
    except Exception:
        # A malformed value falls back to the default instead of aborting.
        return default
    else:
        return converted
def decode_field(raw):
    """Turn a raw header value into display text.

    Falsy input yields ``""``. Otherwise the value (bytes are first decoded
    as UTF-8 with replacement characters) is run through ``decode_header`` /
    ``make_header`` so encoded words such as ``=?utf-8?b?...?=`` become
    readable. If that fails, the value is decoded as plain UTF-8, and as a
    last resort ``str(raw)`` is returned.
    """
    if not raw:
        return ""

    # First attempt: full encoded-word decoding.
    try:
        text = raw if isinstance(raw, str) else raw.decode("utf-8", errors="replace")
        decoded = str(make_header(decode_header(text)))
    except Exception:
        decoded = None
    if decoded is not None:
        return decoded

    # Fallbacks: plain UTF-8 for bytes-like values, then the generic string form.
    try:
        return raw.decode("utf-8", errors="replace")
    except Exception:
        return str(raw)
def fetch_headers(M, uid):
    """Fetch and decode a small set of display headers for one message.

    Args:
        M: Connection object exposing ``uid(command, *args)`` the way
            ``imaplib.IMAP4`` does, with a mailbox already selected.
        uid: UID of the message, as a string.

    Returns:
        A 5-tuple ``(subject, from, date, reply_to, references)`` of decoded
        strings. Every element is ``""`` when the FETCH is not ``OK`` or the
        response carries no header literal.
    """
    # Initialise all five values up front so the return below is valid even
    # when the fetch fails or comes back in an unexpected shape.
    subject = from_ = date_ = replyto = references = ""

    # Every field read below must be named here: HEADER.FIELDS returns only
    # the listed headers. BODY.PEEK is the fetch form that does not mark the
    # message as seen (RFC 3501).
    typ, data = M.uid(
        "FETCH",
        uid,
        "(BODY.PEEK[HEADER.FIELDS (SUBJECT FROM DATE REPLY-TO REFERENCES)])",
    )
    if typ == "OK" and data and isinstance(data[0], tuple):
        # data[0] is (envelope, literal); the literal holds the header bytes.
        msg = message_from_bytes(data[0][1])
        subject = decode_field(msg.get("Subject", ""))
        from_ = decode_field(msg.get("From", ""))
        date_ = decode_field(msg.get("Date", ""))
        replyto = decode_field(msg.get("Reply-To", ""))
        references = decode_field(msg.get("References", ""))
    return subject, from_, date_, replyto, references
def fetch_flags(M, uid):
    """Return the raw ``UID FETCH <uid> (FLAGS)`` response as text.

    The whole response is returned rather than just the parenthesised flag
    list, because seeing the raw line helps when debugging servers.

    Args:
        M: Connection object exposing ``uid(command, *args)`` the way
            ``imaplib.IMAP4`` does, with a mailbox already selected.
        uid: UID of the message, as a string.

    Returns:
        The response parts joined by single spaces and decoded as UTF-8 with
        replacement characters, or ``""`` when the FETCH is not ``OK`` or
        returns no data.
    """
    typ, data = M.uid("FETCH", uid, "(FLAGS)")
    if typ != "OK" or not data:
        return ""

    chunks = []
    for part in data:
        if isinstance(part, (bytes, bytearray)):
            chunks.append(bytes(part))
        elif isinstance(part, tuple):
            # Responses may also arrive as tuples such as
            # (b'123 (FLAGS (\Seen \Flagged))', b''); keep their non-empty
            # byte members instead of discarding the whole part.
            chunks.append(
                b" ".join(
                    bytes(piece)
                    for piece in part
                    if piece and isinstance(piece, (bytes, bytearray))
                )
            )
        else:
            # Anything else (e.g. None) contributes an empty placeholder.
            chunks.append(b"")
    return b" ".join(chunks).decode("utf-8", errors="replace")
def _build_arg_parser():
    """Create the command-line parser; environment variables supply defaults."""
    ap = argparse.ArgumentParser(description="Inspect IMAP FLAGS for messages.")
    ap.add_argument("--host", default=env_default("IMAP_HOST", "imap.gmail.com"))
    ap.add_argument("--port", type=int, default=env_default("IMAP_PORT", 993, int))
    ap.add_argument(
        "--ssl",
        type=str,
        default=env_default("IMAP_SSL", "1"),
        help="1/true/on = IMAP over SSL; 0/false = plain + STARTTLS if --starttls",
    )
    ap.add_argument("--starttls", action="store_true", help="Use STARTTLS if not using SSL")
    ap.add_argument("--user", default=env_default("EMAIL_USER", ""))
    ap.add_argument("--password", default=env_default("EMAIL_PASS", ""))
    ap.add_argument("--mailbox", default=env_default("MAILBOX", "INBOX"))
    ap.add_argument(
        "--readonly",
        action="store_true",
        help="Select mailbox read-only (safe for inspection).",
    )
    ap.add_argument(
        "--search",
        action="append",
        help="IMAP SEARCH key(s), e.g. ALL / UNSEEN / SINCE 01-Oct-2025. Can be repeated.",
    )
    ap.add_argument("--uids", help="Comma-separated UID list to fetch (skips SEARCH).")
    ap.add_argument("--limit", type=int, default=200, help="Max messages to print (after search).")
    ap.add_argument("--show-capabilities", action="store_true")
    ap.add_argument(
        "--show-mailbox-meta",
        action="store_true",
        help="Show PERMANENTFLAGS / UIDNEXT / UIDVALIDITY / HIGHESTMODSEQ",
    )
    return ap


def _print_mailbox_meta(M, mailbox):
    """Print STATUS counters and the PERMANENTFLAGS / HIGHESTMODSEQ responses."""
    try:
        _, status = M.status(mailbox, "(UIDNEXT UIDVALIDITY UNSEEN MESSAGES)")
        print("[MAILBOX STATUS]", status[0].decode() if status and status[0] else status)
    except Exception as e:
        print("[WARN] STATUS failed:", e, file=sys.stderr)
    # Both values are read back from responses the connection already holds.
    for code in ("PERMANENTFLAGS", "HIGHESTMODSEQ"):
        try:
            _, payload = M.response(code)
            print(f"[{code}]", payload)
        except Exception as e:
            print(f"[WARN] {code} failed:", e, file=sys.stderr)


def _collect_uids(M, args):
    """Return the UIDs to inspect, de-duplicated with first-seen order kept."""
    if args.uids:
        uids = [u.strip() for u in args.uids.split(",") if u.strip()]
    else:
        uids = []
        # Each --search value is sent as one criteria string, e.g.
        #   --search UNSEEN
        #   --search 'SINCE 01-Oct-2025'
        for crit in args.search or ["ALL"]:
            typ, data = M.uid("SEARCH", None, crit)
            if typ != "OK":
                # Report the failure instead of silently showing fewer messages.
                print(f"[WARN] SEARCH {crit!r} failed: {typ} {data}", file=sys.stderr)
                continue
            if data and data[0]:
                uids.extend(data[0].decode().split())
    return list(dict.fromkeys(uids))


def main():
    """Connect to the IMAP server and print flags and headers per message.

    Flow: parse arguments, connect (SSL, or plain with optional STARTTLS),
    optionally print capabilities, log in, select the mailbox, optionally
    print mailbox metadata, build the UID list (``--uids`` or SEARCH), keep
    the last ``--limit`` entries, and print one record per UID.

    Exits with status 2 when user or password is missing and with status 3
    when the mailbox cannot be selected. The connection is logged out on
    every exit path, including errors.
    """
    args = _build_arg_parser().parse_args()
    use_ssl = str(args.ssl).strip().lower() in ("1", "true", "yes", "on")

    # Connect
    if use_ssl:
        M = imaplib.IMAP4_SSL(args.host, args.port)
    else:
        M = imaplib.IMAP4(args.host, args.port)

    selected = False
    try:
        if not use_ssl and args.starttls:
            M.starttls()

        # Capabilities
        if args.show_capabilities:
            try:
                print("[CAPABILITIES]", M.capabilities)
            except Exception as e:
                print("[WARN] Get capabilities failed:", e, file=sys.stderr)

        # Login
        if not args.user or not args.password:
            print("ERROR: --user/--password (or EMAIL_USER/EMAIL_PASS) is required.", file=sys.stderr)
            sys.exit(2)
        M.login(args.user, args.password)

        # Select mailbox
        typ, data = M.select(args.mailbox, readonly=args.readonly)
        if typ != "OK":
            print(f"ERROR: Cannot select mailbox {args.mailbox}: {typ} {data}", file=sys.stderr)
            sys.exit(3)
        selected = True

        if args.show_mailbox_meta:
            _print_mailbox_meta(M, args.mailbox)

        uids = _collect_uids(M, args)
        if not uids:
            print("[INFO] No messages matched.")
            return

        # Limit from the end (usually newer messages have larger UIDs).
        # Zero or a negative value means "no limit"; a negative value must
        # not reach the slice, where it would drop entries from the front.
        if args.limit and args.limit > 0 and len(uids) > args.limit:
            uids = uids[-args.limit:]

        print(f"[INFO] Inspecting {len(uids)} message(s) in {args.mailbox}")
        print("-" * 80)
        for uid in uids:
            flags = fetch_flags(M, uid)
            subj, from_, date_, rep, ref = fetch_headers(M, uid)
            print(f"UID: {uid}")
            print(f"FLAGS: {flags}")
            print(f"From: {from_}")
            print(f"Date: {date_}")
            print(f"Subj: {subj}")
            print(f"Reply-To: {rep}")
            print(f"References: {ref}")
            print("-" * 80)
    finally:
        # Runs for normal completion, sys.exit() and exceptions alike, so the
        # server-side session is always ended.
        if selected:
            try:
                M.close()
            except Exception:
                # Best-effort cleanup: a failed CLOSE must not prevent LOGOUT.
                pass
        try:
            M.logout()
        except Exception as e:
            # Warn rather than raise so an earlier error is not masked.
            print("[WARN] LOGOUT failed:", e, file=sys.stderr)
# Run the inspector only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|