email_client.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. from imapclient import IMAPClient
  2. import email
  3. from email.utils import parsedate_to_datetime, getaddresses, parseaddr
  4. from email.header import decode_header
  5. from datetime import datetime
  6. from bs4 import BeautifulSoup
  7. import re
  8. class EmailClient:
  9. def __init__(self, email_config: dict):
  10. self.config = email_config
  11. self.imap_server = None
  def connect(self):
      """
      Open the IMAP connection, log in and send the client ID.

      Reads 'receive_server_address', 'receive_server_port',
      'email_account' and 'email_password' from ``self.config``.
      ``self.imap_server`` is assigned only after the login succeeded, so a
      failed attempt never leaves a truthy but unauthenticated client behind
      (check_new_email() reconnects only while the attribute is falsy).

      Errors raised while connecting, logging in or sending the ID are
      propagated to the caller unchanged.
      """
      # Connect to the IMAP server.
      client = IMAPClient(
          self.config['receive_server_address'],
          self.config['receive_server_port'],
      )
      try:
          # Log in.
          client.login(self.config['email_account'], self.config['email_password'])
      except Exception:
          # Do not leak the open socket of a connection that can never be
          # used; closing is best effort, the login error is what matters.
          try:
              client.logout()
          except Exception:
              pass
          raise
      self.imap_server = client
      # Send the client identification to the server.
      self.imap_server.id_({"name": "IMAPClient", "version": "2.1.0"})
  def extract_links_from_html(self, html_content):
      """
      Collect the hyperlinks contained in an HTML document.

      :param html_content: HTML markup to scan
      :return: list of {'url': ..., 'text': ...} dicts, one per <a> tag that
          carries an href attribute; if parsing fails the error is printed
          and the links gathered so far are returned
      """
      found = []
      try:
          anchors = BeautifulSoup(html_content, 'html.parser').find_all('a', href=True)
          # extend() consumes the generator item by item, so links collected
          # before an unexpected error are kept.
          found.extend(
              {'url': anchor['href'], 'text': anchor.get_text(strip=True)}
              for anchor in anchors
          )
      except Exception as err:
          print(f"解析HTML链接时出错: {err}")
      return found
  def extract_links_from_text(self, text_content):
      """
      Extract http(s) URLs from plain text.

      A URL ends at whitespace, quotes, angle brackets or non-ASCII text.
      Trailing sentence punctuation and closing brackets that have no
      opening partner inside the URL are not part of the result.

      :param text_content: text to scan; None or an empty string yields no links
      :return: list of {'url': ..., 'text': ...} dicts in order of appearance,
          where 'text' equals 'url'
      """
      if not text_content:
          return []

      # Only the ASCII characters RFC 3986 allows in a URL. The previous
      # character range "$-_" also matched "<", ">", "\" and "^", which
      # glued surrounding markup onto the URL, and it lacked "~" and "#".
      url_pattern = r"https?://[A-Za-z0-9\-._~:/?#\[\]@!$&'()*+,;=%]+"

      def trim(url):
          # Drop punctuation that belongs to the surrounding sentence.
          while url:
              last = url[-1]
              if last in ".,;:!?'":
                  url = url[:-1]
              elif last == ")" and url.count(")") > url.count("("):
                  url = url[:-1]
              elif last == "]" and url.count("]") > url.count("["):
                  url = url[:-1]
              else:
                  break
          return url

      links = []
      for candidate in re.findall(url_pattern, text_content):
          url = trim(candidate)
          # Skip matches that consisted of nothing but the scheme.
          if url.partition("://")[2]:
              links.append({'url': url, 'text': url})
      return links
  def decode_mime_header(self, header):
      """
      Decode a MIME-encoded (RFC 2047) mail header into text.

      :param header: raw header value, or None
      :return: the decoded text; "" for None. Bytes that cannot be decoded
          with the declared charset are decoded as UTF-8 with replacement
          characters, and a header whose encoded words are malformed is
          returned undecoded instead of raising
      """
      from email.errors import HeaderParseError

      if header is None:
          return ""
      try:
          decoded_parts = decode_header(header)
      except HeaderParseError:
          # e.g. broken base64 inside an encoded word: keep the raw text so
          # one bad header does not fail the whole message.
          return str(header)

      pieces = []
      for part, charset in decoded_parts:
          if not isinstance(part, bytes):
              pieces.append(part)
              continue
          try:
              # Use the declared charset, UTF-8 when none is given.
              pieces.append(part.decode(charset or 'utf-8', errors='replace'))
          except (LookupError, ValueError):
              # Unknown or unusable charset name.
              pieces.append(part.decode('utf-8', errors='replace'))
      return "".join(pieces)
  71. def check_new_email(self, last_check_uid=None):
  72. """
  73. 检查新邮件
  74. :return:
  75. """
  76. if not self.imap_server:
  77. self.connect()
  78. self.imap_server.select_folder('INBOX')
  79. uids = self.imap_server.search('UNSEEN')
  80. if last_check_uid is not None:
  81. new_uid = [uid for uid in uids if uid > last_check_uid]
  82. else:
  83. new_uid = uids
  84. email_details = []
  85. if new_uid:
  86. response = self.imap_server.fetch(new_uid, ['BODY[]'])
  87. for msgid, data in sorted(response.items(), key=lambda x: x[0]):
  88. try:
  89. raw_email = data[b'BODY[]']
  90. email_message = email.message_from_bytes(raw_email)
  91. # 使用解码函数处理邮件头
  92. subject = self.decode_mime_header(email_message.get('Subject'))
  93. from_header = self.decode_mime_header(email_message.get('From'))
  94. # 解析日期
  95. date_str = email_message.get('Date')
  96. date_obj = None
  97. formatted_date = ""
  98. if date_str:
  99. try:
  100. date_obj = parsedate_to_datetime(date_str)
  101. formatted_date = date_obj.strftime('%Y-%m-%d %H:%M:%S')
  102. except Exception as e:
  103. print(f"日期解析错误: {e}, 原始值: {date_str}")
  104. formatted_date = date_str # 保留原始日期字符串
  105. # 解析发件人
  106. sender_name, sender_email = parseaddr(from_header) if from_header else ('', '')
  107. # 解析收件人信息
  108. to_header = self.decode_mime_header(email_message.get('To', ''))
  109. recipient_name, recipient_email = parseaddr(to_header) if to_header else ('', '')
  110. # 提取邮件内容
  111. text_content = ""
  112. html_content = ""
  113. # 统一的内容提取函数
  114. def extract_content(part):
  115. charset = part.get_content_charset('utf-8')
  116. payload = part.get_payload(decode=True)
  117. try:
  118. content = payload.decode(charset, errors='replace')
  119. except Exception as decode_error:
  120. print(f"解码错误: {decode_error}, 尝试其他编码")
  121. try:
  122. content = payload.decode('latin-1', errors='replace')
  123. except:
  124. content = payload.decode('utf-8', errors='replace')
  125. return content
  126. # 处理邮件内容
  127. if email_message.is_multipart():
  128. for part in email_message.walk():
  129. content_type = part.get_content_type()
  130. # 跳过附件
  131. content_disposition = part.get("Content-Disposition", "")
  132. if "attachment" in content_disposition:
  133. continue
  134. # 处理正文内容
  135. if content_type == "text/plain":
  136. text_content = extract_content(part)
  137. elif content_type == "text/html":
  138. html_content = extract_content(part)
  139. else:
  140. content = extract_content(email_message)
  141. content_type = email_message.get_content_type()
  142. if "html" in content_type.lower():
  143. html_content = content
  144. else:
  145. text_content = content
  146. # 确定主要内容和类型
  147. if html_content:
  148. content = html_content
  149. content_type = "html"
  150. elif text_content:
  151. content = text_content
  152. content_type = "text"
  153. else:
  154. content = ""
  155. content_type = ""
  156. email_details.append({
  157. 'uid': msgid,
  158. 'subject': subject, # 使用解码后的主题
  159. 'sender': from_header, # 使用解码后的发件人信息
  160. 'sender_name': sender_name,
  161. 'sender_email': sender_email,
  162. 'recipient': to_header, # 使用解码后的收件人信息
  163. 'recipient_name': recipient_name,
  164. 'recipient_email': recipient_email,
  165. 'date': formatted_date,
  166. 'content': content,
  167. 'content_type': content_type,
  168. })
  169. except Exception as e:
  170. print(f"处理邮件 {msgid} 时出错: {e}")
  171. email_details.append({
  172. 'uid': msgid,
  173. 'error': str(e)
  174. })
  175. return email_details
  176. # 添加邮件头解码函数