import requests
from time import sleep
import random
import time
import os
import sys
from datetime import datetime, timezone, timedelta
from retry import retry
import socket
import asyncio
import aiohttp
# Original script: https://github.com/cnwikee/CheckTMDB.git
# Country code sent to dnschecker.org as the value of the `country` DNS key
# (i.e. the resolver "node" the lookups are performed from).
country_code = 'jp'

# Domains to resolve and latency-test. Every entry is listed exactly once so
# that no domain is resolved, probed, or written to the hosts file twice.
DOMAINS = [
    'tmdb.org',
    'api.tmdb.org',
    'files.tmdb.org',
    'themoviedb.org',
    'api.themoviedb.org',
    'www.themoviedb.org',
    'auth.themoviedb.org',
    'image.tmdb.org',
    'images.tmdb.org',
    'imdb.com',
    'www.imdb.com',
    'secure.imdb.com',
    's.media-imdb.com',
    'us.dd.imdb.com',
    'www.imdb.to',
    'origin-www.imdb.com',
    'ia.media-imdb.com',
    'thetvdb.com',
    'api.thetvdb.com',
    'f.media-amazon.com',
    'imdb-video.media-imdb.com',
]
# Template of the generated hosts fragment (renamed from Tmdb_Host_TEMPLATE to
# a more generic name). `{content}` receives the "ip domain" lines and
# `{update_time}` the generation timestamp.
HOSTS_TEMPLATE = (
    "# Fast DNS Hosts Start\n"
    "{content}\n"
    "# Update time: {update_time}\n"
    "# Fast DNS Hosts End\n"
)
def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
    """Write the generated hosts text into the directory ``file_path``.

    The output file is named ``fast_dns_hosts_<filename>``; it is opened in
    write mode, so an existing file of that name is overwritten.

    Args:
        file_path: Directory that receives the output file.
        hosts_content: Complete text to store in the file.
        filename: Suffix appended to the ``fast_dns_hosts_`` prefix.
    """
    # Generic file name: fixed prefix plus the caller-supplied suffix.
    output_name = "fast_dns_hosts_" + filename
    destination = os.path.join(file_path, output_name)
    with open(destination, mode="w", encoding="utf-8") as handle:
        handle.write(hosts_content)
    print(f"\n~最新{filename}地址已更新~")
@retry(tries=3)
def _request_csrf_token(udp):
    """Perform one CSRF-token request, raising on any failure.

    Failures are raised (not swallowed) so that the ``retry`` decorator can
    actually re-run the request; it re-raises the last error once all tries
    are used up.
    """
    url = f'https://dnschecker.org/ajax_files/gen_csrf.php?udp={udp}'
    headers = {
        # Must be an f-string, otherwise the literal text "{country_code}"
        # is sent instead of the configured country.
        'referer': f'https://dnschecker.org/country/{country_code}/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'
    }
    # Without a timeout a stalled connection would block the script forever.
    response = requests.get(url, headers=headers, timeout=10)
    if response.status_code != 200:
        print(f"获取CSRF Token失败,HTTP状态码: {response.status_code}")
        raise RuntimeError(f"unexpected HTTP status {response.status_code}")
    csrf = response.json().get('csrf')
    if not csrf:
        raise ValueError("response did not contain a 'csrf' value")
    return csrf


def get_csrf_token(udp):
    """Fetch a CSRF token from dnschecker.org.

    The request is attempted up to three times. Any remaining failure is
    reported on stdout and turned into ``None`` so callers can keep using a
    simple truthiness check.

    Args:
        udp: Value placed in the ``udp`` query parameter of the token URL.

    Returns:
        The token taken from the ``csrf`` field of the JSON response, or
        ``None`` if it could not be obtained.
    """
    try:
        csrf = _request_csrf_token(udp)
    except Exception as e:
        print(f"获取CSRF Token时发生错误: {str(e)}")
        return None
    print(f"获取到的CSRF Token: {csrf}")
    return csrf
def _split_ips(ips_str):
    """Split the raw ``ips`` field into a list of stripped, non-empty entries."""
    if not isinstance(ips_str, str):
        return []
    # NOTE(review): the delimiter literal in the previous revision contained a
    # raw line break inside the quotes (invalid Python), so the intended
    # separator is uncertain. Both HTML line breaks and plain newlines are
    # accepted here - confirm which one the service actually emits.
    normalised = ips_str
    for separator in ('<br />', '<br/>', '<br>'):
        normalised = normalised.replace(separator, '\n')
    return [ip.strip() for ip in normalised.splitlines() if ip.strip()]


async def async_get_domain_ips(session, domain, csrf_token, udp, argument):
    """Query dnschecker.org for the addresses of ``domain``.

    Args:
        session: HTTP client session; ``session.get(url, headers=...)`` is
            used as an async context manager.
        domain: Domain name to look up.
        csrf_token: Token sent in the ``csrftoken`` request header.
        udp: Value placed in the ``upd`` query parameter.
        argument: URL path segment placed before the domain (the caller in
            this file passes ``"A"``).

    Returns:
        A list of address strings; empty on a non-200 status, an unexpected
        payload shape, or any exception (each case is reported on stdout).
    """
    # NOTE(review): this endpoint's parameter is spelled `upd` while the CSRF
    # endpoint uses `udp`; left untouched - verify against the service.
    url = f'https://dnschecker.org/ajax_files/api/220/{argument}/{domain}?dns_key=country&dns_value={country_code}&v=0.36&cd_flag=1&upd={udp}'
    headers = {
        'csrftoken': csrf_token,
        'referer': f'https://dnschecker.org/country/{country_code}/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0',
    }
    try:
        async with session.get(url, headers=headers) as response:
            if response.status != 200:
                print(f"获取 {domain} 的IP列表失败,HTTP状态码: {response.status}")
                return []
            data = await response.json()
            if 'result' in data and 'ips' in data['result']:
                return _split_ips(data['result']['ips'])
            print(f"获取 {domain} 的IP列表失败:返回数据格式不正确")
            return []
    except Exception as e:
        # Best-effort lookup: a failure for one domain must not abort the run.
        print(f"获取 {domain} 的IP列表时发生错误: {str(e)}")
        return []
def ping_ip(ip, port=80):
    """Measure how long a TCP connection to ``ip:port`` takes.

    Args:
        ip: Address (or host name) to connect to.
        port: TCP port to connect to; defaults to 80.

    Returns:
        The connect time in milliseconds, or ``float('inf')`` if the
        connection could not be established (including the 2-second timeout).
    """
    print("使用TCP连接测试IP地址的延迟(毫秒)")
    try:
        print(f"\n开始 ping {ip}...")
        # perf_counter is monotonic; the wall clock (time.time) may jump and
        # produce negative or inflated latencies.
        start_time = time.perf_counter()
        with socket.create_connection((ip, port), timeout=2):
            latency = (time.perf_counter() - start_time) * 1000  # to milliseconds
        print(f"IP: {ip} 的平均延迟: {latency}ms")
        return latency
    except Exception as e:
        # Best-effort probe: an unreachable address simply ranks last.
        print(f"Ping {ip} 时发生错误: {str(e)}")
        return float('inf')
def find_fastest_ip(ips):
    """Return the address from ``ips`` with the lowest measured latency.

    Each non-blank entry is probed with ``ping_ip``, followed by a 0.5 s
    pause. A summary of all measurements is printed at the end.

    Args:
        ips: Iterable of address strings; blank entries are skipped.

    Returns:
        The address with the smallest latency, or ``None`` when ``ips`` is
        empty or no probe returned a latency below infinity.
    """
    if not ips:
        return None

    measurements = []  # (address, latency) for every probed address
    best_ip, best_latency = None, float('inf')

    for candidate in (raw.strip() for raw in ips):
        if candidate:
            print(f"正在测试 IP: {candidate}")
            delay = ping_ip(candidate)
            measurements.append((candidate, delay))
            print(f"IP: {candidate} 延迟: {delay}ms")
            if delay < best_latency:
                best_ip, best_latency = candidate, delay
            sleep(0.5)

    print("\n所有IP延迟情况:")
    for address, delay in measurements:
        print(f"IP: {address} - 延迟: {delay}ms")

    if best_ip:
        print(f"\n最快的IP是: {best_ip},延迟: {best_latency}ms")
    return best_ip
async def process_domain(session, domain, csrf_token, udp):
    """Resolve ``domain`` and pick its fastest IPv4 address.

    Args:
        session: HTTP client session forwarded to ``async_get_domain_ips``.
        domain: Domain name to process.
        csrf_token: Token forwarded to ``async_get_domain_ips``.
        udp: Value forwarded to ``async_get_domain_ips``.

    Returns:
        ``[ip, domain]`` using the fastest address, or the first resolved
        address when no probe succeeded; ``None`` if no address was resolved.
    """
    print(f"\n正在处理域名: {domain}")
    ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
    if not ipv4_ips:
        print(f"无法获取 {domain} 的IP列表,跳过该域名")
        return None

    # find_fastest_ip performs blocking socket connects and sleeps. Running it
    # directly in this coroutine would stall the event loop (and every other
    # domain's pending request), so it is pushed to the default executor.
    loop = asyncio.get_running_loop()
    fastest_ipv4 = await loop.run_in_executor(None, find_fastest_ip, ipv4_ips)
    if fastest_ipv4:
        print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
        return [fastest_ipv4, domain]
    # No address answered the probe: fall back to the first resolved one.
    return [ipv4_ips[0], domain]
async def main_async():
    """Resolve every entry of ``DOMAINS`` and write the resulting hosts file.

    Exits the process with status 1 when no CSRF token can be obtained or
    when no domain produced a result.
    """
    print("开始检测域名的最快IP...")
    output_dir = "/ql/data"
    random_part = random.random() * 1000
    millis_part = int(time.time() * 1000) % 1000
    udp = random_part + millis_part

    # A CSRF token is required before any lookup can be made.
    csrf_token = get_csrf_token(udp)
    if not csrf_token:
        print("无法获取CSRF Token,程序退出")
        sys.exit(1)

    # One shared async HTTP session; all domains are processed concurrently.
    async with aiohttp.ClientSession() as session:
        lookups = [
            process_domain(session, name, csrf_token, udp)
            for name in DOMAINS
        ]
        outcomes = await asyncio.gather(*lookups)

    # Domains that failed yield None and are dropped.
    ipv4_results = [entry for entry in outcomes if entry is not None]
    if not ipv4_results:
        print("程序出错:未获取任何domain及对应IP,请检查接口~")
        sys.exit(1)

    # Timestamp in the fixed UTC+8 offset, truncated to whole seconds.
    utc_plus_8 = timezone(timedelta(hours=8))
    update_time = datetime.now(utc_plus_8).replace(microsecond=0).isoformat()

    if ipv4_results:
        host_lines = "\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results)
        ipv4_hosts_content = HOSTS_TEMPLATE.format(
            content=host_lines,
            update_time=update_time,
        )
    else:
        ipv4_hosts_content = ""
    print(f"Ipv4: {ipv4_hosts_content}")

    # Persist the result.
    if ipv4_hosts_content:
        write_host_file(output_dir, ipv4_hosts_content, 'ipv4')
def main():
    """Synchronous entry point: run ``main_async`` to completion."""
    entry_coroutine = main_async()
    asyncio.run(entry_coroutine)


if __name__ == "__main__":
    main()