"""Build a hosts file that maps TMDB/IMDb-related domains to their fastest IP.

For every domain in ``DOMAINS`` the script asks dnschecker.org for the A
records seen from resolvers in ``country_code``, measures the TCP connect
latency to each returned address, and writes the lowest-latency address per
domain into ``fast_dns_hosts_ipv4``.
"""
import asyncio
import os
import random
import socket
import sys
import time
from datetime import datetime, timezone, timedelta
from time import sleep

import aiohttp
import requests
from retry import retry

country_code = 'jp'  # dnschecker.org country whose resolvers are queried

# Domains to resolve.  Each name must appear only once, otherwise the
# generated hosts file contains duplicate entries.
DOMAINS = [
    'tmdb.org',
    'api.tmdb.org',
    'files.tmdb.org',
    'themoviedb.org',
    'api.themoviedb.org',
    'www.themoviedb.org',
    'auth.themoviedb.org',
    'image.tmdb.org',
    'images.tmdb.org',
    'imdb.com',
    'www.imdb.com',
    'secure.imdb.com',
    's.media-imdb.com',
    'us.dd.imdb.com',
    'www.imdb.to',
    'origin-www.imdb.com',
    'ia.media-imdb.com',
    'thetvdb.com',
    'api.thetvdb.com',
    'f.media-amazon.com',
    'imdb-video.media-imdb.com',
]

# Generic hosts-file template (formerly the TMDB-specific template).
HOSTS_TEMPLATE = """# Fast DNS Hosts Start
{content}
# Update time: {update_time}
# Fast DNS Hosts End\n"""

# Browser-like User-Agent sent with every dnschecker.org request.
_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'
)

# Seconds to wait for the (synchronous) CSRF request before giving up.
_HTTP_TIMEOUT = 10


def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
    """Write *hosts_content* to ``<file_path>/fast_dns_hosts_<filename>``.

    Args:
        file_path: Directory that receives the file.
        hosts_content: Complete text of the hosts block.
        filename: Suffix appended to ``fast_dns_hosts_`` (e.g. ``"ipv4"``).
    """
    output_file_path = os.path.join(file_path, "fast_dns_hosts_" + filename)
    with open(output_file_path, "w", encoding='utf-8') as output_fb:
        output_fb.write(hosts_content)
    print(f"\n~最新{filename}地址已更新~")


class _CsrfStatusError(Exception):
    """Raised when the CSRF endpoint answers with a non-200 status."""

    def __init__(self, status_code):
        super().__init__(f"获取CSRF Token失败,HTTP状态码: {status_code}")


@retry(tries=3)
def _fetch_csrf_token(udp):
    """Request the CSRF token once; raise on any failure so ``retry`` can retry."""
    url = f'https://dnschecker.org/ajax_files/gen_csrf.php?udp={udp}'
    headers = {
        # Must be an f-string: the original sent the literal "{country_code}".
        'referer': f'https://dnschecker.org/country/{country_code}/',
        'User-Agent': _USER_AGENT,
    }
    response = requests.get(url, headers=headers, timeout=_HTTP_TIMEOUT)
    if response.status_code != 200:
        raise _CsrfStatusError(response.status_code)
    return response.json().get('csrf')


def get_csrf_token(udp):
    """Fetch the dnschecker.org CSRF token.

    The request is attempted up to three times.  Retrying lives in
    ``_fetch_csrf_token`` because a function that swallows its own exceptions
    never triggers the ``retry`` decorator.

    Args:
        udp: Random cache-busting value passed as the ``udp`` query parameter.

    Returns:
        The token string, or ``None`` when every attempt failed or the
        response carried no ``csrf`` field.
    """
    try:
        csrf = _fetch_csrf_token(udp)
    except _CsrfStatusError as e:
        print(str(e))
        return None
    except Exception as e:  # best-effort boundary: the caller only needs None
        print(f"获取CSRF Token时发生错误: {str(e)}")
        return None
    print(f"获取到的CSRF Token: {csrf}")
    return csrf


async def async_get_domain_ips(session, domain, csrf_token, udp, argument):
    """Query dnschecker.org for the records of *domain*.

    Args:
        session: An open ``aiohttp.ClientSession``.
        domain: Domain name to look up.
        csrf_token: Token obtained from ``get_csrf_token``.
        udp: Random cache-busting value forwarded in the query string.
        argument: DNS record type placed in the URL path (e.g. ``"A"``).

    Returns:
        A list of non-empty, stripped address strings; empty on any failure.
    """
    url = f'https://dnschecker.org/ajax_files/api/220/{argument}/{domain}?dns_key=country&dns_value={country_code}&v=0.36&cd_flag=1&upd={udp}'
    headers = {
        'csrftoken': csrf_token,
        'referer': f'https://dnschecker.org/country/{country_code}/',
        'User-Agent': _USER_AGENT,
    }
    try:
        async with session.get(url, headers=headers) as response:
            if response.status != 200:
                print(f"获取 {domain} 的IP列表失败,HTTP状态码: {response.status}")
                return []
            data = await response.json()
            if 'result' not in data or 'ips' not in data['result']:
                print(f"获取 {domain} 的IP列表失败:返回数据格式不正确")
                return []
            # Addresses are separated by "<br />"; a single address contains no
            # separator and split() then yields a one-element list.
            ips_str = data['result']['ips']
            return [ip.strip() for ip in ips_str.split('<br />') if ip.strip()]
    except Exception as e:  # network/JSON/shape errors all mean "no IPs"
        print(f"获取 {domain} 的IP列表时发生错误: {str(e)}")
        return []


def ping_ip(ip, port=80):
    """Measure the TCP connect latency to *ip* in milliseconds.

    Args:
        ip: Address to connect to.
        port: TCP port used for the connection attempt.

    Returns:
        The connect time in milliseconds, or ``float('inf')`` when the
        connection fails or times out (2 s).
    """
    print("使用TCP连接测试IP地址的延迟(毫秒)")
    try:
        print(f"\n开始 ping {ip}...")
        # perf_counter is monotonic, so the interval cannot be skewed by
        # wall-clock adjustments.
        start_time = time.perf_counter()
        with socket.create_connection((ip, port), timeout=2):
            latency = (time.perf_counter() - start_time) * 1000  # to ms
        print(f"IP: {ip} 的平均延迟: {latency}ms")
        return latency
    except (OSError, ValueError) as e:
        # OSError covers refused/timeouts/resolution errors; ValueError covers
        # malformed address strings (including UnicodeError).
        print(f"Ping {ip} 时发生错误: {str(e)}")
        return float('inf')


def find_fastest_ip(ips):
    """Return the address in *ips* with the lowest TCP connect latency.

    This function blocks (socket connects plus a 0.5 s pause between probes);
    call it from a worker thread when used alongside an event loop.

    Args:
        ips: Iterable of address strings; blank entries are skipped.

    Returns:
        The fastest address, or ``None`` when *ips* is empty or no address
        could be reached.
    """
    if not ips:
        return None

    fastest_ip = None
    min_latency = float('inf')
    ip_latencies = []  # every probed (ip, latency) pair, for the summary

    for ip in ips:
        ip = ip.strip()
        if not ip:
            continue

        print(f"正在测试 IP: {ip}")
        latency = ping_ip(ip)
        ip_latencies.append((ip, latency))
        print(f"IP: {ip} 延迟: {latency}ms")

        if latency < min_latency:
            min_latency = latency
            fastest_ip = ip

        sleep(0.5)

    print("\n所有IP延迟情况:")
    for ip, latency in ip_latencies:
        print(f"IP: {ip} - 延迟: {latency}ms")

    if fastest_ip:
        print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms")

    return fastest_ip


async def process_domain(session, domain, csrf_token, udp):
    """Resolve *domain* and pick its fastest IPv4 address.

    Args:
        session: An open ``aiohttp.ClientSession``.
        domain: Domain name to process.
        csrf_token: Token obtained from ``get_csrf_token``.
        udp: Random cache-busting value forwarded to the lookup.

    Returns:
        ``[ip, domain]``, or ``None`` when no address could be obtained.  When
        no address answers the probe, the first returned address is used.
    """
    print(f"\n正在处理域名: {domain}")

    ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
    if not ipv4_ips:
        print(f"无法获取 {domain} 的IP列表,跳过该域名")
        return None

    # The latency probe blocks, so run it in the default thread pool; calling
    # it inline would stall the event loop and serialise all domains.
    loop = asyncio.get_running_loop()
    fastest_ipv4 = await loop.run_in_executor(None, find_fastest_ip, ipv4_ips)
    if fastest_ipv4:
        print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
        return [fastest_ipv4, domain]
    return [ipv4_ips[0], domain]


async def main_async():
    """Resolve every domain concurrently and write the resulting hosts file.

    Exits the process with status 1 when the CSRF token cannot be obtained or
    when no domain produced a result.
    """
    print("开始检测域名的最快IP...")
    file_path = "/ql/data"
    udp = random.random() * 1000 + (int(time.time() * 1000) % 1000)

    csrf_token = get_csrf_token(udp)
    if not csrf_token:
        print("无法获取CSRF Token,程序退出")
        sys.exit(1)

    # dict.fromkeys drops accidental duplicates while keeping list order.
    domains = list(dict.fromkeys(DOMAINS))

    async with aiohttp.ClientSession() as session:
        tasks = [process_domain(session, domain, csrf_token, udp) for domain in domains]
        results = await asyncio.gather(*tasks)

    # Drop domains whose processing failed.
    ipv4_results = [result for result in results if result is not None]
    if not ipv4_results:
        print("程序出错:未获取任何domain及对应IP,请检查接口~")
        sys.exit(1)

    # Timestamp in UTC+8, second precision.
    update_time = datetime.now(timezone(timedelta(hours=8))).replace(microsecond=0).isoformat()

    ipv4_hosts_content = HOSTS_TEMPLATE.format(
        content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results),
        update_time=update_time,
    )
    print(f"Ipv4: {ipv4_hosts_content}")

    write_host_file(file_path, ipv4_hosts_content, 'ipv4')


def main():
    """Synchronous entry point: run ``main_async`` to completion."""
    asyncio.run(main_async())


if __name__ == "__main__":
    main()