# 用Python通过IMAP读取邮件 (Read mail over IMAP with Python)
# -*- coding: utf-8 -*-
import imaplib
imaplib._MAXLINE = 10000000
import email
import email.utils
from email import policy
import re
import os
import pytz
import traceback


def decode(s, charset):
    """Decode a bytes payload to text, trying several charsets in turn.

    The declared ``charset`` is tried first (when given), then UTF-8, then
    GBK. Latin-1 is the last resort because it maps every byte to a
    character and therefore never fails; trying it any earlier would make
    the GBK attempt unreachable.

    Args:
        s: Value to decode. ``str`` is returned unchanged and ``None``
            yields an empty string; anything else must provide
            ``.decode()``.
        charset: Charset name declared by the message, or ``None``.

    Returns:
        The decoded text.
    """
    if isinstance(s, str):
        return s
    if s is None:
        return ""

    candidates = [charset] if charset else []
    candidates += ['utf-8', 'gbk']
    for name in candidates:
        try:
            return s.decode(name)
        except (LookupError, UnicodeError, TypeError):
            # Unknown charset name, undecodable bytes, or a non-string
            # charset value: fall through to the next candidate.
            continue
    return s.decode('latin1')


class Attachment:
    """A single attachment extracted from one MIME part of a message.

    Attributes:
        content_type: MIME type reported by the part.
        filename: Decoded file name, or ``DEFAULT_FILENAME`` when the part
            does not carry one.
        data: Decoded payload bytes (empty when the part has no decodable
            payload).
    """

    # Used when the part has no usable file name. Note that several unnamed
    # attachments saved into the same directory will overwrite each other.
    DEFAULT_FILENAME = "attachment"

    def __init__(self, part):
        """Read content type, file name and payload from ``part``.

        Args:
            part: A message part exposing ``get_content_type()``,
                ``get_filename()`` and ``get_payload(decode=True)``.
        """
        self.content_type = part.get_content_type()
        raw_filename = part.get_filename()
        if raw_filename:
            self.filename = self._decode_filename(raw_filename)
        else:
            # get_filename() returns None when no name is declared.
            self.filename = self.DEFAULT_FILENAME
        # get_payload(decode=True) may return None; normalise to bytes so
        # __repr__ and save_to keep working.
        self.data = part.get_payload(decode=True) or b""

    @staticmethod
    def _decode_filename(raw_filename):
        """Decode RFC 2047 encoded-words in a file name, joining all chunks."""
        chunks = email.header.decode_header(raw_filename)
        name = "".join(decode(chunk, charset) for chunk, charset in chunks)
        return name or Attachment.DEFAULT_FILENAME

    def _safe_filename(self):
        """Return ``filename`` reduced to a bare name with no directory part.

        The name comes from the e-mail and is therefore untrusted: without
        this, a name such as ``../../x`` would be written outside the
        directory passed to ``save_to``.
        """
        flattened = self.filename.replace("\\", "/").replace("\x00", "")
        name = os.path.basename(flattened)
        if name in ("", ".", ".."):
            return self.DEFAULT_FILENAME
        return name

    def __repr__(self):
        return f"Attachment(content_type='{self.content_type}', filename='{self.filename}', size={len(self.data)})"

    def save_to(self, path):
        """Write the attachment to disk.

        Args:
            path: An existing directory (the attachment is stored inside it
                under its own sanitised file name) or a file path (created,
                or overwritten if it already exists).

        Returns:
            The path of the file that was written.
        """
        if os.path.isdir(path):
            path = os.path.join(path, self._safe_filename())
        with open(path, 'wb') as fp:
            fp.write(self.data)
        return path


class Mail:
    """One message of the mailbox, with lazily parsed body and attachments.

    Header fields (``subject``, ``date``, ``from_*``, ``to_*``) are decoded
    when the object is created. ``plain``, ``html`` and ``attachments`` are
    parsed from the message the first time any of them is accessed.
    """

    # Zone used to render ``date``; the result is naive local time text.
    DISPLAY_TIMEZONE = 'Asia/Shanghai'

    def __init__(self, num, msg):
        """Decode the header fields of ``msg``.

        Args:
            num: Identifier of the message as used by the IMAP connection.
            msg: Parsed message object exposing ``get()`` and ``walk()``.
        """
        # These fields are parsed while the mail list is being read.
        self.num = num
        self.subject: str = self._decode_value(msg.get("subject"))
        self.date: str = self._format_date(msg.get("date"))

        # Decode the whole header field first, then split name and address.
        from_header = self._decode_value(msg.get("from"))
        to_header = self._decode_value(msg.get("to"))

        from_name, self.from_addr = email.utils.parseaddr(from_header)
        self.from_name = self._decode_value(from_name)
        # parseaddr only yields the first address of the To header.
        to_name, self.to_addr = email.utils.parseaddr(to_header)
        self.to_name = self._decode_value(to_name)

        # These fields are filled lazily by parse_content().
        self._plain: str = ""
        self._html: str = ""
        self._attachments: list = []

        self._msg = msg
        self._parsed: bool = False

    def _format_date(self, raw_date):
        """Render a Date header as naive ``DISPLAY_TIMEZONE`` local time.

        Returns an empty string when the header is missing and the raw
        header text when it cannot be parsed, so that one bad Date header
        does not make the whole message unreadable.
        """
        if raw_date is None:
            return ""
        raw_text = str(raw_date)
        try:
            parsed = email.utils.parsedate_to_datetime(raw_text)
        except (TypeError, ValueError):
            return raw_text
        if parsed is None:
            return raw_text
        if parsed.tzinfo is None:
            # A "-0000" zone gives a naive datetime that represents UTC;
            # astimezone() would otherwise assume the machine's local zone.
            parsed = parsed.replace(tzinfo=pytz.utc)
        local = parsed.astimezone(pytz.timezone(self.DISPLAY_TIMEZONE))
        return str(local.replace(tzinfo=None))

    def _decode_value(self, value):
        """Decode a header value and normalise its whitespace.

        Args:
            value: Raw header value, or ``None``.

        Returns:
            The decoded text with runs of whitespace collapsed to a single
            space and surrounding whitespace removed; ``""`` for ``None``.
        """
        if value is None:
            return ""
        try:
            # 1. Decode the RFC 2047 encoded parts.
            pieces = []
            for part, charset in email.header.decode_header(value):
                if not isinstance(part, bytes):
                    pieces.append(part)
                    continue
                # Prefer the declared charset; fall back to GBK, then UTF-8
                # with replacement characters.
                try:
                    text = part.decode(charset or 'utf-8')
                except (LookupError, UnicodeDecodeError):
                    try:
                        text = part.decode('gbk')
                    except UnicodeDecodeError:
                        text = part.decode('utf-8', errors='replace')
                pieces.append(text)

            # 2. Join the pieces; collapse line breaks left by header
            #    folding and any repeated whitespace.
            joined = re.sub(r'\s+', ' ', ''.join(pieces))
            return joined.strip()

        except Exception as e:
            # Best effort: a header that cannot be decoded is returned raw.
            print(f"Header decode error: {e}")
            return str(value).strip()

    @property
    def plain(self):
        """Plain-text body; triggers parsing on first access."""
        if not self._parsed:
            self.parse_content()
        return self._plain

    @property
    def html(self):
        """HTML body (empty when a plain-text body exists); parsed lazily."""
        if not self._parsed:
            self.parse_content()
        return self._html

    @property
    def attachments(self):
        """List of ``Attachment`` objects; parsed lazily."""
        if not self._parsed:
            self.parse_content()
        return self._attachments

    def parse_content(self):
        """Walk the MIME tree and fill the body texts and attachments.

        Parts whose disposition is ``attachment`` are collected as
        attachments and never used as the body, so an attached ``.txt`` or
        ``.html`` file cannot replace the message text. Among the remaining
        ``text/plain`` / ``text/html`` parts the last one of each type wins.
        When a plain-text body is found the HTML body is discarded.
        """
        self._plain = ""
        self._html = ""
        self._attachments = []
        for part in self._msg.walk():
            if part.is_multipart():
                continue
            if part.get_content_disposition() == "attachment":
                self._attachments.append(Attachment(part))
                continue
            # Disposition is "inline" or absent here. Non-text inline parts
            # (e.g. images referenced by the HTML) are ignored.
            content_type = part.get_content_type()
            if content_type not in ("text/plain", "text/html"):
                continue
            text = decode(part.get_payload(decode=True), part.get_content_charset())
            if content_type == "text/plain":
                self._plain = text
            else:
                self._html = text
        if self._plain:
            self._html = ""
        self._parsed = True


class ImapMailBox:
    """Thin wrapper around an SSL IMAP connection that reads the ``Inbox``.

    The connection is opened and authenticated in ``__init__``. Call
    ``close()`` when done, or use the object as a context manager.
    """

    def __init__(self, host, port, username, password):
        """Connect to ``host:port`` over SSL and log in.

        Raises:
            imaplib.IMAP4.error: If the server rejects the login.
            OSError: If the connection cannot be established.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        # Connect to the host via SSL.
        self.conn = imaplib.IMAP4_SSL(host=host, port=port)
        try:
            self.conn.login(username, password)
        except Exception:
            # Do not leave the socket open when authentication fails.
            self.conn.shutdown()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()

    def close(self):
        """Log out from the server; errors on an already-dead link are ignored."""
        try:
            self.conn.logout()
        except (imaplib.IMAP4.error, OSError):
            # Cleanup only: the connection is gone either way.
            pass

    def _search_all(self):
        """Select the inbox and return every message number the search yields."""
        self.conn.select('Inbox')
        state, data = self.conn.search(None, 'ALL')
        if state != 'OK' or not data or data[0] is None:
            return []
        return data[0].split()

    @staticmethod
    def _paginate(ids, page, page_size):
        """Reverse ``ids`` and return the requested 1-based page.

        A falsy ``page_size`` disables paging and returns everything. A
        ``page`` below 1 yields an empty list instead of an arbitrary
        negative-index slice.
        """
        ordered = ids[::-1]
        if not page_size:
            return ordered
        if page < 1:
            return []
        start = (page - 1) * page_size
        return ordered[start:start + page_size]

    def get_mail_count(self):
        """Return the number of messages in the inbox."""
        return len(self._search_all())

    def get_mail_list(self, page=1, page_size=50, peek=False):
        """Fetch and parse one page of inbox messages, in reversed search order.

        Args:
            page: 1-based page number.
            page_size: Messages per page; a falsy value fetches all.
            peek: When true, fetch with ``BODY.PEEK[]`` so the server does
                not set the ``\\Seen`` flag as a side effect of listing.
                The default keeps the original ``RFC822`` fetch.

        Returns:
            A list of ``Mail`` objects. Messages that cannot be fetched or
            parsed are reported on stdout and skipped.
        """
        fetch_spec = '(BODY.PEEK[])' if peek else '(RFC822)'
        mail_list = []
        for num in self._paginate(self._search_all(), page, page_size):
            state, data = self.conn.fetch(num, fetch_spec)
            raw_email = None
            try:
                # Inside the try: a failed fetch can return no payload, and
                # that must not abort the rest of the listing.
                raw_email = data[0][1]
                msg = email.message_from_bytes(raw_email, policy=policy.default)
                mail_list.append(Mail(num, msg))
            except Exception:
                print(f"Parse raw data failed. [raw_data] '{raw_email}'")
                traceback.print_exc()
        return mail_list

    def mark_as_seen(self, mail):
        """Set the seen flag on ``mail`` (identified by ``mail.num``)."""
        self.conn.store(mail.num, '+FLAGS', '\\seen')

def main():
    """Demo: print the inbox size, list one page of mail, save attachments."""
    # NOTE: replace the placeholders with real credentials; do not commit
    # real passwords to source control.
    mailbox = ImapMailBox(
        host='imap.aliyun.com', port=993,
        username="******", password="******"
    )
    try:
        # Number of messages in the inbox.
        count = mailbox.get_mail_count()
        print(count)
        # Fetch one page of messages.
        for mail in mailbox.get_mail_list(page=1, page_size=25):
            # Print date, sender address, subject and plain-text body.
            print(mail.date, mail.from_addr, mail.subject, mail.plain)

            # Save any attachments into the current directory.
            for attachment in mail.attachments:
                attachment.save_to("./")
    finally:
        # Always end the IMAP session, even if listing or saving fails.
        mailbox.conn.logout()


if __name__ == '__main__':
    main()