Преглед на файлове

feat: 新增 fast-dns-hosts 文件生成功能

- 新增 dns_optimizer.py 文件,用于生成最快的 DNS hosts 文件
- 实现了获取 CSRF Token、查询域名 IP、测量 IP 延迟等功能- 优化了文件名生成逻辑,使其更通用
- 使用异步编程提高域名处理效率
孔明 преди 2 месеца
родител
ревизия
7401155a85
променени са 1 файла, в които са добавени 215 реда и са изтрити 0 реда
  1. 215 0
      dns_optimizer.py

+ 215 - 0
dns_optimizer.py

@@ -0,0 +1,215 @@
+import requests
+from time import sleep
+import random
+import time
+import os
+import sys
+from datetime import datetime, timezone, timedelta
+from retry import retry
+import socket
+import asyncio
+import aiohttp
+
+country_code = 'jp'  # 节点
+
+DOMAINS = [
+    'tmdb.org',
+    'api.tmdb.org',
+    'files.tmdb.org',
+    'themoviedb.org',
+    'api.themoviedb.org',
+    'www.themoviedb.org',
+    'auth.themoviedb.org',
+    'image.tmdb.org',
+    'images.tmdb.org',
+    'imdb.com',
+    'www.imdb.com',
+    'secure.imdb.com',
+    's.media-imdb.com',
+    'us.dd.imdb.com',
+    'www.imdb.to',
+    'origin-www.imdb.com',
+    'ia.media-imdb.com',
+    'thetvdb.com',
+    'api.thetvdb.com',
+    'ia.media-imdb.com',
+    'f.media-amazon.com',
+    'imdb-video.media-imdb.com'
+]
+
+# 将 Tmdb_Host_TEMPLATE 修改为更通用的名称
+HOSTS_TEMPLATE = """# Fast DNS Hosts Start
+{content}
+# Update time: {update_time}
+# Fast DNS Hosts End\n"""
+
+
+def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
+    # 修改文件名生成逻辑,使其更通用
+    output_file_path = os.path.join(file_path, "fast_dns_hosts_" + filename)
+    with open(output_file_path, "w", encoding='utf-8') as output_fb:
+        output_fb.write(hosts_content)
+        print(f"\n~最新{filename}地址已更新~")
+
+
+@retry(tries=3)
+def get_csrf_token(udp):
+    """获取CSRF Token"""
+    try:
+        url = f'https://dnschecker.org/ajax_files/gen_csrf.php?udp={udp}'
+        headers = {
+            'referer': 'https://dnschecker.org/country/{country_code}/',
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'
+        }
+
+        response = requests.get(url, headers=headers)
+        if response.status_code == 200:
+            csrf = response.json().get('csrf')
+            print(f"获取到的CSRF Token: {csrf}")
+            return csrf
+        else:
+            print(f"获取CSRF Token失败,HTTP状态码: {response.status_code}")
+            return None
+    except Exception as e:
+        print(f"获取CSRF Token时发生错误: {str(e)}")
+        return None
+
+
+async def async_get_domain_ips(session, domain, csrf_token, udp, argument):
+    url = f'https://dnschecker.org/ajax_files/api/220/{argument}/{domain}?dns_key=country&dns_value={country_code}&v=0.36&cd_flag=1&upd={udp}'
+    headers = {'csrftoken': csrf_token, 'referer': f'https://dnschecker.org/country/{country_code}/',
+               'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'}
+
+    try:
+        async with session.get(url, headers=headers) as response:
+            if response.status == 200:
+                data = await response.json()
+                if 'result' in data and 'ips' in data['result']:
+                    ips_str = data['result']['ips']
+                    if '<br />' in ips_str:
+                        return [ip.strip() for ip in ips_str.split('<br />') if ip.strip()]
+                    else:
+                        return [ips_str.strip()] if ips_str.strip() else []
+                else:
+                    print(f"获取 {domain} 的IP列表失败:返回数据格式不正确")
+                    return []
+            else:
+                print(f"获取 {domain} 的IP列表失败,HTTP状态码: {response.status}")
+                return []
+    except Exception as e:
+        print(f"获取 {domain} 的IP列表时发生错误: {str(e)}")
+        return []
+
+
+def ping_ip(ip, port=80):
+    print(f"使用TCP连接测试IP地址的延迟(毫秒)")
+    try:
+        print(f"\n开始 ping {ip}...")
+        start_time = time.time()
+        with socket.create_connection((ip, port), timeout=2) as sock:
+            latency = (time.time() - start_time) * 1000  # 转换为毫秒
+            print(f"IP: {ip} 的平均延迟: {latency}ms")
+            return latency
+    except Exception as e:
+        print(f"Ping {ip} 时发生错误: {str(e)}")
+        return float('inf')
+
+
+def find_fastest_ip(ips):
+    """找出延迟最低的IP地址"""
+    if not ips:
+        return None
+
+    fastest_ip = None
+    min_latency = float('inf')
+    ip_latencies = []  # 存储所有IP及其延迟
+
+    for ip in ips:
+        ip = ip.strip()
+        if not ip:
+            continue
+
+        print(f"正在测试 IP: {ip}")
+        latency = ping_ip(ip)
+        ip_latencies.append((ip, latency))
+        print(f"IP: {ip} 延迟: {latency}ms")
+
+        if latency < min_latency:
+            min_latency = latency
+            fastest_ip = ip
+
+        sleep(0.5)
+
+    print("\n所有IP延迟情况:")
+    for ip, latency in ip_latencies:
+        print(f"IP: {ip} - 延迟: {latency}ms")
+
+    if fastest_ip:
+        print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms")
+
+    return fastest_ip
+
+
+async def process_domain(session, domain, csrf_token, udp):
+    print(f"\n正在处理域名: {domain}")
+    ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
+
+    if not ipv4_ips:
+        print(f"无法获取 {domain} 的IP列表,跳过该域名")
+        return None
+
+    # 处理 IPv4 地址
+    if ipv4_ips:
+        fastest_ipv4 = find_fastest_ip(ipv4_ips)
+        if fastest_ipv4:
+            print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
+            return [fastest_ipv4, domain]
+        else:
+            return [ipv4_ips[0], domain]
+
+
+async def main_async():
+    print("开始检测域名的最快IP...")
+    file_path = "/ql/data"
+    udp = random.random() * 1000 + (int(time.time() * 1000) % 1000)
+    # 获取CSRF Token
+    csrf_token = get_csrf_token(udp)
+    if not csrf_token:
+        print("无法获取CSRF Token,程序退出")
+        sys.exit(1)
+
+    # 使用异步HTTP客户端
+    async with aiohttp.ClientSession() as session:
+        # 并行处理所有域名
+        tasks = [process_domain(session, domain, csrf_token, udp) for domain in DOMAINS]
+        results = await asyncio.gather(*tasks)
+
+        # 过滤掉None结果(处理失败的域名)
+        ipv4_results = [result for result in results if result is not None]
+
+    # 保存结果到文件
+    if not ipv4_results:
+        print(f"程序出错:未获取任何domain及对应IP,请检查接口~")
+        sys.exit(1)
+
+    # 生成更新时间
+    update_time = datetime.now(timezone(timedelta(hours=8))).replace(microsecond=0).isoformat()
+
+    ipv4_hosts_content = HOSTS_TEMPLATE.format(
+        content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results),
+        update_time=update_time) if ipv4_results else ""
+
+    print(f"Ipv4: {ipv4_hosts_content}")
+
+    # 写入文件
+    if ipv4_hosts_content:
+        write_host_file(file_path, ipv4_hosts_content, 'ipv4')
+
+
+def main():
+    # 运行异步主函数
+    asyncio.run(main_async())
+
+
+if __name__ == "__main__":
+    main()