"""Pick the lowest-latency IP for each domain and write a hosts snippet.

Candidate IPs are fetched from the dnschecker.org lookup endpoint for the
configured country node, each candidate is timed with a TCP connect, and the
fastest address per domain is written to a hosts-format file.

Original script: https://github.com/cnwikee/CheckTMDB.git
"""
import asyncio
import ipaddress
import os
import random
import re
import socket
import sys
import time
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import List, Optional

import aiohttp
import requests
from retry import retry

# Country node used for the dnschecker.org lookups.
country_code = 'jp'

# Domains to resolve (the duplicated 'ia.media-imdb.com' entry was removed so
# it is not looked up and written twice).
DOMAINS = [
    'tmdb.org',
    'api.tmdb.org',
    'files.tmdb.org',
    'themoviedb.org',
    'api.themoviedb.org',
    'www.themoviedb.org',
    'auth.themoviedb.org',
    'image.tmdb.org',
    'images.tmdb.org',
    'imdb.com',
    'www.imdb.com',
    'secure.imdb.com',
    's.media-imdb.com',
    'us.dd.imdb.com',
    'www.imdb.to',
    'origin-www.imdb.com',
    'ia.media-imdb.com',
    'thetvdb.com',
    'api.thetvdb.com',
    'f.media-amazon.com',
    'imdb-video.media-imdb.com',
]

# Generic hosts-file template (renamed from the original Tmdb_Host_TEMPLATE).
HOSTS_TEMPLATE = """# Fast DNS Hosts Start
{content}
# Update time: {update_time}
# Fast DNS Hosts End\n"""

# Directory the generated hosts file is written to.
OUTPUT_DIR = "/ql/data"

# Seconds to wait for the CSRF endpoint before giving up on one attempt.
REQUEST_TIMEOUT = 10

USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'
)

# NOTE(review): the separator literal was corrupted in the source this file
# was recovered from (it appeared as a raw line break, possibly a stripped
# HTML "<br />"). Accept both HTML line breaks and any whitespace so either
# response format parses; confirm against a live response.
_IP_SEPARATOR = re.compile(r'<br\s*/?>|\s+', re.IGNORECASE)


def _build_headers(csrf_token: Optional[str] = None) -> dict:
    """Return the request headers shared by every dnschecker.org call."""
    headers = {
        'referer': f'https://dnschecker.org/country/{country_code}/',
        'User-Agent': USER_AGENT,
    }
    if csrf_token is not None:
        headers['csrftoken'] = csrf_token
    return headers


def _parse_ips(ips_str: str) -> List[str]:
    """Split a raw ``ips`` field into unique, syntactically valid IP strings.

    Tokens that are not valid IPv4/IPv6 literals are dropped so that they can
    never end up in the generated hosts file.
    """
    valid = []
    for token in _IP_SEPARATOR.split(ips_str):
        token = token.strip()
        if not token:
            continue
        try:
            ipaddress.ip_address(token)
        except ValueError:
            continue
        valid.append(token)
    # dict.fromkeys de-duplicates while keeping the original order.
    return list(dict.fromkeys(valid))


def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
    """Write ``hosts_content`` to ``<file_path>/fast_dns_hosts_<filename>``.

    The target directory is created when it does not exist yet.
    """
    os.makedirs(file_path, exist_ok=True)
    output_file_path = os.path.join(file_path, "fast_dns_hosts_" + filename)
    with open(output_file_path, "w", encoding='utf-8') as output_fb:
        output_fb.write(hosts_content)
    print(f"\n~最新{filename}地址已更新~")


@retry(tries=3, delay=1)
def _fetch_csrf_token(udp) -> str:
    """Request a CSRF token once; raise on any failure so ``retry`` re-runs it."""
    url = f'https://dnschecker.org/ajax_files/gen_csrf.php?udp={udp}'
    response = requests.get(
        url, headers=_build_headers(), timeout=REQUEST_TIMEOUT
    )
    if response.status_code != 200:
        raise RuntimeError(f"HTTP状态码: {response.status_code}")
    csrf = response.json().get('csrf')
    if not csrf:
        raise RuntimeError("响应中缺少csrf字段")
    return csrf


def get_csrf_token(udp) -> Optional[str]:
    """Fetch the CSRF token required by the lookup API.

    The request is attempted up to three times. Returns the token string, or
    ``None`` when every attempt failed (the caller decides how to react).
    """
    try:
        csrf = _fetch_csrf_token(udp)
    except Exception as e:  # boundary: report the failure and signal via None
        print(f"获取CSRF Token时发生错误: {str(e)}")
        return None
    print(f"获取到的CSRF Token: {csrf}")
    return csrf


async def async_get_domain_ips(session, domain, csrf_token, udp, argument) -> List[str]:
    """Query dnschecker.org for ``domain`` and return the IPs it reports.

    ``argument`` is the record type placed in the URL path (the caller passes
    ``"A"``). Returns an empty list on any HTTP, network or format problem so
    a single failing domain never aborts the whole run.
    """
    url = (
        f'https://dnschecker.org/ajax_files/api/220/{argument}/{domain}'
        f'?dns_key=country&dns_value={country_code}&v=0.36&cd_flag=1&upd={udp}'
    )
    try:
        async with session.get(url, headers=_build_headers(csrf_token)) as response:
            if response.status != 200:
                print(f"获取 {domain} 的IP列表失败,HTTP状态码: {response.status}")
                return []
            data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
        print(f"获取 {domain} 的IP列表时发生错误: {str(e)}")
        return []

    result = data.get('result') if isinstance(data, dict) else None
    ips_str = result.get('ips') if isinstance(result, dict) else None
    if not isinstance(ips_str, str):
        print(f"获取 {domain} 的IP列表失败:返回数据格式不正确")
        return []
    return _parse_ips(ips_str)


def ping_ip(ip, port=80):
    """Measure the TCP connect latency to ``ip:port`` in milliseconds.

    Returns ``float('inf')`` when the connection cannot be established within
    two seconds or the address is unusable.
    """
    print("使用TCP连接测试IP地址的延迟(毫秒)")
    try:
        print(f"\n开始 ping {ip}...")
        # perf_counter is monotonic, so clock adjustments cannot skew the result.
        start_time = time.perf_counter()
        with socket.create_connection((ip, port), timeout=2):
            latency = (time.perf_counter() - start_time) * 1000
            print(f"IP: {ip} 的平均延迟: {latency}ms")
            return latency
    except (OSError, ValueError) as e:
        print(f"Ping {ip} 时发生错误: {str(e)}")
        return float('inf')


def find_fastest_ip(ips):
    """Return the IP with the lowest connect latency, or ``None``.

    ``None`` is returned when ``ips`` is empty or no address was reachable.
    This function blocks (socket connects plus a pause between probes), so
    async callers should run it in an executor.
    """
    if not ips:
        return None

    fastest_ip = None
    min_latency = float('inf')
    ip_latencies = []  # every probed IP together with its latency

    for ip in ips:
        ip = ip.strip()
        if not ip:
            continue

        print(f"正在测试 IP: {ip}")
        latency = ping_ip(ip)
        ip_latencies.append((ip, latency))
        print(f"IP: {ip} 延迟: {latency}ms")

        if latency < min_latency:
            min_latency = latency
            fastest_ip = ip

        sleep(0.5)

    print("\n所有IP延迟情况:")
    for ip, latency in ip_latencies:
        print(f"IP: {ip} - 延迟: {latency}ms")

    if fastest_ip:
        print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms")

    return fastest_ip


async def process_domain(session, domain, csrf_token, udp):
    """Resolve ``domain`` and return ``[ip, domain]`` for its fastest IPv4.

    Returns ``None`` when no IP could be obtained. When IPs were obtained but
    none answered the latency probe, the first reported IP is used as a
    fallback.
    """
    print(f"\n正在处理域名: {domain}")

    ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
    if not ipv4_ips:
        print(f"无法获取 {domain} 的IP列表,跳过该域名")
        return None

    # The latency probe blocks; run it in a worker thread so the event loop
    # keeps servicing the other domains' HTTP requests.
    loop = asyncio.get_running_loop()
    fastest_ipv4 = await loop.run_in_executor(None, find_fastest_ip, ipv4_ips)
    if fastest_ipv4:
        print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
        return [fastest_ipv4, domain]
    return [ipv4_ips[0], domain]


async def main_async():
    """Resolve every domain concurrently and write the IPv4 hosts file.

    Exits the process with status 1 when the CSRF token cannot be obtained or
    when no domain produced a result.
    """
    print("开始检测域名的最快IP...")
    file_path = OUTPUT_DIR
    udp = random.random() * 1000 + (int(time.time() * 1000) % 1000)

    csrf_token = get_csrf_token(udp)
    if not csrf_token:
        print("无法获取CSRF Token,程序退出")
        sys.exit(1)

    async with aiohttp.ClientSession() as session:
        tasks = [
            process_domain(session, domain, csrf_token, udp)
            for domain in DOMAINS
        ]
        results = await asyncio.gather(*tasks)

    # Drop the domains whose processing failed (they yield None).
    ipv4_results = [result for result in results if result is not None]
    if not ipv4_results:
        print("程序出错:未获取任何domain及对应IP,请检查接口~")
        sys.exit(1)

    # Timestamp in UTC+8, second precision.
    update_time = (
        datetime.now(timezone(timedelta(hours=8)))
        .replace(microsecond=0)
        .isoformat()
    )
    ipv4_hosts_content = HOSTS_TEMPLATE.format(
        content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results),
        update_time=update_time,
    )
    print(f"Ipv4: {ipv4_hosts_content}")

    write_host_file(file_path, ipv4_hosts_content, 'ipv4')


def main():
    """Synchronous entry point: run the async workflow to completion."""
    asyncio.run(main_async())


if __name__ == "__main__":
    main()