from imapclient import IMAPClient import email from email.utils import parsedate_to_datetime, getaddresses, parseaddr from email.header import decode_header from datetime import datetime from bs4 import BeautifulSoup import re class EmailClient: def __init__(self, email_config: dict): self.config = email_config self.imap_server = None def connect(self): # 连接到 IMAP 服务器 self.imap_server = IMAPClient(self.config['receive_server_address'], self.config['receive_server_port']) # 登录 self.imap_server.login(self.config['email_account'], self.config['email_password']) # 设置标识信息 self.imap_server.id_({"name": "IMAPClient", "version": "2.1.0"}) def extract_links_from_html(self, html_content): """ 从HTML内容中提取超链接 :param html_content: HTML内容 :return: 链接列表 """ links = [] try: soup = BeautifulSoup(html_content, 'html.parser') # 提取所有带href属性的标签 for link in soup.find_all('a', href=True): links.append({ 'url': link['href'], 'text': link.get_text(strip=True) }) except Exception as e: print(f"解析HTML链接时出错: {e}") return links def extract_links_from_text(self, text_content): """ 从文本内容中提取链接 :param text_content: 文本内容 :return: 链接列表 """ # 使用正则表达式匹配URL url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+' urls = re.findall(url_pattern, text_content) return [{'url': url, 'text': url} for url in urls] def decode_mime_header(self, header): """解码 MIME 编码的邮件头""" if header is None: return "" decoded_parts = decode_header(header) decoded_str = "" for part, charset in decoded_parts: if isinstance(part, bytes): try: # 尝试使用指定的字符集解码 if charset: decoded_str += part.decode(charset, errors='replace') else: # 如果没有指定字符集,尝试常见字符集 try: decoded_str += part.decode('utf-8', errors='replace') except: decoded_str += part.decode('latin-1', errors='replace') except Exception as e: # 如果所有解码都失败,使用替代表示 decoded_str += part.decode('utf-8', errors='replace') else: decoded_str += part return decoded_str def check_new_email(self, last_check_uid=None): """ 检查新邮件 :return: """ if not self.imap_server: self.connect() self.imap_server.select_folder('INBOX') uids = self.imap_server.search('UNSEEN') if last_check_uid is not None: new_uid = [uid for uid in uids if uid > last_check_uid] else: new_uid = uids email_details = [] if new_uid: response = self.imap_server.fetch(new_uid, ['BODY[]']) for msgid, data in sorted(response.items(), key=lambda x: x[0]): try: raw_email = data[b'BODY[]'] email_message = email.message_from_bytes(raw_email) # 使用解码函数处理邮件头 subject = self.decode_mime_header(email_message.get('Subject')) from_header = self.decode_mime_header(email_message.get('From')) # 解析日期 date_str = email_message.get('Date') date_obj = None formatted_date = "" if date_str: try: date_obj = parsedate_to_datetime(date_str) formatted_date = date_obj.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: print(f"日期解析错误: {e}, 原始值: {date_str}") formatted_date = date_str # 保留原始日期字符串 # 解析发件人 sender_name, sender_email = parseaddr(from_header) if from_header else ('', '') # 解析收件人信息 to_header = self.decode_mime_header(email_message.get('To', '')) recipient_name, recipient_email = parseaddr(to_header) if to_header else ('', '') # 提取邮件内容 text_content = "" html_content = "" # 统一的内容提取函数 def extract_content(part): charset = part.get_content_charset('utf-8') payload = part.get_payload(decode=True) try: content = payload.decode(charset, errors='replace') except Exception as decode_error: print(f"解码错误: {decode_error}, 尝试其他编码") try: content = payload.decode('latin-1', errors='replace') except: content = payload.decode('utf-8', errors='replace') return content # 处理邮件内容 if email_message.is_multipart(): for part in email_message.walk(): content_type = part.get_content_type() # 跳过附件 content_disposition = part.get("Content-Disposition", "") if "attachment" in content_disposition: continue # 处理正文内容 if content_type == "text/plain": text_content = extract_content(part) elif content_type == "text/html": html_content = extract_content(part) else: content = extract_content(email_message) content_type = email_message.get_content_type() if "html" in content_type.lower(): html_content = content else: text_content = content # 确定主要内容和类型 if html_content: content = html_content content_type = "html" elif text_content: content = text_content content_type = "text" else: content = "" content_type = "" email_details.append({ 'uid': msgid, 'subject': subject, # 使用解码后的主题 'sender': from_header, # 使用解码后的发件人信息 'sender_name': sender_name, 'sender_email': sender_email, 'recipient': to_header, # 使用解码后的收件人信息 'recipient_name': recipient_name, 'recipient_email': recipient_email, 'date': formatted_date, 'content': content, 'content_type': content_type, }) except Exception as e: print(f"处理邮件 {msgid} 时出错: {e}") email_details.append({ 'uid': msgid, 'error': str(e) }) return email_details # 添加邮件头解码函数