from imapclient import IMAPClient from email import policy from email.parser import BytesParser from datetime import datetime class EmailClient: def __init__(self, email_config: dict): self.config = email_config self.imap_server = None def connect(self): # 连接到 IMAP 服务器 self.imap_server = IMAPClient(self.config['receive_server_address'], self.config['receive_server_port']) # 登录 self.imap_server.login(self.config['email_account'], self.config['email_password']) # 设置标识信息 self.imap_server.id_({"name": "IMAPClient", "version": "2.1.0"}) def check_new_email(self, last_check_uid=None): """ 检查新邮件 :return: """ if not self.imap_server: self.connect() self.imap_server.select_folder('INBOX') uids = self.imap_server.search('UNSEEN') if last_check_uid is not None: new_uid = [uid for uid in uids if uid > last_check_uid] else: new_uid = uids email_details = [] if new_uid: response = self.imap_server.fetch(new_uid, ['BODY[]']) for msgid, data in sorted(response.items(), key=lambda x: x[0]): email_message = BytesParser(policy=policy.default).parsebytes(data[b'BODY[]']) email_details.append({ 'uid': msgid, 'subject': email_message['subject'], 'sender': email_message['from'], 'date': datetime.strptime(email_message['date'], "%a, %d %b %Y %H:%M:%S %z").strftime( '%Y-%m-%d %H:%M:%S'), }) return email_details