#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ imap_flags_inspector.py Inspect FLAGS for messages on an IMAP server (e.g., Gmail), to compare what the client recognizes as read/unread/flagged etc. Usage examples: # 1) List FLAGS for last 200 messages in INBOX python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --search ALL --limit 200 # 2) Only UNSEEN messages, show mailbox meta and server capabilities python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --search UNSEEN --show-mailbox-meta --show-capabilities # 3) Specify UIDs explicitly (comma-separated) python imap_flags_inspector.py --host imap.gmail.com --user you@gmail.com --mailbox INBOX --uids 12345,12346 Env vars (fallbacks for args): IMAP_HOST, IMAP_PORT, IMAP_SSL, EMAIL_USER, EMAIL_PASS, MAILBOX """ import os import sys import imaplib import argparse from email import message_from_bytes from email.header import decode_header, make_header def env_default(name, default=None, cast=str): v = os.environ.get(name, None) if v is None: return default try: return cast(v) except Exception: return default def decode_field(raw): if not raw: return "" try: # raw is str already? if isinstance(raw, str): return str(make_header(decode_header(raw))) # bytes return str(make_header(decode_header(raw.decode("utf-8", errors="replace")))) except Exception: try: return raw.decode("utf-8", errors="replace") except Exception: return str(raw) def fetch_headers(M, uid): # Only fetch small subset of headers for readability typ, data = M.uid("FETCH", uid, '(BODY.PEEK[HEADER.FIELDS (SUBJECT FROM DATE)])') subject, from_, date_ = "", "", "" if typ == "OK" and data and isinstance(data[0], tuple): msg = message_from_bytes(data[0][1]) subject = decode_field(msg.get("Subject", "")) from_ = decode_field(msg.get("From", "")) date_ = decode_field(msg.get("Date", "")) replyto = decode_field(msg.get("Reply-To", "")) references = decode_field(msg.get("References", "")) return subject, from_, date_, replyto, references def fetch_flags(M, uid): typ, data = M.uid("FETCH", uid, "(FLAGS)") flags = "" if typ == "OK" and data: # data could be [(b'123 (FLAGS (\Seen \Flagged))', b'')] blob = b" ".join(part if isinstance(part, (bytes, bytearray)) else b"" for part in data) flags = blob.decode("utf-8", errors="replace") # extract inside FLAGS (...) if present # not strictly required; printing the blob helps debugging servers return flags def main(): ap = argparse.ArgumentParser(description="Inspect IMAP FLAGS for messages.") ap.add_argument("--host", default=env_default("IMAP_HOST", "imap.gmail.com")) ap.add_argument("--port", type=int, default=env_default("IMAP_PORT", 993, int)) ap.add_argument("--ssl", type=str, default=env_default("IMAP_SSL", "1"), help="1/true/on = IMAP over SSL; 0/false = plain + STARTTLS if --starttls") ap.add_argument("--starttls", action="store_true", help="Use STARTTLS if not using SSL") ap.add_argument("--user", default=env_default("EMAIL_USER", "")) ap.add_argument("--password", default=env_default("EMAIL_PASS", "")) ap.add_argument("--mailbox", default=env_default("MAILBOX", "INBOX")) ap.add_argument("--readonly", action="store_true", help="Select mailbox read-only (safe for inspection).") ap.add_argument("--search", action="append", help="IMAP SEARCH key(s), e.g. ALL / UNSEEN / SINCE 01-Oct-2025. Can be repeated.") ap.add_argument("--uids", help="Comma-separated UID list to fetch (skips SEARCH).") ap.add_argument("--limit", type=int, default=200, help="Max messages to print (after search).") ap.add_argument("--show-capabilities", action="store_true") ap.add_argument("--show-mailbox-meta", action="store_true", help="Show PERMANENTFLAGS / UIDNEXT / UIDVALIDITY / HIGHESTMODSEQ") args = ap.parse_args() use_ssl = str(args.ssl).strip().lower() in ("1", "true", "yes", "on") # Connect if use_ssl: M = imaplib.IMAP4_SSL(args.host, args.port) else: M = imaplib.IMAP4(args.host, args.port) if args.starttls: M.starttls() # Capabilities if args.show_capabilities: try: caps = M.capabilities print("[CAPABILITIES]", caps) except Exception as e: print("[WARN] Get capabilities failed:", e, file=sys.stderr) # Login if not args.user or not args.password: print("ERROR: --user/--password (or EMAIL_USER/EMAIL_PASS) is required.", file=sys.stderr) sys.exit(2) M.login(args.user, args.password) # Select mailbox typ, data = M.select(args.mailbox, readonly=args.readonly) if typ != "OK": print(f"ERROR: Cannot select mailbox {args.mailbox}: {typ} {data}", file=sys.stderr) sys.exit(3) # Show mailbox meta if args.show_mailbox_meta: try: # UIDNEXT typ1, d1 = M.status(args.mailbox, "(UIDNEXT UIDVALIDITY UNSEEN MESSAGES)") print("[MAILBOX STATUS]", d1[0].decode() if d1 and d1[0] else d1) except Exception as e: print("[WARN] STATUS failed:", e, file=sys.stderr) try: # PERMANENTFLAGS / HIGHESTMODSEQ typ2, d2 = M.response("PERMANENTFLAGS") print("[PERMANENTFLAGS]", d2) except Exception as e: print("[WARN] PERMANENTFLAGS failed:", e, file=sys.stderr) try: typ3, d3 = M.response("HIGHESTMODSEQ") print("[HIGHESTMODSEQ]", d3) except Exception as e: print("[WARN] HIGHESTMODSEQ failed:", e, file=sys.stderr) # Build UID list uids = [] if args.uids: uids = [u.strip() for u in args.uids.split(",") if u.strip()] else: # SEARCH search_criteria = args.search or ["ALL"] # imaplib expects a sequence of *strings* as separate args; join basic tokens by space if user passed multiple # For simplicity we join each provided item into a single string; you can pass things like: # --search UNSEEN # --search 'SINCE 01-Oct-2025' for crit in search_criteria: typ, data = M.uid("SEARCH", None, crit) if typ == "OK" and data and data[0]: ids = data[0].decode().split() uids.extend(ids) # de-duplicate and keep order seen = set() uids = [u for u in uids if (u not in seen and not seen.add(u))] if not uids: print("[INFO] No messages matched.") M.close(); M.logout() return # Limit from the end (usually newer messages have larger UIDs) if args.limit and len(uids) > args.limit: uids = uids[-args.limit:] print(f"[INFO] Inspecting {len(uids)} message(s) in {args.mailbox}") print("-" * 80) for uid in uids: flags = fetch_flags(M, uid) subj, from_, date_, rep, ref = fetch_headers(M, uid) print(f"UID: {uid}") print(f"FLAGS: {flags}") print(f"From: {from_}") print(f"Date: {date_}") print(f"Subj: {subj}") print(f"Reply-To: {rep}") print(f"References: {ref}") print("-" * 80) try: M.close() except Exception: pass M.logout() if __name__ == "__main__": main()