146 lines
3.7 KiB
Python
146 lines
3.7 KiB
Python
import logging
|
||
import subprocess
|
||
from http.client import HTTPException
|
||
from urllib.error import URLError
|
||
from urllib.request import urlopen, Request
|
||
|
||
from dnscrypt import parse, DNSoverHTTPS
|
||
|
||
# Path of the smartdns configuration fragment that write_smartdns_config()
# overwrites with the generated server list.
SMARTDNS_GFW_CONF_FILE = '/etc/smartdns/conf.d/gfw-server.conf'
|
||
|
||
# Mirrors of the DNSCrypt public resolver list, tried in order by
# get_public_resolver_md().
# https://github.com/DNSCrypt/dnscrypt-resolvers
# NOTE: every entry needs its own trailing comma; without one, adjacent string
# literals are silently concatenated into a single bogus URL.
PUBLIC_RESOLVER_URL_LIST = [
    "https://download.dnscrypt.info/resolvers-list/v3/public-resolvers.md",
    "https://raw.githubusercontent.com/DNSCrypt/dnscrypt-resolvers/master/v3/public-resolvers.md",
    "https://dnsr.evilvibes.com/v3/public-resolvers.md",
]
|
||
|
||
|
||
def get_public_resolver_md(urls=None) -> str:
    """Download the public resolver list from the first mirror that answers.

    Each URL is tried in order with a 15-second timeout. The body of the
    first successful response is decoded as UTF-8 and returned. A mirror
    that fails is logged and skipped so the next one can be tried.

    Args:
        urls: Optional iterable of URLs to try, in order. Defaults to
            ``PUBLIC_RESOLVER_URL_LIST``.

    Returns:
        The text of the first document that could be downloaded and decoded.

    Raises:
        IOError: If none of the URLs could be downloaded. The error of the
            last failed attempt, if any, is attached as ``__cause__``.
    """
    if urls is None:
        urls = PUBLIC_RESOLVER_URL_LIST

    last_error = None
    for url in urls:
        logging.info('request %s', url)
        try:
            with urlopen(url, timeout=15) as response:
                return response.read().decode('utf-8')
        except (OSError, HTTPException, ValueError) as err:
            # OSError covers URLError, timeouts and socket/TLS failures;
            # ValueError covers malformed URLs and undecodable bodies.
            # Deliberately not a bare ``except`` so Ctrl-C still aborts.
            logging.warning('request %s failed: %s', url, err)
            last_error = err
    raise IOError("can't download public-resolvers.md") from last_error
|
||
|
||
|
||
def get_stamps():
    """Return the parsed form of every stamp line in the resolver list.

    Downloads the resolver document, keeps only the lines that start with
    ``sdns://`` and passes each of them to ``parse`` (from ``dnscrypt``).

    Returns:
        A list with one ``parse`` result per matching line, in document order.
    """
    document = get_public_resolver_md()
    return [
        parse(entry)
        for entry in document.splitlines()
        if entry.startswith('sdns://')
    ]
|
||
|
||
|
||
def get_not_china_doh_list():
    """Return the stamps that are DoH servers and pass the flag checks.

    A stamp is kept when it is a ``DNSoverHTTPS`` instance and neither its
    ``nofilter`` nor its ``dnssec`` attribute is the literal ``False``.

    Returns:
        A list of the matching stamps, in the order ``get_stamps`` yields them.
    """

    def is_match(s) -> bool:
        # Identity comparison against False is kept on purpose: only an
        # explicit False rejects a stamp; any other value lets it through.
        return (
            isinstance(s, DNSoverHTTPS)
            and s.nofilter is not False
            and s.dnssec is not False
        )

    return [candidate for candidate in get_stamps() if is_match(candidate)]
|
||
|
||
|
||
# def build_dns_query(domain, record_type):
|
||
# # require: dnspython
|
||
# import dns.message
|
||
# dnsq = dns.message.make_query(
|
||
# qname=domain,
|
||
# rdtype=record_type,
|
||
# want_dnssec=False,
|
||
# )
|
||
# return dnsq
|
||
|
||
# URLs that doh_tester() has already probed; it consults and updates this set
# so that each endpoint is requested at most once per process.
TESTED_URL = set()
|
||
|
||
|
||
def doh_tester(hostname, path):
    """Probe a DNS-over-HTTPS endpoint with a canned DNS query.

    Sends a pre-encoded query for ``www.google.com`` to
    ``https://<hostname><path>`` with a 3-second timeout and prints a
    tab-separated ``ok`` / ``failed`` line for the URL.

    Each URL is probed at most once per process: a URL already present in
    ``TESTED_URL`` is reported as ``False`` without any request, which lets
    callers use this function as a de-duplicating filter.

    Args:
        hostname: Host part of the endpoint URL.
        path: Path part of the endpoint URL, appended directly to the host.

    Returns:
        True if the URL had not been probed before and the response could be
        read completely; False if the request failed or the URL was already
        probed.
    """
    # Pre-encoded DNS query: www.google.com, type A, class IN.
    dnsq = b'\xf9\x04\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01'
    url = 'https://' + hostname + path
    if url in TESTED_URL:
        return False
    # Record the URL before probing so it is never requested twice,
    # whatever the outcome.
    TESTED_URL.add(url)

    # Supplying a body makes urllib send the query as a POST.
    request = Request(url, data=dnsq)
    request.add_header('accept', 'application/dns-message')
    request.add_header('content-type', 'application/dns-message')
    try:
        with urlopen(request, timeout=3) as response:
            response.read()
    except (OSError, HTTPException) as e:
        # OSError is the common base of URLError, TimeoutError,
        # ConnectionError, ssl.SSLError and (before Python 3.10)
        # socket.timeout, so a failure while reading the body no longer
        # aborts the whole run.
        print('{url}\tfailed\t{error}'.format(url=url, error=e))
        return False
    print('{url}\tok'.format(url=url))
    return True
|
||
|
||
|
||
def get_final_doh_list():
    """Return the candidate DoH stamps whose endpoint passes ``doh_tester``.

    Returns:
        A list of the stamps from ``get_not_china_doh_list`` for which
        ``doh_tester(stamp.hostname, stamp.path)`` is truthy, in their
        original order.
    """
    candidates = get_not_china_doh_list()
    return [
        stamp
        for stamp in candidates
        if doh_tester(stamp.hostname, stamp.path)
    ]
|
||
|
||
|
||
def get_smartdns_config():
    """Render one smartdns ``server-https`` line per working DoH endpoint.

    Every stamp returned by ``get_final_doh_list`` becomes a line of the form
    ``server-https https://<hostname><path> -group GFW -exclude-default-group``.
    Duplicate lines are collapsed, and the result is sorted so that the
    generated text is identical from run to run (plain set iteration order
    varies with string hash randomization).

    Returns:
        The lines joined with ``'\\n'``, without a trailing newline; an empty
        string when there are no stamps.
    """
    lines = {
        'server-https https://' + stamp.hostname + stamp.path
        + ' -group GFW -exclude-default-group'
        for stamp in get_final_doh_list()
    }
    return '\n'.join(sorted(lines))
|
||
|
||
|
||
def write_smartdns_config():
    """Generate the smartdns server list and write it to the config file.

    The text is produced before the file is opened, so a failure while
    building it leaves the existing ``SMARTDNS_GFW_CONF_FILE`` untouched.
    """
    rendered = get_smartdns_config()
    with open(SMARTDNS_GFW_CONF_FILE, 'w') as conf_file:
        conf_file.write(rendered)
|
||
|
||
|
||
def reload_openwrt_smartdns():
    """Run ``/etc/init.d/smartdns reload``.

    The command's exit status is not checked.
    """
    command = ["/etc/init.d/smartdns", "reload"]
    subprocess.run(command)
|
||
|
||
|
||
def reload_pc_smartdns():
    """Run ``systemctl restart smartdns.service``.

    The command's exit status is not checked.
    """
    command = ["systemctl", "restart", "smartdns.service"]
    subprocess.run(command)
|
||
|
||
|
||
def run_openwrt():
    """Regenerate the smartdns config file, then reload smartdns via init.d."""
    # Order matters: the file must be rewritten before the service reloads it.
    for step in (write_smartdns_config, reload_openwrt_smartdns):
        step()
|
||
|
||
|
||
def run_pc():
    """Regenerate the smartdns config file, then restart smartdns via systemctl."""
    # Order matters: the file must be rewritten before the service restarts.
    for step in (write_smartdns_config, reload_pc_smartdns):
        step()
|
||
|
||
|
||
if __name__ == '__main__':
    import argparse

    # Command-line entry point: the single positional argument selects which
    # environment-specific runner to execute.
    runners = {"openwrt": run_openwrt, "pc": run_pc}

    parser = argparse.ArgumentParser()
    parser.add_argument("where", choices=list(runners), help="运行环境:openwrt 或 pc")

    args = parser.parse_args()
    # argparse has already rejected anything outside ``choices``, so the
    # lookup cannot miss.
    runners[args.where]()
|