| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- import requests
- from time import sleep
- import random
- import time
- import os
- import sys
- from datetime import datetime, timezone, timedelta
- from retry import retry
- import socket
- import asyncio
- import aiohttp
- country_code = 'jp' # 节点
- DOMAINS = [
- 'tmdb.org',
- 'api.tmdb.org',
- 'files.tmdb.org',
- 'themoviedb.org',
- 'api.themoviedb.org',
- 'www.themoviedb.org',
- 'auth.themoviedb.org',
- 'image.tmdb.org',
- 'images.tmdb.org',
- 'imdb.com',
- 'www.imdb.com',
- 'secure.imdb.com',
- 's.media-imdb.com',
- 'us.dd.imdb.com',
- 'www.imdb.to',
- 'origin-www.imdb.com',
- 'ia.media-imdb.com',
- 'thetvdb.com',
- 'api.thetvdb.com',
- 'ia.media-imdb.com',
- 'f.media-amazon.com',
- 'imdb-video.media-imdb.com'
- ]
- # 将 Tmdb_Host_TEMPLATE 修改为更通用的名称
- HOSTS_TEMPLATE = """# Fast DNS Hosts Start
- {content}
- # Update time: {update_time}
- # Fast DNS Hosts End\n"""
- def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
- # 修改文件名生成逻辑,使其更通用
- output_file_path = os.path.join(file_path, "fast_dns_hosts_" + filename)
- with open(output_file_path, "w", encoding='utf-8') as output_fb:
- output_fb.write(hosts_content)
- print(f"\n~最新{filename}地址已更新~")
- @retry(tries=3)
- def get_csrf_token(udp):
- """获取CSRF Token"""
- try:
- url = f'https://dnschecker.org/ajax_files/gen_csrf.php?udp={udp}'
- headers = {
- 'referer': 'https://dnschecker.org/country/{country_code}/',
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'
- }
- response = requests.get(url, headers=headers)
- if response.status_code == 200:
- csrf = response.json().get('csrf')
- print(f"获取到的CSRF Token: {csrf}")
- return csrf
- else:
- print(f"获取CSRF Token失败,HTTP状态码: {response.status_code}")
- return None
- except Exception as e:
- print(f"获取CSRF Token时发生错误: {str(e)}")
- return None
- async def async_get_domain_ips(session, domain, csrf_token, udp, argument):
- url = f'https://dnschecker.org/ajax_files/api/220/{argument}/{domain}?dns_key=country&dns_value={country_code}&v=0.36&cd_flag=1&upd={udp}'
- headers = {'csrftoken': csrf_token, 'referer': f'https://dnschecker.org/country/{country_code}/',
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'}
- try:
- async with session.get(url, headers=headers) as response:
- if response.status == 200:
- data = await response.json()
- if 'result' in data and 'ips' in data['result']:
- ips_str = data['result']['ips']
- if '<br />' in ips_str:
- return [ip.strip() for ip in ips_str.split('<br />') if ip.strip()]
- else:
- return [ips_str.strip()] if ips_str.strip() else []
- else:
- print(f"获取 {domain} 的IP列表失败:返回数据格式不正确")
- return []
- else:
- print(f"获取 {domain} 的IP列表失败,HTTP状态码: {response.status}")
- return []
- except Exception as e:
- print(f"获取 {domain} 的IP列表时发生错误: {str(e)}")
- return []
- def ping_ip(ip, port=80):
- print(f"使用TCP连接测试IP地址的延迟(毫秒)")
- try:
- print(f"\n开始 ping {ip}...")
- start_time = time.time()
- with socket.create_connection((ip, port), timeout=2) as sock:
- latency = (time.time() - start_time) * 1000 # 转换为毫秒
- print(f"IP: {ip} 的平均延迟: {latency}ms")
- return latency
- except Exception as e:
- print(f"Ping {ip} 时发生错误: {str(e)}")
- return float('inf')
- def find_fastest_ip(ips):
- """找出延迟最低的IP地址"""
- if not ips:
- return None
- fastest_ip = None
- min_latency = float('inf')
- ip_latencies = [] # 存储所有IP及其延迟
- for ip in ips:
- ip = ip.strip()
- if not ip:
- continue
- print(f"正在测试 IP: {ip}")
- latency = ping_ip(ip)
- ip_latencies.append((ip, latency))
- print(f"IP: {ip} 延迟: {latency}ms")
- if latency < min_latency:
- min_latency = latency
- fastest_ip = ip
- sleep(0.5)
- print("\n所有IP延迟情况:")
- for ip, latency in ip_latencies:
- print(f"IP: {ip} - 延迟: {latency}ms")
- if fastest_ip:
- print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms")
- return fastest_ip
- async def process_domain(session, domain, csrf_token, udp):
- print(f"\n正在处理域名: {domain}")
- ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
- if not ipv4_ips:
- print(f"无法获取 {domain} 的IP列表,跳过该域名")
- return None
- # 处理 IPv4 地址
- if ipv4_ips:
- fastest_ipv4 = find_fastest_ip(ipv4_ips)
- if fastest_ipv4:
- print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
- return [fastest_ipv4, domain]
- else:
- return [ipv4_ips[0], domain]
- async def main_async():
- print("开始检测域名的最快IP...")
- file_path = "/ql/data"
- udp = random.random() * 1000 + (int(time.time() * 1000) % 1000)
- # 获取CSRF Token
- csrf_token = get_csrf_token(udp)
- if not csrf_token:
- print("无法获取CSRF Token,程序退出")
- sys.exit(1)
- # 使用异步HTTP客户端
- async with aiohttp.ClientSession() as session:
- # 并行处理所有域名
- tasks = [process_domain(session, domain, csrf_token, udp) for domain in DOMAINS]
- results = await asyncio.gather(*tasks)
- # 过滤掉None结果(处理失败的域名)
- ipv4_results = [result for result in results if result is not None]
- # 保存结果到文件
- if not ipv4_results:
- print(f"程序出错:未获取任何domain及对应IP,请检查接口~")
- sys.exit(1)
- # 生成更新时间
- update_time = datetime.now(timezone(timedelta(hours=8))).replace(microsecond=0).isoformat()
- ipv4_hosts_content = HOSTS_TEMPLATE.format(
- content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results),
- update_time=update_time) if ipv4_results else ""
- print(f"Ipv4: {ipv4_hosts_content}")
- # 写入文件
- if ipv4_hosts_content:
- write_host_file(file_path, ipv4_hosts_content, 'ipv4')
- def main():
- # 运行异步主函数
- asyncio.run(main_async())
- if __name__ == "__main__":
- main()
|