check_new_email.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import json
  2. from mysql_db import DBManager
  3. from email_client import EmailClient
  4. from notice import PushFactory, PushMessage, MessageType, Email
  5. EMAIL_CONFIG_SQL = '''
  6. select email_address, -- 邮件地址
  7. email_password, -- 邮件密码
  8. connection_type, -- 连接类型
  9. receive_server_address, -- 接收邮件服务器地址
  10. receive_server_port, -- 接收邮件服务器端口号
  11. push_config, -- 爱语飞飞token
  12. last_check_uid -- 上次检查的UID
  13. from check_email_config;
  14. '''
  15. EMAIL_UPDATE_LAST_CHECK_UID_SQL = '''
  16. update check_email_config
  17. set last_check_uid = %s
  18. where email_address = %s;
  19. '''
  20. def fetch_all_data(db_manager):
  21. """
  22. 获取所有数据
  23. :param db_manager: 数据库管理器
  24. :return:
  25. """
  26. try:
  27. with db_manager.managed_cursor() as cursor:
  28. cursor.execute(EMAIL_CONFIG_SQL)
  29. records = cursor.fetchall()
  30. if not records:
  31. return []
  32. return [
  33. {
  34. "email_account": record[0],
  35. "email_password": record[1],
  36. "connection_type": record[2],
  37. "receive_server_address": record[3],
  38. "receive_server_port": record[4],
  39. "push_config": record[5],
  40. "last_check_uid": record[6]
  41. }
  42. for record in records
  43. ]
  44. except Exception as e:
  45. print(f"查询邮箱配置失败: {e}")
  46. return []
  47. def update_last_check_uid(db_manager, email_account, last_check_uid):
  48. """
  49. 更新检查邮件的UID
  50. :param db_manager: 数据库管理器
  51. :param email_account: 邮箱账号
  52. :param last_check_uid: 最后检查的UID
  53. :return:
  54. """
  55. try:
  56. with db_manager.managed_cursor() as cursor:
  57. cursor.execute(EMAIL_UPDATE_LAST_CHECK_UID_SQL, (last_check_uid, email_account))
  58. db_manager.connection.commit()
  59. print(f"更新 {email_account} 的本次检测UID为: {last_check_uid}")
  60. except Exception as e:
  61. print(f"更新UID失败: {e}")
  62. db_manager.connection.rollback()
  63. def check_new_email(email_config: dict):
  64. """
  65. 检查新邮件
  66. :param email_config: 邮箱配置
  67. :return:
  68. """
  69. if 'push_config' not in email_config:
  70. print("没有配置推送服务")
  71. return
  72. push_config = json.loads(email_config['push_config'])
  73. email_account = email_config['email_account']
  74. print(f"正在检查 {email_account} 的邮件...")
  75. # 检查新邮件
  76. client = EmailClient(email_config)
  77. new_emails = client.check_new_email(email_config['last_check_uid'])
  78. print(f"新邮件数量: {len(new_emails)}")
  79. # 发送通知
  80. current_email = None
  81. try:
  82. for email in new_emails:
  83. current_email = email
  84. message = to_message(email)
  85. PushFactory.create_push(push_config).send_notification(message)
  86. except Exception as e:
  87. print(f"邮箱账户{email_account} 发送邮件[{current_email['subject']}]通知失败: {e}")
  88. # 更新最大UID
  89. if current_email and current_email['uid'] > email_config['last_check_uid']:
  90. update_last_check_uid(db_manager, email_account, current_email['uid'])
  91. def to_message(email: dict):
  92. sender_display = email["sender"].replace(' ', '\n').replace('"', '')
  93. recipient_display = email["recipient"].replace(' ', '\n')
  94. return PushMessage(
  95. title="新邮件通知",
  96. content=f'您有一封来自[{email["sender_name"] if email["sender_name"] else email["sender_email"]}]的邮件',
  97. messageType=MessageType.EMAIL,
  98. email=Email(
  99. sender=sender_display,
  100. recipient=recipient_display,
  101. subject=email['subject'],
  102. sendTime=email['date'],
  103. content=email['content'],
  104. )
  105. )
  106. if __name__ == '__main__':
  107. db_manager = DBManager()
  108. records = fetch_all_data(db_manager)
  109. if not records:
  110. print("没有查询到邮箱配置")
  111. else:
  112. for row in records:
  113. check_new_email(row)