Переглянути джерело

feat(email): 添加邮件检查功能实现

- 新增 check_email.py 文件实现邮件检查核心逻辑
- 新增 file_util.py 文件提供文件操作工具类
- 在 email_client.py 中添加 disconnect 方法用于断开连接
- 修改 .gitignore 添加 config 目录忽略规则
- 实现邮件配置文件读取和处理功能
- 添加邮件推送通知转换功能
- 实现邮件连接管理和错误处理机制
孔明 2 днів тому
батько
коміт
7802b6aa22
4 змінених файлів з 232 додано та 1 видалено
  1. 1 1
      .gitignore
  2. 100 0
      check_email.py
  3. 6 0
      email_client.py
  4. 125 0
      file_util.py

+ 1 - 1
.gitignore

@@ -57,4 +57,4 @@ docs/_build/
 
 # PyBuilder
 target/
-
+config/

+ 100 - 0
check_email.py

@@ -0,0 +1,100 @@
+import json
+from file_util import FileUtil
+from email_client import EmailClient
+from notice import PushFactory, PushMessage, MessageType, Email
+
+email_config_path = "./config/email/"
+
+
+def check_email(config: dict):
+    """
+    检查新邮件
+    :param config: 邮箱配置
+    :return:
+    """
+
+    email_config = {
+        'email_account': config['account'],
+        'email_password': config['password'],
+        'connect_type': config['connect_type'],
+        'receive_server_address': config['receive_server_addr'],
+        'receive_server_port': config['receive_server_port']
+    }
+
+    print(f"正在检查 {email_config['email_account']} 的邮件...")
+
+    # 检查新邮件
+    client = EmailClient(email_config)
+    try:
+        new_emails = client.check_new_email(config['last_check_uid'])
+        print(f"新邮件数量: {len(new_emails)}")
+        return new_emails
+    except Exception as e:
+        print(e)
+        print(f"检查邮件时出错: {e}")
+        return []
+    finally:
+        client.disconnect()
+
+
+def to_message(email: dict):
+    """
+    转换为推送消息
+    :param email:
+    :return:
+    """
+
+    sender_display = email["sender"].replace(' ', '\n').replace('"', '')
+    recipient_display = email["recipient"].replace(' ', '\n')
+
+    return PushMessage(
+        title="新邮件通知",
+        content=f'您有一封来自[{email["sender_name"] if email["sender_name"] else email["sender_email"]}]的邮件',
+        messageType=MessageType.EMAIL,
+        email=Email(
+            sender=sender_display,
+            recipient=recipient_display,
+            subject=email['subject'],
+            sendTime=email['date'],
+            content=email['content'],
+        )
+    )
+
+
+if __name__ == '__main__':
+    # 1. 读取所有的配置文件
+    files = FileUtil.list_files(email_config_path)
+    print(f"找到 {len(files)} 个配置文件")
+    if len(files) == 0:
+        print("没有找到配置文件")
+        exit(1)
+
+    # 2. 遍历所有配置文件
+    for file in files:
+        print(f"正在处理 {file}")
+        config = FileUtil.read_json(file)
+
+        # 3. 检查邮件
+        email_config = config["email"]
+
+        try:
+            emails = check_email(email_config)
+            print(f"[{email_config['account']}]新邮件数量: {len(emails)}")
+
+            if len(emails) < 0:
+                continue
+
+            current_email = None
+            for email in emails:
+                current_email = email
+                message = to_message(email)
+                PushFactory.create_push(config['push']).send_notification(message)
+                print(f"[{email_config['account']}]推送结果: 完成")
+
+            if current_email and current_email['uid'] > email_config['last_check_uid']:
+                email_config['last_check_uid'] = current_email['uid']
+                FileUtil.write_json(file, config)
+                print(f'[{email_config["account"]}]更新配置文件: {json.dumps(config, indent=4)}')
+
+        except Exception as ex:
+            print(f"[{email_config['account']}]检查邮件时出错: {ex}")

+ 6 - 0
email_client.py

@@ -22,6 +22,12 @@ class EmailClient:
         # 设置标识信息
         self.imap_server.id_({"name": "IMAPClient", "version": "2.1.0"})
 
+    def disconnect(self):
+        """断开 IMAP 连接"""
+        if self.imap_server:
+            self.imap_server.logout()
+            self.imap_server = None
+
     def extract_links_from_html(self, html_content):
         """
         从HTML内容中提取超链接

+ 125 - 0
file_util.py

@@ -0,0 +1,125 @@
+import json
+import os
+
+
+class FileUtil:
+    """
+    文件操作工具类。
+    """
+
+    @staticmethod
+    def list_files(folder_path, recursive=False):
+        """
+        列出指定文件夹下的所有文件。
+
+        :param folder_path: 文件夹路径
+        :param recursive: 是否递归遍历子文件夹,默认为 False
+        :return: 文件路径列表
+        :raises FileNotFoundError: 如果文件夹不存在
+        :raises PermissionError: 如果无权限访问文件夹
+        """
+        if not os.path.exists(folder_path):
+            raise FileNotFoundError(f"文件夹未找到: {folder_path}")
+        if not os.path.isdir(folder_path):
+            raise ValueError(f"路径不是文件夹: {folder_path}")
+
+        files = []
+        try:
+            if recursive:
+                for root, _, filenames in os.walk(folder_path):
+                    for filename in filenames:
+                        files.append(os.path.join(root, filename))
+            else:
+                for entry in os.listdir(folder_path):
+                    full_path = os.path.join(folder_path, entry)
+                    if os.path.isfile(full_path):
+                        files.append(full_path)
+        except PermissionError as e:
+            raise PermissionError(f"无权限访问文件夹: {e}")
+
+        return files
+
+    @staticmethod
+    def read_file(file_path):
+        """
+        从指定文件中读取内容。
+
+        :param file_path: 文件路径
+        :return: 文件内容(字符串)
+        :raises FileNotFoundError: 如果文件不存在
+        :raises IOError: 如果读取文件时发生错误
+        """
+        try:
+            with open(file_path, 'r', encoding='utf-8') as file:
+                return file.read()
+        except FileNotFoundError:
+            raise FileNotFoundError(f"文件未找到: {file_path}")
+        except IOError as e:
+            raise IOError(f"读取文件时发生错误: {e}")
+
+    @staticmethod
+    def write_file(file_path, content):
+        """
+        将内容写入到指定文件中。
+
+        :param file_path: 文件路径
+        :param content: 要写入的内容(字符串)
+        :raises IOError: 如果写入文件时发生错误
+        """
+        # 确保文件夹存在
+        directory = os.path.dirname(file_path)
+        if directory and not os.path.exists(directory):
+            os.makedirs(directory)
+        try:
+            with open(file_path, 'w', encoding='utf-8') as file:
+                file.write(content)
+        except IOError as e:
+            raise IOError(f"写入文件时发生错误: {e}")
+
+    @staticmethod
+    def read_json(file_path):
+        """
+        从指定文件中读取内容并解析为 JSON。
+
+        :param file_path: 文件路径
+        :return: 解析后的 JSON 对象(字典或列表)
+        :raises FileNotFoundError: 如果文件不存在
+        :raises IOError: 如果读取文件时发生错误
+        :raises json.JSONDecodeError: 如果 JSON 解析失败
+        """
+        # 检查文件是否存在且非空
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"文件未找到: {file_path}")
+        if os.path.getsize(file_path) == 0:
+            raise ValueError(f"文件内容为空: {file_path}")
+        try:
+            with open(file_path, 'r', encoding='utf-8') as file:
+                return json.load(file)
+        except FileNotFoundError:
+            raise FileNotFoundError(f"文件未找到: {file_path}")
+        except IOError as e:
+            raise IOError(f"读取文件时发生错误: {e}")
+        except json.JSONDecodeError as e:
+            raise json.JSONDecodeError(f"JSON 解析失败: {e.msg}", e.doc, e.pos)
+
+    @staticmethod
+    def write_json(file_path, data):
+        """
+        将 JSON 数据写入到指定文件中。
+
+        :param file_path: 文件路径
+        :param data: 要写入的 JSON 数据(字典、列表等)
+        :raises IOError: 如果写入文件时发生错误
+        :raises TypeError: 如果数据无法序列化为 JSON
+        """
+        # 确保文件夹存在
+        directory = os.path.dirname(file_path)
+        if directory and not os.path.exists(directory):
+            os.makedirs(directory)
+        try:
+            with open(file_path, 'w', encoding='utf-8') as file:
+                json.dump(data, file, ensure_ascii=False, indent=4)
+        except IOError as e:
+            raise IOError(f"写入文件时发生错误: {e}")
+        except TypeError as e:
+            raise TypeError(f"数据无法序列化为 JSON: {e}")