| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- import json
- from file_util import FileUtil
- from email_client import EmailClient
- from notice import PushFactory, PushMessage, MessageType, Email
- email_config_path = "./config/email/"
def check_email(config: dict):
    """
    Fetch the messages that arrived after the last recorded UID of one mailbox.

    :param config: mailbox settings; the keys ``account``, ``password``,
        ``connect_type``, ``receive_server_addr``, ``receive_server_port``
        and ``last_check_uid`` are read here
    :return: the result of ``EmailClient.check_new_email``, or an empty list
        when fetching (or reporting the count) raises
    """
    # Map the key names handed to EmailClient onto the names used in the
    # stored configuration.
    key_mapping = (
        ('email_account', 'account'),
        ('email_password', 'password'),
        ('connect_type', 'connect_type'),
        ('receive_server_address', 'receive_server_addr'),
        ('receive_server_port', 'receive_server_port'),
    )
    client_settings = {target: config[source] for target, source in key_mapping}
    print(f"正在检查 {client_settings['email_account']} 的邮件...")

    mail_client = EmailClient(client_settings)
    try:
        fetched = mail_client.check_new_email(config['last_check_uid'])
        print(f"新邮件数量: {len(fetched)}")
    except Exception as err:
        # Best effort: report the failure and behave as if nothing arrived.
        print(err)
        print(f"检查邮件时出错: {err}")
        fetched = []
    finally:
        # Always release the connection, whether or not the fetch succeeded.
        mail_client.disconnect()
    return fetched
def to_message(email: dict):
    """
    Build the push notification that announces one received email.

    :param email: parsed email; the keys ``sender``, ``recipient``,
        ``sender_name``, ``sender_email``, ``subject``, ``date`` and
        ``content`` are read here
    :return: a ``PushMessage`` of type ``MessageType.EMAIL`` carrying an
        ``Email`` payload
    """
    # Turn every space in the address headers into a line break; the sender
    # additionally loses its double quotes.
    sender_lines = email["sender"].replace(' ', '\n')
    sender_lines = sender_lines.replace('"', '')
    recipient_lines = email["recipient"].replace(' ', '\n')

    # Prefer the display name; fall back to the address when it is empty.
    origin = email["sender_name"] or email["sender_email"]
    summary = f'您有一封来自[{origin}]的邮件'

    return PushMessage(
        title="新邮件通知",
        content=summary,
        messageType=MessageType.EMAIL,
        email=Email(
            sender=sender_lines,
            recipient=recipient_lines,
            subject=email['subject'],
            sendTime=email['date'],
            content=email['content'],
        ),
    )
def main():
    """
    Check every configured mailbox and push a notification per new email.

    For each file returned by ``FileUtil.list_files(email_config_path)`` the
    ``email`` section is passed to ``check_email`` and every returned email is
    pushed through the channel built from the ``push`` section. When the last
    pushed email has a UID greater than the stored ``last_check_uid``, the
    new UID is written back to the same file.

    Errors raised while checking or pushing for one mailbox are printed and
    the loop moves on to the next file.

    :raises SystemExit: with status 1 when no configuration file is found
    """
    # 1. Collect all configuration files.
    files = FileUtil.list_files(email_config_path)
    print(f"找到 {len(files)} 个配置文件")
    if not files:
        print("没有找到配置文件")
        # The bare ``exit`` builtin is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise directly.
        raise SystemExit(1)

    # 2. Process each configuration file in turn.
    for file in files:
        print(f"正在处理 {file}")
        config = FileUtil.read_json(file)
        # 3. Check the mailbox described by this file.
        email_config = config["email"]
        try:
            emails = check_email(email_config)
            print(f"[{email_config['account']}]新邮件数量: {len(emails)}")
            if not emails:
                # Nothing new: skip pushing and leave the file untouched.
                # (The original tested ``len(emails) < 0``, which is never
                # true, so this shortcut was dead code.)
                continue

            current_email = None
            for email in emails:
                current_email = email
                message = to_message(email)
                PushFactory.create_push(config['push']).send_notification(message)
            print(f"[{email_config['account']}]推送结果: 完成")

            # Remember the last pushed UID so these emails are not pushed again.
            # NOTE(review): assumes emails arrive in ascending UID order - confirm.
            if current_email and current_email['uid'] > email_config['last_check_uid']:
                email_config['last_check_uid'] = current_email['uid']
                FileUtil.write_json(file, config)
                # Print a copy with the password masked so the credential is
                # never echoed to stdout; the file on disk is unaffected.
                printable = {**config, 'email': {**email_config, 'password': '******'}}
                print(f'[{email_config["account"]}]更新配置文件: {json.dumps(printable, indent=4)}')
        except Exception as ex:
            # Top-level boundary: report and continue with the next mailbox.
            print(f"[{email_config['account']}]检查邮件时出错: {ex}")


if __name__ == '__main__':
    main()
|