||
- import requests
- from time import sleep
- import time
- import os
- import re
- import sys
- from datetime import datetime, timezone, timedelta
- from retry import retry
- import socket
- # 原始脚本地址: https://github.com/cnwikee/CheckTMDB.git
- DOMAINS = [
- 'tmdb.org',
- 'api.tmdb.org',
- 'files.tmdb.org',
- 'themoviedb.org',
- 'api.themoviedb.org',
- 'www.themoviedb.org',
- 'auth.themoviedb.org',
- 'image.tmdb.org',
- 'images.tmdb.org',
- 'imdb.com',
- 'www.imdb.com',
- 'secure.imdb.com',
- 's.media-imdb.com',
- 'us.dd.imdb.com',
- 'www.imdb.to',
- 'origin-www.imdb.com',
- 'ia.media-imdb.com',
- 'thetvdb.com',
- 'api.thetvdb.com',
- 'ia.media-imdb.com',
- 'f.media-amazon.com',
- 'imdb-video.media-imdb.com'
- ]
- # 将 Tmdb_Host_TEMPLATE 修改为更通用的名称
- HOSTS_TEMPLATE = """# Fast DNS Hosts Start
- {content}
- # Update time: {update_time}
- # Fast DNS Hosts End\n"""
- def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
- # 修改文件名生成逻辑,使其更通用
- output_file_path = os.path.join(file_path, "fast_dns_hosts_" + filename)
- with open(output_file_path, "w", encoding='utf-8') as output_fb:
- output_fb.write(hosts_content)
- print(f"\n~最新{filename}地址已更新~")
- def ping_ip(ip, port=80):
- print(f"使用TCP连接测试IP地址的延迟(毫秒)")
- try:
- print(f"\n开始 ping {ip}...")
- start_time = time.time()
- with socket.create_connection((ip, port), timeout=2) as sock:
- latency = (time.time() - start_time) * 1000 # 转换为毫秒
- print(f"IP: {ip} 的平均延迟: {latency}ms")
- return latency
- except Exception as e:
- print(f"Ping {ip} 时发生错误: {str(e)}")
- return float('inf')
- def find_fastest_ip(ips):
- """找出延迟最低的IP地址"""
- if not ips:
- return None
- fastest_ip = None
- min_latency = float('inf')
- ip_latencies = [] # 存储所有IP及其延迟
- for ip in ips:
- ip = ip.strip()
- if not ip:
- continue
- print(f"正在测试 IP: {ip}")
- latency = ping_ip(ip)
- ip_latencies.append((ip, latency))
- print(f"IP: {ip} 延迟: {latency}ms")
- if latency < min_latency:
- min_latency = latency
- fastest_ip = ip
- sleep(0.5)
- print("\n所有IP延迟情况:")
- for ip, latency in ip_latencies:
- print(f"IP: {ip} - 延迟: {latency}ms")
- if fastest_ip:
- print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms")
- return fastest_ip
- # async def process_domain(session, domain, csrf_token, udp):
- # print(f"\n正在处理域名: {domain}")
- # ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
- #
- # if not ipv4_ips:
- # print(f"无法获取 {domain} 的IP列表,跳过该域名")
- # return None
- #
- # # 处理 IPv4 地址
- # if ipv4_ips:
- # fastest_ipv4 = find_fastest_ip(ipv4_ips)
- # if fastest_ipv4:
- # print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
- # return [fastest_ipv4, domain]
- # else:
- # return [ipv4_ips[0], domain]
- def validate_ip(ip):
- """
- 验证IP是否为合法的IPv4或IPv6地址
- :param ip: 待验证的IP字符串
- :return: True(合法)/False(非法)
- """
- # IPv4正则(严格验证:每个段0-255,无前置零(除0.0.0.0等合法场景))
- ipv4_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
- # IPv6正则(兼容压缩格式、本地链路地址、IPv4映射地址等所有合法格式)
- ipv6_pattern = r'^(?:(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$'
- # 忽略大小写验证IPv6,优先验证IPv4
- if re.match(ipv4_pattern, ip):
- return True
- elif re.match(ipv6_pattern, ip, re.IGNORECASE):
- return True
- else:
- return False
- @retry(tries=3)
- def get_domain_ips(domain, record_type):
- """
- 从Google DNS获取域名的A/AAAA记录IP列表
- :param domain: 目标域名(如 tmdb.org)
- :param record_type: 记录类型(A/AAAA,或数字1/28)
- :return: 去重后的IP列表
- """
- all_ips = [] # 存储所有DNS服务器返回的IP
- print(f"正在从Google DNS获取 {domain} 的{record_type}记录...")
- url = f'https://cloudflare-dns.com/dns-query'
- headers = {
- "accept": "application/dns-json"
- # "accept-encoding": "gzip, deflate, br, zstd",
- # "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
- # "content-type": "application/json; charset=UTF-8", # 关键:指定JSON格式负载
- # "referer": f"https://dns.google/query?name={domain}&rr_type={record_type}&ecs=",
- # "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0"
- }
- params = {
- 'name': domain,
- 'type': record_type
- }
- # 初始化IP列表(默认空列表,确保后续使用安全)
- ips_str = []
- try:
- # 改用GET请求(Google DNS resolve接口标准用法)
- response = requests.get(url, headers=headers, params=params, timeout=10)
- response.raise_for_status() # 主动抛出HTTP错误(如4xx/5xx)
- # 解析JSON响应
- data = response.json()
- if not isinstance(data, dict):
- print("返回数据不是字典格式,无法解析")
- return all_ips
- # 核心:提取Answer数组中的data字段(IP地址)
- answer_list = data.get("Answer", [])
- if not answer_list:
- print(f"未找到 {domain} 的{record_type}记录(Answer字段为空)")
- return all_ips
- # 遍历Answer数组,提取每个条目的data值(IP)
- for answer in answer_list:
- ip = answer.get("data")
- if not ip: # 过滤空值
- continue
- # 验证IP格式合法性
- if validate_ip(ip):
- all_ips.append(ip)
- print(f"提取到合法IP:{ip}")
- else:
- print(f"跳过非法IP格式:{ip}")
- except requests.exceptions.RequestException as e:
- # 捕获所有网络/请求异常
- print(f"请求Google DNS失败:{e}")
- if hasattr(e, 'response') and e.response:
- print(f"响应内容:{e.response.text[:500]}") # 打印前500字符避免过长
- except ValueError:
- # JSON解析失败
- print(f"响应内容不是有效的JSON格式:{response.text[:500]}")
- time.sleep(1)
- # 去重并返回(保持列表格式)
- unique_ips = list(set(all_ips))
- print(f"最终提取到 {domain} 的{record_type}记录IP(去重后):{unique_ips}")
- return unique_ips
- def main():
- print("开始检测域名的最快IP...")
- file_path = "/ql/data"
- ipv4_ips, ipv6_ips, ipv4_results, ipv6_results = [], [], [], []
- for domain in DOMAINS:
- print(f"\n正在处理域名: {domain}")
- ipv4_ips = get_domain_ips(domain, "A")
- # ipv6_ips = get_domain_ips(domain, "AAAA")
- if not ipv4_ips and not ipv6_ips:
- print(f"无法获取 {domain} 的IP列表,跳过该域名")
- continue
- # 处理 IPv4 地址
- if ipv4_ips:
- fastest_ipv4 = find_fastest_ip(ipv4_ips)
- if fastest_ipv4:
- ipv4_results.append([fastest_ipv4, domain])
- print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
- else:
- ipv4_results.append([ipv4_ips[0], domain])
- # 处理 IPv6 地址
- # if ipv6_ips:
- # fastest_ipv6 = find_fastest_ip(ipv6_ips)
- # if fastest_ipv6:
- # ipv6_results.append([fastest_ipv6, domain])
- # print(f"域名 {domain} 的最快IPv6是: {fastest_ipv6}")
- # else:
- # # 兜底:可能存在无法正确获取 fastest_ipv6 的情况,则将第一个IP赋值
- # ipv6_results.append([ipv6_ips[0], domain])
- sleep(1) # 避免请求过于频繁
- # 保存结果到文件
- if not ipv4_results and not ipv6_results:
- print(f"程序出错:未获取任何domain及对应IP,请检查接口~")
- sys.exit(1)
- # 生成更新时间
- update_time = datetime.now(timezone(timedelta(hours=8))).replace(microsecond=0).isoformat()
- ipv4_hosts_content = HOSTS_TEMPLATE.format(
- content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results),
- update_time=update_time) if ipv4_results else ""
- # ipv6_hosts_content = HOSTS_TEMPLATE.format(
- # content="\n".join(f"{ip:<50} {domain}" for ip, domain in ipv6_results),
- # update_time=update_time) if ipv6_results else ""
- # 写入文件
- if ipv4_hosts_content:
- write_host_file(file_path, ipv4_hosts_content, 'ipv4')
- if __name__ == "__main__":
- main()
|