dns_optimizer.py 9.5 KB


  1. import requests
  2. from time import sleep
  3. import time
  4. import os
  5. import re
  6. import sys
  7. from datetime import datetime, timezone, timedelta
  8. from retry import retry
  9. import socket
  10. # 原始脚本地址: https://github.com/cnwikee/CheckTMDB.git
  11. DOMAINS = [
  12. 'tmdb.org',
  13. 'api.tmdb.org',
  14. 'files.tmdb.org',
  15. 'themoviedb.org',
  16. 'api.themoviedb.org',
  17. 'www.themoviedb.org',
  18. 'auth.themoviedb.org',
  19. 'image.tmdb.org',
  20. 'images.tmdb.org',
  21. 'imdb.com',
  22. 'www.imdb.com',
  23. 'secure.imdb.com',
  24. 's.media-imdb.com',
  25. 'us.dd.imdb.com',
  26. 'www.imdb.to',
  27. 'origin-www.imdb.com',
  28. 'ia.media-imdb.com',
  29. 'thetvdb.com',
  30. 'api.thetvdb.com',
  31. 'ia.media-imdb.com',
  32. 'f.media-amazon.com',
  33. 'imdb-video.media-imdb.com'
  34. ]
  35. # 将 Tmdb_Host_TEMPLATE 修改为更通用的名称
  36. HOSTS_TEMPLATE = """# Fast DNS Hosts Start
  37. {content}
  38. # Update time: {update_time}
  39. # Fast DNS Hosts End\n"""
  40. def write_host_file(file_path: str, hosts_content: str, filename: str) -> None:
  41. # 修改文件名生成逻辑,使其更通用
  42. output_file_path = os.path.join(file_path, "fast_dns_hosts_" + filename)
  43. with open(output_file_path, "w", encoding='utf-8') as output_fb:
  44. output_fb.write(hosts_content)
  45. print(f"\n~最新{filename}地址已更新~")
  46. def ping_ip(ip, port=80):
  47. print(f"使用TCP连接测试IP地址的延迟(毫秒)")
  48. try:
  49. print(f"\n开始 ping {ip}...")
  50. start_time = time.time()
  51. with socket.create_connection((ip, port), timeout=2) as sock:
  52. latency = (time.time() - start_time) * 1000 # 转换为毫秒
  53. print(f"IP: {ip} 的平均延迟: {latency}ms")
  54. return latency
  55. except Exception as e:
  56. print(f"Ping {ip} 时发生错误: {str(e)}")
  57. return float('inf')
  58. def find_fastest_ip(ips):
  59. """找出延迟最低的IP地址"""
  60. if not ips:
  61. return None
  62. fastest_ip = None
  63. min_latency = float('inf')
  64. ip_latencies = [] # 存储所有IP及其延迟
  65. for ip in ips:
  66. ip = ip.strip()
  67. if not ip:
  68. continue
  69. print(f"正在测试 IP: {ip}")
  70. latency = ping_ip(ip)
  71. ip_latencies.append((ip, latency))
  72. print(f"IP: {ip} 延迟: {latency}ms")
  73. if latency < min_latency:
  74. min_latency = latency
  75. fastest_ip = ip
  76. sleep(0.5)
  77. print("\n所有IP延迟情况:")
  78. for ip, latency in ip_latencies:
  79. print(f"IP: {ip} - 延迟: {latency}ms")
  80. if fastest_ip:
  81. print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms")
  82. return fastest_ip
  83. # async def process_domain(session, domain, csrf_token, udp):
  84. # print(f"\n正在处理域名: {domain}")
  85. # ipv4_ips = await async_get_domain_ips(session, domain, csrf_token, udp, "A")
  86. #
  87. # if not ipv4_ips:
  88. # print(f"无法获取 {domain} 的IP列表,跳过该域名")
  89. # return None
  90. #
  91. # # 处理 IPv4 地址
  92. # if ipv4_ips:
  93. # fastest_ipv4 = find_fastest_ip(ipv4_ips)
  94. # if fastest_ipv4:
  95. # print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
  96. # return [fastest_ipv4, domain]
  97. # else:
  98. # return [ipv4_ips[0], domain]
  99. def validate_ip(ip):
  100. """
  101. 验证IP是否为合法的IPv4或IPv6地址
  102. :param ip: 待验证的IP字符串
  103. :return: True(合法)/False(非法)
  104. """
  105. # IPv4正则(严格验证:每个段0-255,无前置零(除0.0.0.0等合法场景))
  106. ipv4_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
  107. # IPv6正则(兼容压缩格式、本地链路地址、IPv4映射地址等所有合法格式)
  108. ipv6_pattern = r'^(?:(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$'
  109. # 忽略大小写验证IPv6,优先验证IPv4
  110. if re.match(ipv4_pattern, ip):
  111. return True
  112. elif re.match(ipv6_pattern, ip, re.IGNORECASE):
  113. return True
  114. else:
  115. return False
  116. @retry(tries=3)
  117. def get_domain_ips(domain, record_type):
  118. """
  119. 从Google DNS获取域名的A/AAAA记录IP列表
  120. :param domain: 目标域名(如 tmdb.org)
  121. :param record_type: 记录类型(A/AAAA,或数字1/28)
  122. :return: 去重后的IP列表
  123. """
  124. all_ips = [] # 存储所有DNS服务器返回的IP
  125. print(f"正在从Google DNS获取 {domain} 的{record_type}记录...")
  126. url = f'https://cloudflare-dns.com/dns-query'
  127. headers = {
  128. "accept": "application/dns-json"
  129. # "accept-encoding": "gzip, deflate, br, zstd",
  130. # "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
  131. # "content-type": "application/json; charset=UTF-8", # 关键:指定JSON格式负载
  132. # "referer": f"https://dns.google/query?name={domain}&rr_type={record_type}&ecs=",
  133. # "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0"
  134. }
  135. params = {
  136. 'name': domain,
  137. 'type': record_type
  138. }
  139. # 初始化IP列表(默认空列表,确保后续使用安全)
  140. ips_str = []
  141. try:
  142. # 改用GET请求(Google DNS resolve接口标准用法)
  143. response = requests.get(url, headers=headers, params=params, timeout=10)
  144. response.raise_for_status() # 主动抛出HTTP错误(如4xx/5xx)
  145. # 解析JSON响应
  146. data = response.json()
  147. if not isinstance(data, dict):
  148. print("返回数据不是字典格式,无法解析")
  149. return all_ips
  150. # 核心:提取Answer数组中的data字段(IP地址)
  151. answer_list = data.get("Answer", [])
  152. if not answer_list:
  153. print(f"未找到 {domain} 的{record_type}记录(Answer字段为空)")
  154. return all_ips
  155. # 遍历Answer数组,提取每个条目的data值(IP)
  156. for answer in answer_list:
  157. ip = answer.get("data")
  158. if not ip: # 过滤空值
  159. continue
  160. # 验证IP格式合法性
  161. if validate_ip(ip):
  162. all_ips.append(ip)
  163. print(f"提取到合法IP:{ip}")
  164. else:
  165. print(f"跳过非法IP格式:{ip}")
  166. except requests.exceptions.RequestException as e:
  167. # 捕获所有网络/请求异常
  168. print(f"请求Google DNS失败:{e}")
  169. if hasattr(e, 'response') and e.response:
  170. print(f"响应内容:{e.response.text[:500]}") # 打印前500字符避免过长
  171. except ValueError:
  172. # JSON解析失败
  173. print(f"响应内容不是有效的JSON格式:{response.text[:500]}")
  174. time.sleep(1)
  175. # 去重并返回(保持列表格式)
  176. unique_ips = list(set(all_ips))
  177. print(f"最终提取到 {domain} 的{record_type}记录IP(去重后):{unique_ips}")
  178. return unique_ips
  179. def main():
  180. print("开始检测域名的最快IP...")
  181. file_path = "/ql/data"
  182. ipv4_ips, ipv6_ips, ipv4_results, ipv6_results = [], [], [], []
  183. for domain in DOMAINS:
  184. print(f"\n正在处理域名: {domain}")
  185. ipv4_ips = get_domain_ips(domain, "A")
  186. # ipv6_ips = get_domain_ips(domain, "AAAA")
  187. if not ipv4_ips and not ipv6_ips:
  188. print(f"无法获取 {domain} 的IP列表,跳过该域名")
  189. continue
  190. # 处理 IPv4 地址
  191. if ipv4_ips:
  192. fastest_ipv4 = find_fastest_ip(ipv4_ips)
  193. if fastest_ipv4:
  194. ipv4_results.append([fastest_ipv4, domain])
  195. print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
  196. else:
  197. ipv4_results.append([ipv4_ips[0], domain])
  198. # 处理 IPv6 地址
  199. # if ipv6_ips:
  200. # fastest_ipv6 = find_fastest_ip(ipv6_ips)
  201. # if fastest_ipv6:
  202. # ipv6_results.append([fastest_ipv6, domain])
  203. # print(f"域名 {domain} 的最快IPv6是: {fastest_ipv6}")
  204. # else:
  205. # # 兜底:可能存在无法正确获取 fastest_ipv6 的情况,则将第一个IP赋值
  206. # ipv6_results.append([ipv6_ips[0], domain])
  207. sleep(1) # 避免请求过于频繁
  208. # 保存结果到文件
  209. if not ipv4_results and not ipv6_results:
  210. print(f"程序出错:未获取任何domain及对应IP,请检查接口~")
  211. sys.exit(1)
  212. # 生成更新时间
  213. update_time = datetime.now(timezone(timedelta(hours=8))).replace(microsecond=0).isoformat()
  214. ipv4_hosts_content = HOSTS_TEMPLATE.format(
  215. content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results),
  216. update_time=update_time) if ipv4_results else ""
  217. # ipv6_hosts_content = HOSTS_TEMPLATE.format(
  218. # content="\n".join(f"{ip:<50} {domain}" for ip, domain in ipv6_results),
  219. # update_time=update_time) if ipv6_results else ""
  220. # 写入文件
  221. if ipv4_hosts_content:
  222. write_host_file(file_path, ipv4_hosts_content, 'ipv4')
  223. if __name__ == "__main__":
  224. main()