Browse Source

检查邮箱最新邮件并通过爱语飞飞发送通知

孔明 2 months ago
parent
commit
6a49557d4f
5 changed files with 261 additions and 0 deletions
  1. 111 0
      check_new_email.py
  2. 51 0
      email_client.py
  3. 79 0
      mysql_db.py
  4. 20 0
      notice.py
  5. BIN
      requirements.txt

+ 111 - 0
check_new_email.py

@@ -0,0 +1,111 @@
+from mysql_db import DBManager
+from email_client import EmailClient
+from notice import AiYuFeiFei
+
+EMAIL_CONFIG_SQL = '''
+            select email_address,    -- 邮件地址
+                email_password,         -- 邮件密码
+                connection_type,        -- 连接类型
+                receive_server_address, -- 接收邮件服务器地址
+                receive_server_port,    -- 接收邮件服务器端口号
+                iyuu_token,             -- 爱语飞飞token
+                last_check_uid          -- 上次检查的UID
+            from check_email_config;
+        '''
+
+EMAIL_UPDATE_LAST_CHECK_UID_SQL = '''
+            update check_email_config
+                set last_check_uid = %s
+            where email_address = %s;
+        '''
+
+
+def fetch_all_data(db_manager):
+    """
+    获取所有数据
+    :param db_manager: 数据库管理器
+    :return:
+    """
+    try:
+        with db_manager.managed_cursor() as cursor:
+            cursor.execute(EMAIL_CONFIG_SQL)
+            records = cursor.fetchall()
+            if not records:
+                return []
+            return [
+                {
+                    "email_account": record[0],
+                    "email_password": record[1],
+                    "connection_type": record[2],
+                    "receive_server_address": record[3],
+                    "receive_server_port": record[4],
+                    "iyuu_token": record[5],
+                    "last_check_uid": record[6]
+                }
+                for record in records
+            ]
+
+    except Exception as e:
+        print(f"查询邮箱配置失败: {e}")
+        return []
+
+
+def update_last_check_uid(db_manager, email_account, last_check_uid):
+    """
+    更新检查邮件的UID
+    :param db_manager: 数据库管理器
+    :param email_account: 邮箱账号
+    :param last_check_uid: 最后检查的UID
+    :return:
+    """
+    try:
+        with db_manager.managed_cursor() as cursor:
+            cursor.execute(EMAIL_UPDATE_LAST_CHECK_UID_SQL, (last_check_uid, email_account))
+            db_manager.connection.commit()
+            print(f"更新 {email_account} 的本次检测UID为: {last_check_uid}")
+    except Exception as e:
+        print(f"更新UID失败: {e}")
+        db_manager.connection.rollback()
+
+
+def check_new_email(email_config: dict):
+    """
+    检查新邮件
+    :param email_config: 邮箱配置
+    :return:
+    """
+    email_account = email_config['email_account']
+    print(f"正在检查 {email_account} 的邮件...")
+
+    # 检查新邮件
+    client = EmailClient(email_config)
+    new_emails = client.check_new_email(email_config['last_check_uid'])
+    print(f"新邮件数量: {len(new_emails)}")
+
+    # 发送通知
+    current_email = None
+    try:
+        notice = AiYuFeiFei(email_config['iyuu_token'])
+        index = email_account.index("@")
+        title = f"来自【{email_account[:index]}】新的邮件"
+        for email in new_emails:
+            current_email = email
+            context = f'发件人:{email["sender"]} \n主题:{email["subject"]} \n发送时间:{email["date"]}'
+            notice.send_notification(title, context)
+
+    except Exception as e:
+        print(f"邮箱账户{email_account} 发送邮件[{current_email['subject']}]通知失败: {e}")
+
+    # 更新最大UID
+    if current_email and current_email['uid'] > email_config['last_check_uid']:
+        update_last_check_uid(db_manager, email_account, current_email['uid'])
+
+
+if __name__ == '__main__':
+    db_manager = DBManager()
+    records = fetch_all_data(db_manager)
+    if not records:
+        print("没有查询到邮箱配置")
+    else:
+        for row in records:
+            check_new_email(row)

+ 51 - 0
email_client.py

@@ -0,0 +1,51 @@
+from imapclient import IMAPClient
+from email import policy
+from email.parser import BytesParser
+from datetime import datetime
+
+
+class EmailClient:
+    def __init__(self, email_config: dict):
+        self.config = email_config
+        self.imap_server = None
+
+    def connect(self):
+        # 连接到 IMAP 服务器
+        self.imap_server = IMAPClient(self.config['receive_server_address'], self.config['receive_server_port'])
+
+        # 登录
+        self.imap_server.login(self.config['email_account'], self.config['email_password'])
+
+        # 设置标识信息
+        self.imap_server.id_({"name": "IMAPClient", "version": "2.1.0"})
+
+    def check_new_email(self, last_check_uid=None):
+        """
+        检查新邮件
+        :return:
+        """
+        if not self.imap_server:
+            self.connect()
+        self.imap_server.select_folder('INBOX')
+        uids = self.imap_server.search('UNSEEN')
+
+        if last_check_uid is not None:
+            new_uid = [uid for uid in uids if uid > last_check_uid]
+        else:
+            new_uid = uids
+
+        email_details = []
+        if new_uid:
+            response = self.imap_server.fetch(new_uid, ['BODY[]'])
+            for msgid, data in sorted(response.items(), key=lambda x: x[0]):
+                email_message = BytesParser(policy=policy.default).parsebytes(data[b'BODY[]'])
+
+                email_details.append({
+                    'uid': msgid,
+                    'subject': email_message['subject'],
+                    'sender': email_message['from'],
+                    'date': datetime.strptime(email_message['date'], "%a, %d %b %Y %H:%M:%S %z").strftime(
+                        '%Y-%m-%d %H:%M:%S'),
+                })
+
+        return email_details

+ 79 - 0
mysql_db.py

@@ -0,0 +1,79 @@
+import os
+import mysql
+from mysql.connector import Error
+from contextlib import contextmanager
+
+
+class DBManager:
+    def __init__(self):
+        # 从环境变量获取数据库配置
+        self.host = os.getenv('MYSQL_HOST', '100.102.238.76')
+        self.port = int(os.getenv('MYSQL_PORT', '3307'))
+        self.database = os.getenv('MYSQL_DATABASE', 'qinglong')
+        self.user = os.getenv('MYSQL_USER', 'root')
+        self.password = os.getenv('MYSQL_PASSWORD', 'root')
+        self.connection = None
+
+    def connect(self):
+        """建立数据库连接"""
+        try:
+            self.connection = mysql.connector.connect(
+                host=self.host,
+                port=self.port,
+                database=self.database,
+                user=self.user,
+                password=self.password,
+                autocommit=False  # 手动控制事务
+            )
+            if self.connection.is_connected():
+                print("成功连接到MySQL数据库")
+        except Error as e:
+            print(f"连接数据库时出错: {e}")
+            self.connection = None
+
+    @contextmanager
+    def managed_cursor(self):
+        """上下文管理器,自动管理连接和游标"""
+        try:
+            if self.connection is None or not self.connection.is_connected():
+                self.connect()
+
+            if self.connection is None:
+                raise Exception("数据库连接失败")
+
+            cursor = self.connection.cursor()
+            yield cursor  # 在此处返回游标给调用方
+        except Error as e:
+            print(f"数据库操作失败: {e}")
+            # 尝试重新连接
+            self.connect()
+            if self.connection and self.connection.is_connected():
+                cursor = self.connection.cursor()
+                yield cursor
+            else:
+                raise
+        finally:
+            try:
+                if 'cursor' in locals() and cursor:
+                    cursor.close()
+            except:
+                pass  # 忽略关闭游标时的异常
+
+
+# db_manager = DBManager()
+# sql = '''
+#             select email_address,    -- 邮件地址
+#                     email_password,         -- 邮件密码
+#                     connection_type,        -- 连接类型
+#                     receive_server_address, -- 接收邮件服务器地址
+#                     receive_server_port,    -- 接收邮件服务器端口号
+#                     iyuu_token,             -- 爱语飞飞token
+#                     last_check_uid          -- 上次检查的UID
+#                 from check_email_config;
+#             '''
+#
+# with db_manager.managed_cursor() as cursor:
+#     cursor.execute(sql)
+#     result = cursor.fetchall()
+#     for row in result:
+#         print(row)

+ 20 - 0
notice.py

@@ -0,0 +1,20 @@
+import requests
+
+
+class AiYuFeiFei:
+    def __init__(self, token):
+        self.url = f'https://iyuu.cn/{token}.send'
+
+    def send_notification(self, title, content):
+        """
+        发送通知
+        :param title: 通知标题
+        :param content: 通知内容
+        :return:
+        """
+        data = {
+            'text': title,
+            'desp': content
+        }
+        response = requests.post(self.url, json=data)
+        print("爱语飞飞提醒发送结果:", response.json())

BIN
requirements.txt