| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- from imapclient import IMAPClient
- import email
- from email.utils import parsedate_to_datetime, getaddresses, parseaddr
- from email.header import decode_header
- from datetime import datetime
- from bs4 import BeautifulSoup
- import re
class EmailClient:
    """Small IMAP inbox reader that returns unread messages as plain dicts.

    The configuration dict is read in :meth:`connect` and must provide the
    keys ``receive_server_address``, ``receive_server_port``,
    ``email_account`` and ``email_password``.
    """

    # Compiled once; the pattern text is unchanged from the original code.
    # Known limitations: '#' and '~' are not in the accepted character set,
    # so a URL is cut at the first of them, and trailing punctuation such as
    # '.', ',', ')' or '>' IS accepted and ends up inside the match.
    _URL_PATTERN = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )

    def __init__(self, email_config: dict):
        """Store the configuration; no network activity happens here."""
        self.config = email_config
        self.imap_server = None

    def connect(self):
        """Open the IMAP connection, log in and send the client ID.

        ``self.imap_server`` is only set after a successful login, so a failed
        login does not leave behind a cached, unauthenticated client that
        :meth:`check_new_email` would then reuse instead of reconnecting.

        :raises Exception: whatever the IMAP client raises on connection,
            login or ID failure is propagated unchanged.
        """
        client = IMAPClient(
            self.config['receive_server_address'],
            self.config['receive_server_port'],
        )
        try:
            client.login(self.config['email_account'], self.config['email_password'])
        except Exception:
            # Close the socket of the client we are about to discard.
            try:
                client.shutdown()
            except OSError:
                pass  # socket already gone; the login error is what matters
            raise
        self.imap_server = client
        # Send client identification (IMAP ID command).
        client.id_({"name": "IMAPClient", "version": "2.1.0"})

    def extract_links_from_html(self, html_content):
        """Extract hyperlinks from HTML content.

        :param html_content: HTML markup to scan.
        :return: list of ``{'url': href, 'text': anchor text}`` dicts, one per
            ``<a>`` tag that has an ``href`` attribute. Parsing errors are
            printed and whatever was collected so far is returned.
        """
        links = []
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            for anchor in soup.find_all('a', href=True):
                links.append({
                    'url': anchor['href'],
                    'text': anchor.get_text(strip=True),
                })
        except Exception as e:
            # Deliberate best effort: link extraction must never abort the caller.
            print(f"解析HTML链接时出错: {e}")
        return links

    def extract_links_from_text(self, text_content):
        """Extract http/https URLs from plain text.

        :param text_content: text to scan.
        :return: list of ``{'url': url, 'text': url}`` dicts in order of
            appearance (duplicates are kept).
        """
        return [
            {'url': url, 'text': url}
            for url in EmailClient._URL_PATTERN.findall(text_content)
        ]

    def decode_mime_header(self, header):
        """Decode a MIME (RFC 2047) encoded header into a plain string.

        :param header: raw header value, or ``None``.
        :return: decoded text; ``""`` for ``None``. Bytes that are invalid in
            the declared charset are replaced, an unknown charset falls back
            to UTF-8, and a header whose encoded words cannot be parsed at all
            is returned as its raw text.
        """
        if header is None:
            return ""
        try:
            decoded_parts = decode_header(header)
        except email.errors.HeaderParseError:
            # Malformed encoded word (e.g. broken base64): keep the raw text
            # rather than failing the whole message.
            return str(header)

        pieces = []
        for part, charset in decoded_parts:
            if not isinstance(part, bytes):
                pieces.append(part)
                continue
            try:
                pieces.append(part.decode(charset or 'utf-8', errors='replace'))
            except (LookupError, ValueError):
                # Unknown or unusable charset name declared by the sender.
                pieces.append(part.decode('utf-8', errors='replace'))
        return "".join(pieces)

    def _parse_address(self, raw_header):
        """Return ``(display_name, address)`` for the first address in a header.

        The address is split out of the *raw* header first and only the
        display name is MIME-decoded afterwards. Decoding first would expose
        characters such as ',' or '<' from an encoded name to the address
        parser and corrupt the result.
        """
        if not raw_header:
            return '', ''
        name, address = parseaddr(str(raw_header))
        if '@' not in address:
            # Headers that were encoded as a whole (address included) only
            # parse after decoding; keep the previous behaviour for them.
            return parseaddr(self.decode_mime_header(raw_header))
        return (self.decode_mime_header(name) if name else ''), address

    @staticmethod
    def _format_date(date_str):
        """Format a ``Date`` header as ``YYYY-MM-DD HH:MM:SS``.

        The UTC offset of the parsed date is not part of the output. Returns
        ``""`` when the header is missing and the raw value when it cannot be
        parsed.
        """
        if not date_str:
            return ""
        try:
            return parsedate_to_datetime(date_str).strftime('%Y-%m-%d %H:%M:%S')
        except Exception as e:
            # Best effort: the exception type differs between Python versions.
            print(f"日期解析错误: {e}, 原始值: {date_str}")
            return date_str  # keep the original date string

    @staticmethod
    def _decode_part(part):
        """Return the decoded text payload of a single message part."""
        payload = part.get_payload(decode=True)
        if payload is None:
            # get_payload(decode=True) yields None for multipart containers.
            return ""
        charset = part.get_content_charset('utf-8')
        try:
            return payload.decode(charset, errors='replace')
        except (LookupError, ValueError) as decode_error:
            print(f"解码错误: {decode_error}, 尝试其他编码")
            # latin-1 maps every byte, so this cannot fail.
            return payload.decode('latin-1', errors='replace')

    def _extract_bodies(self, message):
        """Return ``(text_content, html_content)`` for a parsed message.

        For multipart messages, attachments are skipped and the first
        non-empty ``text/plain`` / ``text/html`` part wins, so trailing
        footers or nested parts do not overwrite the actual body.
        """
        if not message.is_multipart():
            content = self._decode_part(message)
            if "html" in message.get_content_type().lower():
                return "", content
            return content, ""

        text_content = ""
        html_content = ""
        for part in message.walk():
            # get_content_disposition() is lower-cased and ignores parameters,
            # so 'ATTACHMENT' is detected and 'inline; filename="attachment.txt"'
            # is not mistaken for one.
            if part.get_content_disposition() == "attachment":
                continue
            content_type = part.get_content_type()
            if content_type == "text/plain" and not text_content:
                text_content = self._decode_part(part)
            elif content_type == "text/html" and not html_content:
                html_content = self._decode_part(part)
        return text_content, html_content

    def _parse_message(self, msgid, raw_email):
        """Build the result dict for one raw RFC 822 message."""
        message = email.message_from_bytes(raw_email)

        from_raw = message.get('From')
        to_raw = message.get('To', '')
        sender_name, sender_email = self._parse_address(from_raw)
        # Only the first address of the To header is reported.
        recipient_name, recipient_email = self._parse_address(to_raw)

        text_content, html_content = self._extract_bodies(message)
        # HTML is preferred over plain text when both are present.
        if html_content:
            content, content_type = html_content, "html"
        elif text_content:
            content, content_type = text_content, "text"
        else:
            content, content_type = "", ""

        return {
            'uid': msgid,
            'subject': self.decode_mime_header(message.get('Subject')),
            'sender': self.decode_mime_header(from_raw),
            'sender_name': sender_name,
            'sender_email': sender_email,
            'recipient': self.decode_mime_header(to_raw),
            'recipient_name': recipient_name,
            'recipient_email': recipient_email,
            'date': self._format_date(message.get('Date')),
            'content': content,
            'content_type': content_type,
        }

    def check_new_email(self, last_check_uid=None):
        """Fetch and parse the unseen messages in INBOX.

        Connects first if no connection exists yet.

        :param last_check_uid: when given, only messages whose UID is strictly
            greater than this value are returned.
        :return: list of dicts ordered by UID. A successfully parsed message
            has the keys ``uid``, ``subject``, ``sender``, ``sender_name``,
            ``sender_email``, ``recipient``, ``recipient_name``,
            ``recipient_email``, ``date``, ``content`` and ``content_type``
            (``"html"``, ``"text"`` or ``""``). A message that failed to parse
            is reported as ``{'uid': ..., 'error': ...}``.
        """
        if not self.imap_server:
            self.connect()
        self.imap_server.select_folder('INBOX')
        uids = self.imap_server.search('UNSEEN')
        if last_check_uid is not None:
            uids = [uid for uid in uids if uid > last_check_uid]
        if not uids:
            return []

        # NOTE(review): BODY[] (as opposed to BODY.PEEK[]) presumably makes the
        # server flag the fetched messages as seen - confirm this is intended.
        response = self.imap_server.fetch(uids, ['BODY[]'])

        email_details = []
        for msgid in sorted(response):
            try:
                email_details.append(
                    self._parse_message(msgid, response[msgid][b'BODY[]'])
                )
            except Exception as e:
                # One broken message must not hide the others.
                print(f"处理邮件 {msgid} 时出错: {e}")
                email_details.append({
                    'uid': msgid,
                    'error': str(e),
                })
        return email_details
- # 添加邮件头解码函数
|