email_client.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. from imapclient import IMAPClient
  2. from email import policy
  3. from email.parser import BytesParser
  4. from datetime import datetime
  5. class EmailClient:
  6. def __init__(self, email_config: dict):
  7. self.config = email_config
  8. self.imap_server = None
  9. def connect(self):
  10. # 连接到 IMAP 服务器
  11. self.imap_server = IMAPClient(self.config['receive_server_address'], self.config['receive_server_port'])
  12. # 登录
  13. self.imap_server.login(self.config['email_account'], self.config['email_password'])
  14. # 设置标识信息
  15. self.imap_server.id_({"name": "IMAPClient", "version": "2.1.0"})
  16. def check_new_email(self, last_check_uid=None):
  17. """
  18. 检查新邮件
  19. :return:
  20. """
  21. if not self.imap_server:
  22. self.connect()
  23. self.imap_server.select_folder('INBOX')
  24. uids = self.imap_server.search('UNSEEN')
  25. if last_check_uid is not None:
  26. new_uid = [uid for uid in uids if uid > last_check_uid]
  27. else:
  28. new_uid = uids
  29. email_details = []
  30. if new_uid:
  31. response = self.imap_server.fetch(new_uid, ['BODY[]'])
  32. for msgid, data in sorted(response.items(), key=lambda x: x[0]):
  33. email_message = BytesParser(policy=policy.default).parsebytes(data[b'BODY[]'])
  34. email_details.append({
  35. 'uid': msgid,
  36. 'subject': email_message['subject'],
  37. 'sender': email_message['from'],
  38. 'date': datetime.strptime(email_message['date'], "%a, %d %b %Y %H:%M:%S %z").strftime(
  39. '%Y-%m-%d %H:%M:%S'),
  40. })
  41. return email_details