dns_optimizer.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. import requests
  2. from time import sleep
  3. import random
  4. import time
  5. import os
  6. import sys
  7. from datetime import datetime, timezone, timedelta
  8. from retry import retry
  9. import socket
  10. import asyncio
  11. import aiohttp
  12. # 原始脚本地址: https://github.com/cnwikee/CheckTMDB.git
  13. country_code = 'jp' # 节点
  14. DOMAINS = [
  15. 'tmdb.org',
  16. 'api.tmdb.org',
  17. 'files.tmdb.org',
  18. 'themoviedb.org',
  19. 'api.themoviedb.org',
  20. 'www.themoviedb.org',
  21. 'auth.themoviedb.org',
  22. 'image.tmdb.org',
  23. 'images.tmdb.org',
  24. 'imdb.com',
  25. 'www.imdb.com',
  26. 'secure.imdb.com',
  27. 's.media-imdb.com',
  28. 'us.dd.imdb.com',
  29. 'www.imdb.to',
  30. 'origin-www.imdb.com',
  31. 'ia.media-imdb.com',
  32. 'thetvdb.com',
  33. 'api.thetvdb.com',
  34. 'ia.media-imdb.com',
  35. 'f.media-amazon.com',
  36. 'imdb-video.media-imdb.com'
  37. ]
  38. # 将 Tmdb_Host_TEMPLATE 修改为更通用的名称
  39. HOSTS_TEMPLATE = """# Fast DNS Hosts Start
  40. {content}
  41. # Update time: {update_time}
  42. # Fast DNS Hosts End\n"""
  43. def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
  44. # 修改文件名生成逻辑,使其更通用
  45. output_file_path = os.path.join(file_path, "fast_dns_hosts_" + filename)
  46. with open(output_file_path, "w", encoding='utf-8') as output_fb:
  47. output_fb.write(hosts_content)
  48. print(f"\n~最新{filename}地址已更新~")
  49. @retry(tries=3)
  50. def get_csrf_token(udp):
  51. """获取CSRF Token"""
  52. try:
  53. url = f'https://dnschecker.org/ajax_files/gen_csrf.php?udp={udp}'
  54. headers = {
  55. 'referer': 'https://dnschecker.org/country/{country_code}/',
  56. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'
  57. }
  58. response = requests.get(url, headers=headers)
  59. if response.status_code == 200:
  60. csrf = response.json().get('csrf')
  61. print(f"获取到的CSRF Token: {csrf}")
  62. return csrf
  63. else:
  64. print(f"获取CSRF Token失败,HTTP状态码: {response.status_code}")
  65. return None
  66. except Exception as e:
  67. print(f"获取CSRF Token时发生错误: {str(e)}")
  68. return None
  69. async def async_get_domain_ips(session, domain, csrf_token, udp, argument):
  70. url = f'https://dnschecker.org/ajax_files/api/220/{argument}/{domain}?dns_key=country&dns_value={country_code}&v=0.36&cd_flag=1&upd={udp}'
  71. headers = {'csrftoken': csrf_token, 'referer': f'https://dnschecker.org/country/{country_code}/',
  72. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0'}
  73. try:
  74. async with session.get(url, headers=headers) as response:
  75. if response.status == 200:
  76. data = await response.json()
  77. if 'result' in data and 'ips' in data['result']:
  78. ips_str = data['result']['ips']
  79. if '<br />' in ips_str:
  80. return [ip.strip() for ip in ips_str.split('<br />') if ip.strip()]
  81. else:
  82. return [ips_str.strip()] if ips_str.strip() else []
  83. else:
  84. print(f"获取 {domain} 的IP列表失败:返回数据格式不正确")
  85. return []
  86. else:
  87. print(f"获取 {domain} 的IP列表失败,HTTP状态码: {response.status}")
  88. return []
  89. except Exception as e:
  90. print(f"获取 {domain} 的IP列表时发生错误: {str(e)}")
  91. return []
  92. def ping_ip(ip, port=80):
  93. print(f"使用TCP连接测试IP地址的延迟(毫秒)")
  94. try:
  95. print(f"\n开始 ping {ip}...")
  96. start_time = time.time()
  97. with socket.create_connection((ip, port), timeout=2) as sock:
  98. latency = (time.time() - start_time) * 1000 # 转换为毫秒
  99. print(f"IP: {ip} 的平均延迟: {latency}ms")
  100. return latency
  101. except Exception as e:
  102. print(f"Ping {ip} 时发生错误: {str(e)}")
  103. return float('inf')
  104. def find_fastest_ip(ips):
  105. """找出延迟最低的IP地址"""
  106. if not ips:
  107. return None
  108. fastest_ip = None
  109. min_latency = float('inf')
  110. ip_latencies = [] # 存储所有IP及其延迟
  111. for ip in ips:
  112. ip = ip.strip()
  113. if not ip:
  114. continue
  115. print(f"正在测试 IP: {ip}")
  116. latency = ping_ip(ip)
  117. ip_latencies.append((ip, latency))
  118. print(f"IP: {ip} 延迟: {latency}ms")
  119. if latency < min_latency:
  120. min_latency = latency
  121. fastest_ip = ip
  122. sleep(0.5)
  123. print("\n所有IP延迟情况:")
  124. for ip, latency in ip_latencies:
  125. print(f"IP: {ip} - 延迟: {latency}ms")
  126. if fastest_ip:
  127. print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms")
  128. return fastest_ip
  129. async def process_domain(session, domain, csrf_token, udp):
  130. print(f"\n正在处理域名: {domain}")
  131. ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
  132. if not ipv4_ips:
  133. print(f"无法获取 {domain} 的IP列表,跳过该域名")
  134. return None
  135. # 处理 IPv4 地址
  136. if ipv4_ips:
  137. fastest_ipv4 = find_fastest_ip(ipv4_ips)
  138. if fastest_ipv4:
  139. print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
  140. return [fastest_ipv4, domain]
  141. else:
  142. return [ipv4_ips[0], domain]
  143. async def main_async():
  144. print("开始检测域名的最快IP...")
  145. file_path = "/ql/data"
  146. udp = random.random() * 1000 + (int(time.time() * 1000) % 1000)
  147. # 获取CSRF Token
  148. csrf_token = get_csrf_token(udp)
  149. if not csrf_token:
  150. print("无法获取CSRF Token,程序退出")
  151. sys.exit(1)
  152. # 使用异步HTTP客户端
  153. async with aiohttp.ClientSession() as session:
  154. # 并行处理所有域名
  155. tasks = [process_domain(session, domain, csrf_token, udp) for domain in DOMAINS]
  156. results = await asyncio.gather(*tasks)
  157. # 过滤掉None结果(处理失败的域名)
  158. ipv4_results = [result for result in results if result is not None]
  159. # 保存结果到文件
  160. if not ipv4_results:
  161. print(f"程序出错:未获取任何domain及对应IP,请检查接口~")
  162. sys.exit(1)
  163. # 生成更新时间
  164. update_time = datetime.now(timezone(timedelta(hours=8))).replace(microsecond=0).isoformat()
  165. ipv4_hosts_content = HOSTS_TEMPLATE.format(
  166. content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results),
  167. update_time=update_time) if ipv4_results else ""
  168. print(f"Ipv4: {ipv4_hosts_content}")
  169. # 写入文件
  170. if ipv4_hosts_content:
  171. write_host_file(file_path, ipv4_hosts_content, 'ipv4')
  172. def main():
  173. # 运行异步主函数
  174. asyncio.run(main_async())
  175. if __name__ == "__main__":
  176. main()