| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- from imapclient import IMAPClient
- from email import policy
- from email.parser import BytesParser
- from datetime import datetime
- class EmailClient:
- def __init__(self, email_config: dict):
- self.config = email_config
- self.imap_server = None
- def connect(self):
- # 连接到 IMAP 服务器
- self.imap_server = IMAPClient(self.config['receive_server_address'], self.config['receive_server_port'])
- # 登录
- self.imap_server.login(self.config['email_account'], self.config['email_password'])
- # 设置标识信息
- self.imap_server.id_({"name": "IMAPClient", "version": "2.1.0"})
- def check_new_email(self, last_check_uid=None):
- """
- 检查新邮件
- :return:
- """
- if not self.imap_server:
- self.connect()
- self.imap_server.select_folder('INBOX')
- uids = self.imap_server.search('UNSEEN')
- if last_check_uid is not None:
- new_uid = [uid for uid in uids if uid > last_check_uid]
- else:
- new_uid = uids
- email_details = []
- if new_uid:
- response = self.imap_server.fetch(new_uid, ['BODY[]'])
- for msgid, data in sorted(response.items(), key=lambda x: x[0]):
- email_message = BytesParser(policy=policy.default).parsebytes(data[b'BODY[]'])
- email_details.append({
- 'uid': msgid,
- 'subject': email_message['subject'],
- 'sender': email_message['from'],
- 'date': datetime.strptime(email_message['date'], "%a, %d %b %Y %H:%M:%S %z").strftime(
- '%Y-%m-%d %H:%M:%S'),
- })
- return email_details
|