|
|
@@ -83,15 +83,89 @@ class Yo(Push):
|
|
|
}
|
|
|
|
|
|
response = requests.post(self.config['url'], json=data.model_dump(), headers=headers)
|
|
|
- print("Yo提醒发送结果:", response.json())
|
|
|
+ print("Yo 提醒发送结果:", response.json())
|
|
|
+
|
|
|
+
|
|
|
+class WeComWebhook(Push):
|
|
|
+ """
|
|
|
+ 企业微信 Webhook 推送
|
|
|
+ """
|
|
|
+ def __init__(self, config: dict):
|
|
|
+ super().__init__(config)
|
|
|
+ self.webhook_url = config.get('webhook_url', '')
|
|
|
+
|
|
|
+ def send_notification(self, data: PushMessage):
|
|
|
+ """
|
|
|
+ 发送企业微信 webhook 通知
|
|
|
+ 支持发送者、接收者、发送时间、主题等信息
|
|
|
+ """
|
|
|
+ if data.messageType == MessageType.EMAIL and data.email:
|
|
|
+ # 使用文本格式发送,兼容性更好
|
|
|
+ content = f"{data.title}\n\n" \
|
|
|
+ f"发件人:{data.email.sender}\n" \
|
|
|
+ f"收件人:{data.email.recipient}\n" \
|
|
|
+ f"发送时间:{data.email.sendTime}\n" \
|
|
|
+ f"主题:{data.email.subject}"
|
|
|
+ else:
|
|
|
+ content = f"{data.title}\n\n{data.content}"
|
|
|
+
|
|
|
+ payload = {
|
|
|
+ "msgtype": "text",
|
|
|
+ "text": {
|
|
|
+ "content": content
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = requests.post(self.webhook_url, json=payload)
|
|
|
+ result = response.json()
|
|
|
+ print(f"企业微信 Webhook 推送结果:{result}")
|
|
|
+ return result
|
|
|
+ except Exception as e:
|
|
|
+ print(f"企业微信 Webhook 推送失败:{e}")
|
|
|
+ raise
|
|
|
|
|
|
|
|
|
class PushFactory:
|
|
|
@staticmethod
|
|
|
def create_push(config: dict) -> Push:
|
|
|
- if config['type'] == 'Yo':
|
|
|
+ push_type = config.get('type', '')
|
|
|
+ if push_type == 'WeComWebhook':
|
|
|
+ return WeComWebhook(config)
|
|
|
+ elif push_type == 'Yo':
|
|
|
return Yo(config)
|
|
|
- elif config['type'] == 'AiYuFeiFei':
|
|
|
+ elif push_type == 'AiYuFeiFei':
|
|
|
return AiYuFeiFei(config)
|
|
|
else:
|
|
|
- raise ValueError(f'不支持的推送类型: {config.type}')
|
|
|
+ raise ValueError(f'不支持的推送类型:{push_type}')
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ # 测试企业微信 Webhook 推送
|
|
|
+ webhook_config = {
|
|
|
+ 'type': 'WeComWebhook',
|
|
|
+ 'webhook_url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=d5b3282e-7531-462f-9b48-e117fb2797f6' # 替换为你的企业微信 webhook 地址
|
|
|
+ }
|
|
|
+
|
|
|
+ # 创建测试邮件消息
|
|
|
+ test_email = PushMessage(
|
|
|
+ title="新邮件通知",
|
|
|
+ content="您有一封来自 [test@example.com] 的邮件",
|
|
|
+ messageType=MessageType.EMAIL,
|
|
|
+ email=Email(
|
|
|
+ sender="张三 <zhangsan@example.com>",
|
|
|
+ recipient="李四 <lisi@example.com>",
|
|
|
+ subject="测试邮件主题",
|
|
|
+ sendTime="2026-03-25 10:30:00",
|
|
|
+ content="这是一封测试邮件的内容...\n\n请查收!"
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ # 测试推送
|
|
|
+ try:
|
|
|
+ push = PushFactory.create_push(webhook_config)
|
|
|
+ print("开始测试企业微信 Webhook 推送...")
|
|
|
+ result = push.send_notification(test_email)
|
|
|
+ print(f"测试完成!返回结果:{result}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"测试失败:{e}")
|