openwrt-dns/gfwlist_to_dns.py
2025-04-20 03:22:12 +08:00

272 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
import logging
import os.path
import re
import subprocess
import sys
from urllib.request import urlopen
# Upstream DNS server that the generated dnsmasq "server=" rules point to.
PROXY_DNS_IP = "127.0.0.1"
PROXY_DNS_PORT = "5353"

# Output files written by this script.
DNSMASQ_RULES_FILE = "/tmp/dnsmasq.d/gfwlist.conf"
SMARTDNS_DOMAIN_SET_FILE = "/etc/smartdns/domain-set/gfwlist.conf"

# Mirrors of the GFWList (https://github.com/gfwlist/gfwlist), tried in order.
GFWLIST_URL_LIST = [
    "https://raw.githubusercontent.com/gfwlist/gfwlist/master/gfwlist.txt",
    "https://pagure.io/gfwlist/raw/master/f/gfwlist.txt",
    "https://gitlab.com/gfwlist/gfwlist/raw/master/gfwlist.txt",
    "https://git.tuxfamily.org/gfwlist/gfwlist.git/plain/gfwlist.txt",
    "http://repo.or.cz/gfwlist.git/blob_plain/HEAD:/gfwlist.txt",
]

# Directory containing this script (symlinks resolved).
PWD = os.path.dirname(os.path.realpath(__file__))
def get_gfwlist_text(urls: 'list[str] | None' = None) -> str:
    """Download the GFWList and return it as decoded text.

    The mirrors are tried in order; the first one whose body can be
    base64-decoded and then decoded as UTF-8 is returned.

    Args:
        urls: Mirror URLs to try, in order. When omitted, the module-level
            ``GFWLIST_URL_LIST`` is used.

    Returns:
        The base64-decoded list as a ``str``.

    Raises:
        IOError: If no mirror could be fetched and decoded (also when
            ``urls`` is empty). The last underlying error, if any, is
            attached as the cause.
    """
    if urls is None:
        urls = GFWLIST_URL_LIST

    last_error = None
    for url in urls:
        logging.info('request {url}'.format(url=url))
        try:
            with urlopen(url, timeout=15) as response:
                return base64.b64decode(response.read()).decode('utf-8')
        except Exception as exc:
            # Catch Exception rather than using a bare except, so that
            # KeyboardInterrupt / SystemExit still abort the script, and
            # record why this mirror was skipped before trying the next one.
            logging.warning('failed to fetch %s: %s', url, exc)
            last_error = exc
    raise IOError("can't download gfwlist") from last_error
def is_comment(line: str) -> bool:
    """Return True if *line* starts with "!" or with "[AutoProxy"."""
    return line.startswith(('!', '[AutoProxy'))
# https://stackoverflow.com/questions/5284147/validating-ipv4-addresses-with-regexp
_IPV4_RE = re.compile(r'((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4}')
# https://stackoverflow.com/questions/53497/regular-expression-that-matches-valid-ipv6-addresses
_IPV6_RE = re.compile(
    r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))')


def has_ip(line: str) -> bool:
    """Return True if an IPv4- or IPv6-looking substring occurs in *line*.

    Both patterns are applied with ``search``, so the address may appear
    anywhere in the line.
    """
    if _IPV4_RE.search(line) is not None:
        return True
    return _IPV6_RE.search(line) is not None
def is_exception(line: str) -> bool:
    """Return True if *line* starts with the "@@" prefix."""
    return line.startswith('@@')
def is_regular(line: str) -> bool:
    """Return True if *line* starts with "/"."""
    return line.startswith('/')
def gfwlist_line_filter(line: str) -> bool:
    """Return True if *line* is a rule that should be passed to the converter.

    After stripping surrounding whitespace, a line is rejected when it is
    empty or when any of ``is_comment``, ``has_ip``, ``is_exception`` or
    ``is_regular`` returns True for it.
    """
    stripped = line.strip()
    if stripped == '':
        return False
    rejecters = (is_comment, has_ip, is_exception, is_regular)
    return not any(reject(stripped) for reject in rejecters)
def gfwlist_line_converter(line: str) -> str:
    """Convert one GFWList rule into a bare domain name.

    Three rule shapes are handled:

    * ``||domain`` rules, e.g. ``||global.bing.com`` or ``||cdn*.i-scmp.com``
    * ``|url`` rules, e.g. ``|http://bbs.cantonese.asia/`` or
      ``|http://*.1mobile.tw``
    * plain rules, e.g. ``.casinobellini.com`` or ``share.dmhy.org``

    Args:
        line: A raw rule line; surrounding whitespace and one trailing "/"
            are ignored.

    Returns:
        The extracted domain, or ``""`` when the rule is rejected (the raw
        line is then logged at debug level).
    """
    original_rule = line
    scheme_re = re.compile(r'^http(s)?://')
    wildcard_label_re = re.compile(r'^[\w\-_]*\*[\w\-_]*\.')

    def reject() -> str:
        logging.debug('invalid rule: ' + original_rule)
        return ""

    def drop_wildcard_label(rule: str) -> str:
        # Drop a leading label that contains "*" ("*." / "cdn*." / "*2.").
        rule = wildcard_label_re.sub("", rule)
        # Reject rules that still contain a "*" somewhere else.
        return reject() if '*' in rule else rule

    rule = line.strip()
    if rule.endswith('/'):
        rule = rule[:-1]

    # ||global.bing.com
    # ||cdn*.i-scmp.com
    if rule.startswith('||'):
        return drop_wildcard_label(rule.replace('||', ""))

    # |http://www.dmm.com/netgame
    # |http://bbs.cantonese.asia/
    # |http://*.1mobile.tw
    # |http://*2.bahamut.com.tw
    if rule.startswith('|'):
        rule = scheme_re.sub('', rule.replace('|', ''))
        # Reject rules that carry a path.
        if '/' in rule:
            return reject()
        return drop_wildcard_label(rule)

    # .casinobellini.com
    # share.dmhy.org
    # .ddns.net/
    # bbs.sina.com%2F
    # .amazon.com/Dalai-Lama
    # amazon.com/Prisoner-State-Secret-Journal-Premier
    # .keepandshare.com/visit/visit_page.php?i=688154
    # .pentoy.hk/%E6%99%82%E4%BA%8B
    # .ruanyifeng.com/blog*some_ways_to_break_the_great_firewall
    # prisoner-state-secret-journal-premier
    # q%3Dfreedom
    # search*safeweb
    # q=triangle
    # ultrareach

    # Reject rules that are not domains (no dot at all).
    if '.' not in rule:
        return reject()

    # Strip the http(s) scheme.
    rule = scheme_re.sub('', rule)

    # Reject rules that carry a path or query parameters.
    if any(marker in rule for marker in ('/', '?', '=')):
        return reject()

    # Reject rules containing percent-escapes ("%" followed by two word chars).
    if re.search(r'%\w\w', rule):
        return reject()

    rule = drop_wildcard_label(rule)

    # Strip a single leading "." from the domain.
    return rule[1:] if rule.startswith('.') else rule
# https://stackoverflow.com/questions/1418423/the-hostname-regex
_HOSTNAME_RE = re.compile(
    r'^(?=.{1,255}$)[0-9A-Za-z](?:(?:[0-9A-Za-z]|-){0,61}[0-9A-Za-z])?(?:\.[0-9A-Za-z](?:(?:[0-9A-Za-z]|-){0,61}[0-9A-Za-z])?)*\.?$')


def is_valid_hostname(domain: str) -> bool:
    """Return True if *domain* matches the hostname pattern.

    The pattern accepts 1-255 characters made of dot-separated labels of
    letters, digits and inner hyphens (at most 63 characters each), with an
    optional trailing dot.
    """
    return _HOSTNAME_RE.match(domain) is not None
def hosts_deduplicate(hosts: list[str]) -> list[str]:
    """Drop duplicate, invalid and already-covered hosts.

    A host is considered covered when one of its parent domains is also in
    the list (``www.example.com`` is covered by ``example.com``); only the
    parent is kept.

    Args:
        hosts: Candidate host names; the list itself is not modified.

    Returns:
        A new list with the surviving hosts, in first-seen order.
    """
    # dict.fromkeys removes exact duplicates while keeping a stable order.
    valid_hosts = []
    for host in dict.fromkeys(hosts):
        if is_valid_hostname(host):
            valid_hosts.append(host)
        else:
            logging.warning('{host} is invalid!'.format(host=host))

    known = set(valid_hosts)

    def covering_parent(host):
        # Walk the parent domains from the most to the least specific and
        # return the first one that is itself listed, or None.
        labels = host.split('.')
        for start in range(1, len(labels)):
            parent = '.'.join(labels[start:])
            if parent in known:
                return parent
        return None

    # Build a fresh list instead of removing entries in place: a host that
    # is covered by several listed parents is dropped exactly once, where
    # repeated list.remove() calls would raise ValueError.
    result = []
    for host in valid_hosts:
        parent = covering_parent(host)
        if parent is None:
            result.append(host)
        else:
            logging.debug('found duplicate: {k} {v}'.format(k=host, v=parent))
    return result
def get_gfwlist_hosts() -> list[str]:
    """Download the GFWList and return the hosts extracted from it.

    Each line accepted by ``gfwlist_line_filter`` is converted with
    ``gfwlist_line_converter``; empty conversion results are dropped.
    """
    lines = get_gfwlist_text().splitlines()
    converted = (
        gfwlist_line_converter(rule)
        for rule in lines
        if gfwlist_line_filter(rule)
    )
    return [host for host in converted if host != ""]
def get_custom_proxy_hosts() -> list[str]:
    """Load extra hosts from ``custom_proxy_hosts.json`` next to this script.

    Returns:
        The parsed JSON content, or an empty list when the file does not
        exist.
    """
    custom_file = os.path.join(PWD, 'custom_proxy_hosts.json')
    if os.path.exists(custom_file):
        with open(custom_file, 'r') as fp:
            return json.load(fp)
    return []
def get_proxy_hosts() -> list[str]:
    """Return the sorted, de-duplicated list of hosts to proxy.

    Combines the hosts from the GFWList with the custom hosts, runs them
    through ``hosts_deduplicate`` and logs how many remain.
    """
    combined = list(get_gfwlist_hosts())
    combined.extend(get_custom_proxy_hosts())
    unique_hosts = hosts_deduplicate(combined)
    logging.info('found {num} proxy host'.format(num=len(unique_hosts)))
    return sorted(unique_hosts)
def get_dnsmasq_text() -> str:
    """Return the dnsmasq configuration text, one ``server=`` rule per host.

    Every proxy host is pointed at ``PROXY_DNS_IP``#``PROXY_DNS_PORT``; the
    rules are joined with newlines (no trailing newline).
    """
    template = "server=/{host}/{dns_ip}#{dns_port}"
    return '\n'.join(
        template.format(host=name, dns_ip=PROXY_DNS_IP, dns_port=PROXY_DNS_PORT)
        for name in get_proxy_hosts()
    )
def write_dnsmasq():
    """Generate the dnsmasq rules and write them to ``DNSMASQ_RULES_FILE``."""
    # Build the text before opening the file, so that a failure while
    # generating it does not truncate the existing rules file.
    rules_text = get_dnsmasq_text()
    with open(DNSMASQ_RULES_FILE, 'w') as rules_file:
        rules_file.write(rules_text)
def reload_dnsmasq():
    """Run ``/etc/init.d/dnsmasq reload``; the exit status is not checked."""
    command = ["/etc/init.d/dnsmasq", "reload"]
    subprocess.run(command)
def get_smartdns_domain_set() -> str:
    """Return the proxy hosts as text, one host per line (no trailing newline)."""
    proxy_hosts = get_proxy_hosts()
    return '\n'.join(proxy_hosts)
def write_smartdns_domain_set():
    """Generate the domain set and write it to ``SMARTDNS_DOMAIN_SET_FILE``."""
    # Build the text before opening the file, so that a failure while
    # generating it does not truncate the existing domain-set file.
    domains_text = get_smartdns_domain_set()
    with open(SMARTDNS_DOMAIN_SET_FILE, 'w') as domain_set_file:
        domain_set_file.write(domains_text)
def reload_openwrt_smartdns():
    """Run ``/etc/init.d/smartdns reload``; the exit status is not checked."""
    command = ["/etc/init.d/smartdns", "reload"]
    subprocess.run(command)
def reload_pc_smartdns():
    """Run ``systemctl restart smartdns.service``; the exit status is not checked."""
    command = ["systemctl", "restart", "smartdns.service"]
    subprocess.run(command)
def run_openwrt():
    """Update and reload smartdns, then update and reload dnsmasq."""
    steps = (
        write_smartdns_domain_set,
        reload_openwrt_smartdns,
        write_dnsmasq,
        reload_dnsmasq,
    )
    for step in steps:
        step()
def run_pc():
    """Update the smartdns domain set and restart the smartdns service."""
    for step in (write_smartdns_domain_set, reload_pc_smartdns):
        step()
if __name__ == '__main__':
    import argparse

    # Log to stderr at INFO level.
    logging.basicConfig(stream=sys.stderr, level=logging.INFO, format="%(levelname)s:%(message)s")

    # The single positional argument selects the target environment.
    cli = argparse.ArgumentParser()
    cli.add_argument("where", choices=["openwrt", "pc"], help="运行环境openwrt 或 pc")
    target = cli.parse_args().where

    runners = {"openwrt": run_openwrt, "pc": run_pc}
    runners[target]()