check_email.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import json
  2. from file_util import FileUtil
  3. from email_client import EmailClient
  4. from notice import PushFactory, PushMessage, MessageType, Email
  5. email_config_path = "./config/email/"
  6. def check_email(config: dict):
  7. """
  8. 检查新邮件
  9. :param config: 邮箱配置
  10. :return:
  11. """
  12. email_config = {
  13. 'email_account': config['account'],
  14. 'email_password': config['password'],
  15. 'connect_type': config['connect_type'],
  16. 'receive_server_address': config['receive_server_addr'],
  17. 'receive_server_port': config['receive_server_port']
  18. }
  19. print(f"正在检查 {email_config['email_account']} 的邮件...")
  20. # 检查新邮件
  21. client = EmailClient(email_config)
  22. try:
  23. new_emails = client.check_new_email(config['last_check_uid'])
  24. print(f"新邮件数量: {len(new_emails)}")
  25. return new_emails
  26. except Exception as e:
  27. print(e)
  28. print(f"检查邮件时出错: {e}")
  29. return []
  30. finally:
  31. client.disconnect()
  32. def to_message(email: dict):
  33. """
  34. 转换为推送消息
  35. :param email:
  36. :return:
  37. """
  38. sender_display = email["sender"].replace(' ', '\n').replace('"', '')
  39. recipient_display = email["recipient"].replace(' ', '\n')
  40. return PushMessage(
  41. title="新邮件通知",
  42. content=f'您有一封来自[{email["sender_name"] if email["sender_name"] else email["sender_email"]}]的邮件',
  43. messageType=MessageType.EMAIL,
  44. email=Email(
  45. sender=sender_display,
  46. recipient=recipient_display,
  47. subject=email['subject'],
  48. sendTime=email['date'],
  49. content=email['content'],
  50. )
  51. )
  52. if __name__ == '__main__':
  53. # 1. 读取所有的配置文件
  54. files = FileUtil.list_files(email_config_path)
  55. print(f"找到 {len(files)} 个配置文件")
  56. if len(files) == 0:
  57. print("没有找到配置文件")
  58. exit(1)
  59. # 2. 遍历所有配置文件
  60. for file in files:
  61. print(f"正在处理 {file}")
  62. config = FileUtil.read_json(file)
  63. # 3. 检查邮件
  64. email_config = config["email"]
  65. try:
  66. emails = check_email(email_config)
  67. print(f"[{email_config['account']}]新邮件数量: {len(emails)}")
  68. if len(emails) < 0:
  69. continue
  70. current_email = None
  71. for email in emails:
  72. current_email = email
  73. message = to_message(email)
  74. PushFactory.create_push(config['push']).send_notification(message)
  75. print(f"[{email_config['account']}]推送结果: 完成")
  76. if current_email and current_email['uid'] > email_config['last_check_uid']:
  77. email_config['last_check_uid'] = current_email['uid']
  78. FileUtil.write_json(file, config)
  79. print(f'[{email_config["account"]}]更新配置文件: {json.dumps(config, indent=4)}')
  80. except Exception as ex:
  81. print(f"[{email_config['account']}]检查邮件时出错: {ex}")