import requests from time import sleep import time import os import re import sys from datetime import datetime, timezone, timedelta from retry import retry import socket # 原始脚本地址: https://github.com/cnwikee/CheckTMDB.git DOMAINS = [ 'tmdb.org', 'api.tmdb.org', 'files.tmdb.org', 'themoviedb.org', 'api.themoviedb.org', 'www.themoviedb.org', 'auth.themoviedb.org', 'image.tmdb.org', 'images.tmdb.org', 'imdb.com', 'www.imdb.com', 'secure.imdb.com', 's.media-imdb.com', 'us.dd.imdb.com', 'www.imdb.to', 'origin-www.imdb.com', 'ia.media-imdb.com', 'thetvdb.com', 'api.thetvdb.com', 'ia.media-imdb.com', 'f.media-amazon.com', 'imdb-video.media-imdb.com' ] # 将 Tmdb_Host_TEMPLATE 修改为更通用的名称 HOSTS_TEMPLATE = """# Fast DNS Hosts Start {content} # Update time: {update_time} # Fast DNS Hosts End\n""" def write_host_file(file_path: str, hosts_content: str, filename: str) -> None: # 修改文件名生成逻辑,使其更通用 output_file_path = os.path.join(file_path, "fast_dns_hosts_" + filename) with open(output_file_path, "w", encoding='utf-8') as output_fb: output_fb.write(hosts_content) print(f"\n~最新{filename}地址已更新~") def ping_ip(ip, port=80): print(f"使用TCP连接测试IP地址的延迟(毫秒)") try: print(f"\n开始 ping {ip}...") start_time = time.time() with socket.create_connection((ip, port), timeout=2) as sock: latency = (time.time() - start_time) * 1000 # 转换为毫秒 print(f"IP: {ip} 的平均延迟: {latency}ms") return latency except Exception as e: print(f"Ping {ip} 时发生错误: {str(e)}") return float('inf') def find_fastest_ip(ips): """找出延迟最低的IP地址""" if not ips: return None fastest_ip = None min_latency = float('inf') ip_latencies = [] # 存储所有IP及其延迟 for ip in ips: ip = ip.strip() if not ip: continue print(f"正在测试 IP: {ip}") latency = ping_ip(ip) ip_latencies.append((ip, latency)) print(f"IP: {ip} 延迟: {latency}ms") if latency < min_latency: min_latency = latency fastest_ip = ip sleep(0.5) print("\n所有IP延迟情况:") for ip, latency in ip_latencies: print(f"IP: {ip} - 延迟: {latency}ms") if fastest_ip: print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms") return fastest_ip # async def process_domain(session, domain, csrf_token, udp): # print(f"\n正在处理域名: {domain}") # ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A") # # if not ipv4_ips: # print(f"无法获取 {domain} 的IP列表,跳过该域名") # return None # # # 处理 IPv4 地址 # if ipv4_ips: # fastest_ipv4 = find_fastest_ip(ipv4_ips) # if fastest_ipv4: # print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}") # return [fastest_ipv4, domain] # else: # return [ipv4_ips[0], domain] def validate_ip(ip): """ 验证IP是否为合法的IPv4或IPv6地址 :param ip: 待验证的IP字符串 :return: True(合法)/False(非法) """ # IPv4正则(严格验证:每个段0-255,无前置零(除0.0.0.0等合法场景)) ipv4_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' # IPv6正则(兼容压缩格式、本地链路地址、IPv4映射地址等所有合法格式) ipv6_pattern = r'^(?:(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$' # 忽略大小写验证IPv6,优先验证IPv4 if re.match(ipv4_pattern, ip): return True elif re.match(ipv6_pattern, ip, re.IGNORECASE): return True else: return False @retry(tries=3) def get_domain_ips(domain, record_type): """ 从Google DNS获取域名的A/AAAA记录IP列表 :param domain: 目标域名(如 tmdb.org) :param record_type: 记录类型(A/AAAA,或数字1/28) :return: 去重后的IP列表 """ all_ips = [] # 存储所有DNS服务器返回的IP print(f"正在从Google DNS获取 {domain} 的{record_type}记录...") url = f'https://cloudflare-dns.com/dns-query' headers = { "accept": "application/dns-json" # "accept-encoding": "gzip, deflate, br, zstd", # "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", # "content-type": "application/json; charset=UTF-8", # 关键:指定JSON格式负载 # "referer": f"https://dns.google/query?name={domain}&rr_type={record_type}&ecs=", # "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0" } params = { 'name': domain, 'type': record_type } # 初始化IP列表(默认空列表,确保后续使用安全) ips_str = [] try: # 改用GET请求(Google DNS resolve接口标准用法) response = requests.get(url, headers=headers, params=params, timeout=10) response.raise_for_status() # 主动抛出HTTP错误(如4xx/5xx) # 解析JSON响应 data = response.json() if not isinstance(data, dict): print("返回数据不是字典格式,无法解析") return all_ips # 核心:提取Answer数组中的data字段(IP地址) answer_list = data.get("Answer", []) if not answer_list: print(f"未找到 {domain} 的{record_type}记录(Answer字段为空)") return all_ips # 遍历Answer数组,提取每个条目的data值(IP) for answer in answer_list: ip = answer.get("data") if not ip: # 过滤空值 continue # 验证IP格式合法性 if validate_ip(ip): all_ips.append(ip) print(f"提取到合法IP:{ip}") else: print(f"跳过非法IP格式:{ip}") except requests.exceptions.RequestException as e: # 捕获所有网络/请求异常 print(f"请求Google DNS失败:{e}") if hasattr(e, 'response') and e.response: print(f"响应内容:{e.response.text[:500]}") # 打印前500字符避免过长 except ValueError: # JSON解析失败 print(f"响应内容不是有效的JSON格式:{response.text[:500]}") time.sleep(1) # 去重并返回(保持列表格式) unique_ips = list(set(all_ips)) print(f"最终提取到 {domain} 的{record_type}记录IP(去重后):{unique_ips}") return unique_ips def main(): print("开始检测域名的最快IP...") file_path = "/ql/data" ipv4_ips, ipv6_ips, ipv4_results, ipv6_results = [], [], [], [] for domain in DOMAINS: print(f"\n正在处理域名: {domain}") ipv4_ips = get_domain_ips(domain, "A") # ipv6_ips = get_domain_ips(domain, "AAAA") if not ipv4_ips and not ipv6_ips: print(f"无法获取 {domain} 的IP列表,跳过该域名") continue # 处理 IPv4 地址 if ipv4_ips: fastest_ipv4 = find_fastest_ip(ipv4_ips) if fastest_ipv4: ipv4_results.append([fastest_ipv4, domain]) print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}") else: ipv4_results.append([ipv4_ips[0], domain]) # 处理 IPv6 地址 # if ipv6_ips: # fastest_ipv6 = find_fastest_ip(ipv6_ips) # if fastest_ipv6: # ipv6_results.append([fastest_ipv6, domain]) # print(f"域名 {domain} 的最快IPv6是: {fastest_ipv6}") # else: # # 兜底:可能存在无法正确获取 fastest_ipv6 的情况,则将第一个IP赋值 # ipv6_results.append([ipv6_ips[0], domain]) sleep(1) # 避免请求过于频繁 # 保存结果到文件 if not ipv4_results and not ipv6_results: print(f"程序出错:未获取任何domain及对应IP,请检查接口~") sys.exit(1) # 生成更新时间 update_time = datetime.now(timezone(timedelta(hours=8))).replace(microsecond=0).isoformat() ipv4_hosts_content = HOSTS_TEMPLATE.format( content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results), update_time=update_time) if ipv4_results else "" # ipv6_hosts_content = HOSTS_TEMPLATE.format( # content="\n".join(f"{ip:<50} {domain}" for ip, domain in ipv6_results), # update_time=update_time) if ipv6_results else "" # 写入文件 if ipv4_hosts_content: write_host_file(file_path, ipv4_hosts_content, 'ipv4') if __name__ == "__main__": main()