Jelajahi Sumber

refactor(dns): 重构DNS解析逻辑并优化IP验证功能

- 移除原有的CSRF token获取机制和异步DNS查询方法
- 添加re模块用于IP地址格式验证
- 实现validate_ip函数,支持IPv4和IPv6地址格式验证
- 使用Google DNS API替换原有DNS查询方式
- 添加get_domain_ips函数,支持A/AAAA记录查询
- 重构主流程逻辑,移除异步处理改为同步执行
- 简化错误处理和重试机制
- 移除IPv6相关功能代码,专注于IPv4支持
孔明 1 Minggu lalu
induk
melakukan
b3a57a4203
1 mengubah file dengan 143 tambahan dan 85 penghapusan
  1. 143 85
      dns_optimizer.py

+ 143 - 85
dns_optimizer.py

@@ -3,6 +3,7 @@ from time import sleep
 import random
 import time
 import os
+import re
 import sys
 from datetime import datetime, timezone, timedelta
 from retry import retry
@@ -54,54 +55,6 @@ def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
         print(f"\n~最新{filename}地址已更新~")
 
 
-@retry(tries=3)
-def get_csrf_token(udp):
-    """获取CSRF Token"""
-    try:
-        url = f'https://dnschecker.org/ajax_files/gen_csrf.php?udp={udp}'
-        headers = {
-            'referer': 'https://dnschecker.org/country/{country_code}/',
-            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'
-        }
-
-        response = requests.get(url, headers=headers)
-        if response.status_code == 200:
-            csrf = response.json().get('csrf')
-            print(f"获取到的CSRF Token: {csrf}")
-            return csrf
-        else:
-            print(f"获取CSRF Token失败,HTTP状态码: {response.status_code}")
-            return None
-    except Exception as e:
-        print(f"获取CSRF Token时发生错误: {str(e)}")
-        return None
-
-
-async def async_get_domain_ips(session, domain, csrf_token, udp, argument):
-    url = f'https://dnschecker.org/ajax_files/api/220/{argument}/{domain}?dns_key=country&dns_value={country_code}&v=0.36&cd_flag=1&upd={udp}'
-    headers = {'csrftoken': csrf_token, 'referer': f'https://dnschecker.org/country/{country_code}/',
-               'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'}
-
-    try:
-        async with session.get(url, headers=headers) as response:
-            if response.status == 200:
-                data = await response.json()
-                if 'result' in data and 'ips' in data['result']:
-                    ips_str = data['result']['ips']
-                    if '<br />' in ips_str:
-                        return [ip.strip() for ip in ips_str.split('<br />') if ip.strip()]
-                    else:
-                        return [ips_str.strip()] if ips_str.strip() else []
-                else:
-                    print(f"获取 {domain} 的IP列表失败:返回数据格式不正确")
-                    return []
-            else:
-                print(f"获取 {domain} 的IP列表失败,HTTP状态码: {response.status}")
-                return []
-    except Exception as e:
-        print(f"获取 {domain} 的IP列表时发生错误: {str(e)}")
-        return []
-
 
 def ping_ip(ip, port=80):
     print(f"使用TCP连接测试IP地址的延迟(毫秒)")
@@ -152,45 +105,155 @@ def find_fastest_ip(ips):
     return fastest_ip
 
 
-async def process_domain(session, domain, csrf_token, udp):
-    print(f"\n正在处理域名: {domain}")
-    ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
+# async def process_domain(session, domain, csrf_token, udp):
+#     print(f"\n正在处理域名: {domain}")
+#     ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
+#
+#     if not ipv4_ips:
+#         print(f"无法获取 {domain} 的IP列表,跳过该域名")
+#         return None
+#
+#     # 处理 IPv4 地址
+#     if ipv4_ips:
+#         fastest_ipv4 = find_fastest_ip(ipv4_ips)
+#         if fastest_ipv4:
+#             print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
+#             return [fastest_ipv4, domain]
+#         else:
+#             return [ipv4_ips[0], domain]
+
+def validate_ip(ip):
+    """
+    验证IP是否为合法的IPv4或IPv6地址
+    :param ip: 待验证的IP字符串
+    :return: True(合法)/False(非法)
+    """
+    # IPv4正则(严格验证:每个段0-255,无前置零(除0.0.0.0等合法场景))
+    ipv4_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
+    # IPv6正则(兼容压缩格式、本地链路地址、IPv4映射地址等所有合法格式)
+    ipv6_pattern = r'^(?:(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$'
+
+    # 忽略大小写验证IPv6,优先验证IPv4
+    if re.match(ipv4_pattern, ip):
+        return True
+    elif re.match(ipv6_pattern, ip, re.IGNORECASE):
+        return True
+    else:
+        return False
 
-    if not ipv4_ips:
-        print(f"无法获取 {domain} 的IP列表,跳过该域名")
-        return None
+@retry(tries=3)
+def get_domain_ips(domain, record_type):
+    """
+    从Google DNS获取域名的A/AAAA记录IP列表
+    :param domain: 目标域名(如 tmdb.org)
+    :param record_type: 记录类型(A/AAAA,或数字1/28)
+    :return: 去重后的IP列表
+    """
+    all_ips = []  # 存储所有DNS服务器返回的IP
+
+    print(f"正在从Google DNS获取 {domain} 的{record_type}记录...")
+    url = f'https://dns.google/resolve'
+    headers = {
+        "accept": "*/*",
+        "accept-encoding": "gzip, deflate, br, zstd",
+        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
+        "content-type": "application/json; charset=UTF-8",  # 关键:指定JSON格式负载
+        "referer": f"https://dns.google/query?name={domain}&rr_type={record_type}&ecs=",
+        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0"
+    }
+
+    params = {
+        'name': domain,
+        'type': record_type
+    }
+
+    # 初始化IP列表(默认空列表,确保后续使用安全)
+    ips_str = []
 
-    # 处理 IPv4 地址
-    if ipv4_ips:
-        fastest_ipv4 = find_fastest_ip(ipv4_ips)
-        if fastest_ipv4:
-            print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
-            return [fastest_ipv4, domain]
-        else:
-            return [ipv4_ips[0], domain]
+    try:
+        # 改用GET请求(Google DNS resolve接口标准用法)
+        response = requests.get(url, headers=headers, params=params, timeout=10)
+        response.raise_for_status()  # 主动抛出HTTP错误(如4xx/5xx)
+
+        # 解析JSON响应
+        data = response.json()
+        if not isinstance(data, dict):
+            print("返回数据不是字典格式,无法解析")
+            return all_ips
+
+        # 核心:提取Answer数组中的data字段(IP地址)
+        answer_list = data.get("Answer", [])
+        if not answer_list:
+            print(f"未找到 {domain} 的{record_type}记录(Answer字段为空)")
+            return all_ips
+
+        # 遍历Answer数组,提取每个条目的data值(IP)
+        for answer in answer_list:
+            ip = answer.get("data")
+            if not ip:  # 过滤空值
+                continue
+
+            # 验证IP格式合法性
+            if validate_ip(ip):
+                all_ips.append(ip)
+                print(f"提取到合法IP:{ip}")
+            else:
+                print(f"跳过非法IP格式:{ip}")
+
+    except requests.exceptions.RequestException as e:
+        # 捕获所有网络/请求异常
+        print(f"请求Google DNS失败:{e}")
+        if hasattr(e, 'response') and e.response:
+            print(f"响应内容:{e.response.text[:500]}")  # 打印前500字符避免过长
+    except ValueError:
+        # JSON解析失败
+        print(f"响应内容不是有效的JSON格式:{response.text[:500]}")
+    time.sleep(1)
+    # 去重并返回(保持列表格式)
+    unique_ips = list(set(all_ips))
+    print(f"最终提取到 {domain} 的{record_type}记录IP(去重后):{unique_ips}")
+    return unique_ips
 
 
-async def main_async():
+def main():
     print("开始检测域名的最快IP...")
     file_path = "/ql/data"
-    udp = random.random() * 1000 + (int(time.time() * 1000) % 1000)
-    # 获取CSRF Token
-    csrf_token = get_csrf_token(udp)
-    if not csrf_token:
-        print("无法获取CSRF Token,程序退出")
-        sys.exit(1)
 
-    # 使用异步HTTP客户端
-    async with aiohttp.ClientSession() as session:
-        # 并行处理所有域名
-        tasks = [process_domain(session, domain, csrf_token, udp) for domain in DOMAINS]
-        results = await asyncio.gather(*tasks)
+    ipv4_ips, ipv6_ips, ipv4_results, ipv6_results = [], [], [], []
+
+    for domain in DOMAINS:
+        print(f"\n正在处理域名: {domain}")
+        ipv4_ips = get_domain_ips(domain, "A")
+        # ipv6_ips = get_domain_ips(domain, "AAAA")
+
+        if not ipv4_ips and not ipv6_ips:
+            print(f"无法获取 {domain} 的IP列表,跳过该域名")
+            continue
+
+        # 处理 IPv4 地址
+        if ipv4_ips:
+            fastest_ipv4 = find_fastest_ip(ipv4_ips)
+            if fastest_ipv4:
+                ipv4_results.append([fastest_ipv4, domain])
+                print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
+            else:
+                ipv4_results.append([ipv4_ips[0], domain])
+
+        # 处理 IPv6 地址
+        # if ipv6_ips:
+        #     fastest_ipv6 = find_fastest_ip(ipv6_ips)
+        #     if fastest_ipv6:
+        #         ipv6_results.append([fastest_ipv6, domain])
+        #         print(f"域名 {domain} 的最快IPv6是: {fastest_ipv6}")
+        #     else:
+        #         # 兜底:可能存在无法正确获取 fastest_ipv6 的情况,则将第一个IP赋值
+        #         ipv6_results.append([ipv6_ips[0], domain])
+
+        sleep(1)  # 避免请求过于频繁
 
-        # 过滤掉None结果(处理失败的域名)
-        ipv4_results = [result for result in results if result is not None]
 
     # 保存结果到文件
-    if not ipv4_results:
+    if not ipv4_results and not ipv6_results:
         print(f"程序出错:未获取任何domain及对应IP,请检查接口~")
         sys.exit(1)
 
@@ -200,18 +263,13 @@ async def main_async():
     ipv4_hosts_content = HOSTS_TEMPLATE.format(
         content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results),
         update_time=update_time) if ipv4_results else ""
-
-    print(f"Ipv4: {ipv4_hosts_content}")
+    # ipv6_hosts_content = HOSTS_TEMPLATE.format(
+    #     content="\n".join(f"{ip:<50} {domain}" for ip, domain in ipv6_results),
+    #     update_time=update_time) if ipv6_results else ""
 
     # 写入文件
     if ipv4_hosts_content:
         write_host_file(file_path, ipv4_hosts_content, 'ipv4')
 
-
-def main():
-    # 运行异步主函数
-    asyncio.run(main_async())
-
-
 if __name__ == "__main__":
     main()