Browse Source

feat(notice): 重构通知模块并支持 Yo 推送

- 重构了 notice 模块,引入了 PushMessage 和 Email 模型
- 新增 Yo 推送支持
- 修改了从邮件到推送消息的转换逻辑
- 优化了推送消息的发送方式
孔明 1 month ago
parent
commit
ef547fa332
3 changed files with 270 additions and 31 deletions
  1. 13 8
      check_new_email.py
  2. 166 11
      email_client.py
  3. 91 12
      notice.py

+ 13 - 8
check_new_email.py

@@ -1,6 +1,8 @@
+import json
+
 from mysql_db import DBManager
 from email_client import EmailClient
-from notice import AiYuFeiFei
+from notice import AiYuFeiFei, from_email, PushFactory
 
 EMAIL_CONFIG_SQL = '''
             select email_address,    -- 邮件地址
@@ -8,7 +10,7 @@ EMAIL_CONFIG_SQL = '''
                 connection_type,        -- 连接类型
                 receive_server_address, -- 接收邮件服务器地址
                 receive_server_port,    -- 接收邮件服务器端口号
-                iyuu_token,             -- 爱语飞飞token
+                push_config,             -- 爱语飞飞token
                 last_check_uid          -- 上次检查的UID
             from check_email_config;
         '''
@@ -39,7 +41,7 @@ def fetch_all_data(db_manager):
                     "connection_type": record[2],
                     "receive_server_address": record[3],
                     "receive_server_port": record[4],
-                    "iyuu_token": record[5],
+                    "push_config": record[5],
                     "last_check_uid": record[6]
                 }
                 for record in records
@@ -74,6 +76,12 @@ def check_new_email(email_config: dict):
     :param email_config: 邮箱配置
     :return:
     """
+    if 'push_config' not in email_config:
+        print("没有配置推送服务")
+        return
+
+    push_config = json.loads(email_config['push_config'])
+
     email_account = email_config['email_account']
     print(f"正在检查 {email_account} 的邮件...")
 
@@ -85,13 +93,10 @@ def check_new_email(email_config: dict):
     # 发送通知
     current_email = None
     try:
-        notice = AiYuFeiFei(email_config['iyuu_token'])
-        index = email_account.index("@")
-        title = f"【{email_account[:index]}】新邮件通知"
         for email in new_emails:
             current_email = email
-            context = f'发件人:{email["sender"]} \n主题:{email["subject"]} \n发送时间:{email["date"]}'
-            notice.send_notification(title, context)
+            message = from_email(email)
+            PushFactory.create_push(push_config).send_notification(message)
 
     except Exception as e:
         print(f"邮箱账户{email_account} 发送邮件[{current_email['subject']}]通知失败: {e}")

+ 166 - 11
email_client.py

@@ -1,7 +1,10 @@
 from imapclient import IMAPClient
-from email import policy
-from email.parser import BytesParser
+import email
+from email.utils import parsedate_to_datetime, getaddresses, parseaddr
+from email.header import decode_header
 from datetime import datetime
+from bs4 import BeautifulSoup
+import re
 
 
 class EmailClient:
@@ -19,6 +22,64 @@ class EmailClient:
         # 设置标识信息
         self.imap_server.id_({"name": "IMAPClient", "version": "2.1.0"})
 
+    def extract_links_from_html(self, html_content):
+        """
+        从HTML内容中提取超链接
+        :param html_content: HTML内容
+        :return: 链接列表
+        """
+        links = []
+        try:
+            soup = BeautifulSoup(html_content, 'html.parser')
+            # 提取所有带href属性的<a>标签
+            for link in soup.find_all('a', href=True):
+                links.append({
+                    'url': link['href'],
+                    'text': link.get_text(strip=True)
+                })
+        except Exception as e:
+            print(f"解析HTML链接时出错: {e}")
+        return links
+
+    def extract_links_from_text(self, text_content):
+        """
+        从文本内容中提取链接
+        :param text_content: 文本内容
+        :return: 链接列表
+        """
+        # 使用正则表达式匹配URL
+        url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
+        urls = re.findall(url_pattern, text_content)
+        return [{'url': url, 'text': url} for url in urls]
+
+    def decode_mime_header(self, header):
+        """解码 MIME 编码的邮件头"""
+        if header is None:
+            return ""
+
+        decoded_parts = decode_header(header)
+        decoded_str = ""
+
+        for part, charset in decoded_parts:
+            if isinstance(part, bytes):
+                try:
+                    # 尝试使用指定的字符集解码
+                    if charset:
+                        decoded_str += part.decode(charset, errors='replace')
+                    else:
+                        # 如果没有指定字符集,尝试常见字符集
+                        try:
+                            decoded_str += part.decode('utf-8', errors='replace')
+                        except:
+                            decoded_str += part.decode('latin-1', errors='replace')
+                except Exception as e:
+                    # 如果所有解码都失败,使用替代表示
+                    decoded_str += part.decode('utf-8', errors='replace')
+            else:
+                decoded_str += part
+
+        return decoded_str
+
     def check_new_email(self, last_check_uid=None):
         """
         检查新邮件
@@ -38,14 +99,108 @@ class EmailClient:
         if new_uid:
             response = self.imap_server.fetch(new_uid, ['BODY[]'])
             for msgid, data in sorted(response.items(), key=lambda x: x[0]):
-                email_message = BytesParser(policy=policy.default).parsebytes(data[b'BODY[]'])
-
-                email_details.append({
-                    'uid': msgid,
-                    'subject': email_message['subject'],
-                    'sender': email_message['from'],
-                    'date': datetime.strptime(email_message['date'], "%a, %d %b %Y %H:%M:%S %z").strftime(
-                        '%Y-%m-%d %H:%M:%S'),
-                })
+                try:
+                    raw_email = data[b'BODY[]']
+                    email_message = email.message_from_bytes(raw_email)
+
+                    # 使用解码函数处理邮件头
+                    subject = self.decode_mime_header(email_message.get('Subject'))
+                    from_header = self.decode_mime_header(email_message.get('From'))
+
+                    # 解析日期
+                    date_str = email_message.get('Date')
+                    date_obj = None
+                    formatted_date = ""
+                    if date_str:
+                        try:
+                            date_obj = parsedate_to_datetime(date_str)
+                            formatted_date = date_obj.strftime('%Y-%m-%d %H:%M:%S')
+                        except Exception as e:
+                            print(f"日期解析错误: {e}, 原始值: {date_str}")
+                            formatted_date = date_str  # 保留原始日期字符串
+
+                    # 解析发件人
+                    sender_name, sender_email = parseaddr(from_header) if from_header else ('', '')
+
+                    # 解析收件人信息
+                    to_header = self.decode_mime_header(email_message.get('To', ''))
+                    recipient_name, recipient_email = parseaddr(to_header) if to_header else ('', '')
+
+                    # 提取邮件内容
+                    text_content = ""
+                    html_content = ""
+
+                    # 统一的内容提取函数
+                    def extract_content(part):
+                        charset = part.get_content_charset('utf-8')
+                        payload = part.get_payload(decode=True)
+
+                        try:
+                            content = payload.decode(charset, errors='replace')
+                        except Exception as decode_error:
+                            print(f"解码错误: {decode_error}, 尝试其他编码")
+                            try:
+                                content = payload.decode('latin-1', errors='replace')
+                            except:
+                                content = payload.decode('utf-8', errors='replace')
+
+                        return content
+
+                    # 处理邮件内容
+                    if email_message.is_multipart():
+                        for part in email_message.walk():
+                            content_type = part.get_content_type()
+
+                            # 跳过附件
+                            content_disposition = part.get("Content-Disposition", "")
+                            if "attachment" in content_disposition:
+                                continue
+
+                            # 处理正文内容
+                            if content_type == "text/plain":
+                                text_content = extract_content(part)
+                            elif content_type == "text/html":
+                                html_content = extract_content(part)
+                    else:
+                        content = extract_content(email_message)
+                        content_type = email_message.get_content_type()
+                        if "html" in content_type.lower():
+                            html_content = content
+                        else:
+                            text_content = content
+
+                    # 确定主要内容和类型
+                    if html_content:
+                        content = html_content
+                        content_type = "html"
+                    elif text_content:
+                        content = text_content
+                        content_type = "text"
+                    else:
+                        content = ""
+                        content_type = ""
+
+                    email_details.append({
+                        'uid': msgid,
+                        'subject': subject,  # 使用解码后的主题
+                        'sender': from_header,  # 使用解码后的发件人信息
+                        'sender_name': sender_name,
+                        'sender_email': sender_email,
+                        'recipient': to_header,  # 使用解码后的收件人信息
+                        'recipient_name': recipient_name,
+                        'recipient_email': recipient_email,
+                        'date': formatted_date,
+                        'content': content,
+                        'content_type': content_type,
+                    })
+
+                except Exception as e:
+                    print(f"处理邮件 {msgid} 时出错: {e}")
+                    email_details.append({
+                        'uid': msgid,
+                        'error': str(e)
+                    })
 
         return email_details
+
+    # 添加邮件头解码函数

+ 91 - 12
notice.py

@@ -1,20 +1,99 @@
 import requests
+from pydantic import BaseModel, Field
+from enum import Enum
 
 
-class AiYuFeiFei:
-    def __init__(self, token):
-        self.url = f'https://iyuu.cn/{token}.send'
+class MessageType(Enum):
+    DEFAULT = 'DEFAULT',
+    TEXT = 'TEXT',
+    MARKDOWN = 'MARKDOWN',
+    EMAIL = 'EMAIL'
 
-    def send_notification(self, title, content):
-        """
-        发送通知
-        :param title: 通知标题
-        :param content: 通知内容
-        :return:
-        """
+
+class Email(BaseModel):
+    sender: str = Field(..., description='发件人')
+    recipient: str = Field(..., description='收件人')
+    subject: str = Field(..., description='主题')
+    sendTime: str = Field(..., description='发送时间')
+    content: str = Field(..., description='内容')
+
+
+class PushMessage(BaseModel):
+    title: str = Field(..., description='标题')
+    content: str = Field(..., description='通知内容')
+    messageType: MessageType = Field(..., description='消息类型')
+    text: str = Field(None, description='文本内容')
+    email: Email = Field(None, description='邮件内容')
+
+    def model_dump(self):
+        # 重写model_dump方法以正确处理枚举类型
+        data = super().model_dump()
+        data['messageType'] = self.messageType.value
+        return data
+
+
+def from_email(email: dict):
+    sender_display = email["sender"].replace(' ', '\n').replace('"', '')
+    recipient_display = email["recipient"].replace(' ', '\n')
+
+    print("邮件内容:", email)
+
+    return PushMessage(
+        title="新邮件通知",
+        content=f'您有一封来自[{email["sender_name"] if email["sender_name"] else email["sender_email"]}]的邮件',
+        messageType=MessageType.EMAIL,
+        email=Email(
+            sender=sender_display,
+            recipient=recipient_display,
+            subject=email['subject'],
+            sendTime=email['date'],
+            content=email['content'],
+        )
+    )
+
+
+class Push:
+    def __init__(self, config: dict):
+        self.config = config
+
+    def send_notification(self, data: PushMessage):
+        pass
+
+
+class AiYuFeiFei(Push):
+    def __init__(self, config: dict):
+        super().__init__(config)
+        self.url = f'https://iyuu.cn/{config["token"]}.send'
+
+    def send_notification(self, data: PushMessage):
         data = {
-            'text': title,
-            'desp': content
+            'text': data.title,
+            'desp': data.content
         }
         response = requests.post(self.url, json=data)
         print("爱语飞飞提醒发送结果:", response.json())
+
+
+class Yo(Push):
+
+    def __init__(self, config: dict):
+        super().__init__(config)
+
+    def send_notification(self, data: PushMessage):
+        headers = {
+            'x-api-key': self.config['token']
+        }
+
+        response = requests.post(self.config['url'], json=data.model_dump(), headers=headers)
+        print("Yo提醒发送结果:", response.json())
+
+
+class PushFactory:
+    @staticmethod
+    def create_push(config: dict) -> Push:
+        if config['type'] == 'Yo':
+            return Yo(config)
+        elif config['type'] == 'AiYuFeiFei':
+            return AiYuFeiFei(config)
+        else:
+            raise ValueError(f'不支持的推送类型: {config.type}')