272 lines
7.9 KiB
Python
272 lines
7.9 KiB
Python
import base64
|
||
import json
|
||
import logging
|
||
import os.path
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
from urllib.request import urlopen
|
||
|
||
PROXY_DNS_IP = '127.0.0.1'
|
||
PROXY_DNS_PORT = '5353'
|
||
|
||
DNSMASQ_RULES_FILE = '/tmp/dnsmasq.d/gfwlist.conf'
|
||
SMARTDNS_DOMAIN_SET_FILE = '/etc/smartdns/domain-set/gfwlist.conf'
|
||
|
||
# https://github.com/gfwlist/gfwlist
|
||
GFWLIST_URL_LIST = [
|
||
"https://raw.githubusercontent.com/gfwlist/gfwlist/master/gfwlist.txt",
|
||
"https://pagure.io/gfwlist/raw/master/f/gfwlist.txt",
|
||
"https://gitlab.com/gfwlist/gfwlist/raw/master/gfwlist.txt",
|
||
"https://git.tuxfamily.org/gfwlist/gfwlist.git/plain/gfwlist.txt",
|
||
"http://repo.or.cz/gfwlist.git/blob_plain/HEAD:/gfwlist.txt"
|
||
]
|
||
|
||
PWD = os.path.dirname(os.path.realpath(__file__))
|
||
|
||
|
||
def get_gfwlist_text() -> str:
|
||
for url in GFWLIST_URL_LIST:
|
||
try:
|
||
logging.info('request {url}'.format(url=url))
|
||
with urlopen(url, timeout=15) as responsee:
|
||
return base64.b64decode(responsee.read()).decode('utf-8')
|
||
except:
|
||
pass
|
||
raise IOError("can't download gfwlist")
|
||
|
||
|
||
def is_comment(line: str) -> bool:
|
||
comment_re = re.compile(r'^!|\[AutoProxy')
|
||
return bool(comment_re.match(line))
|
||
|
||
|
||
def has_ip(line: str) -> bool:
|
||
# https://stackoverflow.com/questions/5284147/validating-ipv4-addresses-with-regexp
|
||
# https://stackoverflow.com/questions/53497/regular-expression-that-matches-valid-ipv6-addresses
|
||
ipv4_re = re.compile(r'((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4}')
|
||
ipv6_re = re.compile(
|
||
r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))')
|
||
return bool(ipv4_re.search(line)) or bool(ipv6_re.search(line))
|
||
|
||
|
||
def is_exception(line: str) -> bool:
|
||
exception_re = re.compile(r'^@@')
|
||
return bool(exception_re.match(line))
|
||
|
||
|
||
def is_regular(line: str) -> bool:
|
||
regular_re = re.compile(r'^/')
|
||
return bool(regular_re.match(line))
|
||
|
||
|
||
def gfwlist_line_filter(line: str) -> bool:
|
||
line = line.strip()
|
||
return (line != '') and (not is_comment(line)) and (not has_ip(line)) \
|
||
and (not is_exception(line)) and (not is_regular(line))
|
||
|
||
|
||
def gfwlist_line_converter(line: str) -> str:
|
||
raw_line = line
|
||
|
||
line = line.strip()
|
||
line = re.sub(r'/$', '', line)
|
||
|
||
def invalid_rule_return():
|
||
logging.debug('invalid rule: ' + raw_line)
|
||
return ""
|
||
|
||
def convert_asterisk(line: str) -> str:
|
||
asterisk_re = re.compile(r'^[\w\-_]*\*[\w\-_]*\.')
|
||
# 替换开头的 *.
|
||
if re.match(asterisk_re, line):
|
||
line = asterisk_re.sub("", line)
|
||
# 移除中间含 * 的规则
|
||
if '*' in line:
|
||
return invalid_rule_return()
|
||
return line
|
||
|
||
# ||global.bing.com
|
||
# ||cdn*.i-scmp.com
|
||
if line.startswith('||'):
|
||
line = line.replace('||', "")
|
||
return convert_asterisk(line)
|
||
|
||
# |http://www.dmm.com/netgame
|
||
# |http://bbs.cantonese.asia/
|
||
# |http://www.dmm.com/netgame
|
||
# |http://*.1mobile.tw
|
||
# |http://*2.bahamut.com.tw
|
||
if line.startswith('|'):
|
||
line = line.replace('|', '')
|
||
line = re.sub(r'^http(s)?://', '', line)
|
||
# 移除含有 path 的规则
|
||
if '/' in line:
|
||
return invalid_rule_return()
|
||
return convert_asterisk(line)
|
||
|
||
# .casinobellini.com
|
||
# share.dmhy.org
|
||
# .ddns.net/
|
||
# bbs.sina.com%2F
|
||
# .amazon.com/Dalai-Lama
|
||
# amazon.com/Prisoner-State-Secret-Journal-Premier
|
||
# .keepandshare.com/visit/visit_page.php?i=688154
|
||
# .pentoy.hk/%E6%99%82%E4%BA%8B
|
||
# .ruanyifeng.com/blog*some_ways_to_break_the_great_firewall
|
||
# prisoner-state-secret-journal-premier
|
||
# q%3Dfreedom
|
||
# search*safeweb
|
||
# q=triangle
|
||
# ultrareach
|
||
|
||
# 移除非域名规则
|
||
if '.' not in line:
|
||
return invalid_rule_return()
|
||
|
||
# 移除 http 协议头
|
||
line = re.sub(r'^http(s)?://', '', line)
|
||
|
||
# 移除含 path 、含 params 的规则
|
||
for m in ['/', '?', '=']:
|
||
if m in line:
|
||
return invalid_rule_return()
|
||
# 移除非asci字符
|
||
if re.search(r'%\w\w', line):
|
||
return invalid_rule_return()
|
||
|
||
line = convert_asterisk(line)
|
||
|
||
# 移除域名最开头的 .
|
||
if line.startswith('.'):
|
||
line = re.sub(r'^\.', "", line)
|
||
|
||
return line
|
||
|
||
|
||
def is_valid_hostname(domain: str) -> bool:
|
||
# https://stackoverflow.com/questions/1418423/the-hostname-regex
|
||
domain_re = re.compile(
|
||
r'^(?=.{1,255}$)[0-9A-Za-z](?:(?:[0-9A-Za-z]|-){0,61}[0-9A-Za-z])?(?:\.[0-9A-Za-z](?:(?:[0-9A-Za-z]|-){0,61}[0-9A-Za-z])?)*\.?$')
|
||
return bool(domain_re.match(domain))
|
||
|
||
|
||
def hosts_deduplicate(hosts: list[str]) -> list[str]:
|
||
hosts = list(set(hosts))
|
||
for h in hosts.copy():
|
||
if not is_valid_hostname(h):
|
||
logging.warning('{host} is invalid!'.format(host=h))
|
||
hosts.remove(h)
|
||
|
||
hosts_copy = hosts.copy()
|
||
for v in hosts_copy:
|
||
for k in hosts_copy:
|
||
if k != v and k.endswith('.' + v):
|
||
logging.debug('found duplicate: {k} {v}'.format(k=k, v=v))
|
||
hosts.remove(k)
|
||
|
||
return hosts
|
||
|
||
|
||
def get_gfwlist_hosts() -> list[str]:
|
||
gfwlist_text = get_gfwlist_text()
|
||
gfwlist_lines = gfwlist_text.splitlines()
|
||
gfwlist_hosts = list(
|
||
filter(
|
||
lambda line: line != "",
|
||
map(
|
||
gfwlist_line_converter,
|
||
filter(gfwlist_line_filter, gfwlist_lines)
|
||
)
|
||
)
|
||
)
|
||
return gfwlist_hosts
|
||
|
||
|
||
def get_custom_proxy_hosts() -> list[str]:
|
||
cph_path = os.path.join(PWD, 'custom_proxy_hosts.json')
|
||
if not os.path.exists(cph_path):
|
||
return []
|
||
else:
|
||
with open(cph_path, 'r') as f:
|
||
return json.load(f)
|
||
|
||
|
||
def get_proxy_hosts() -> list[str]:
|
||
proxy_hosts = [
|
||
*get_gfwlist_hosts(),
|
||
*get_custom_proxy_hosts()
|
||
]
|
||
|
||
proxy_hosts = hosts_deduplicate(proxy_hosts)
|
||
logging.info('found {num} proxy host'.format(num=len(proxy_hosts)))
|
||
return sorted(proxy_hosts)
|
||
|
||
|
||
def get_dnsmasq_text() -> str:
|
||
rule_list = list(
|
||
map(
|
||
lambda host: "server=/{host}/{dns_ip}#{dns_port}".format(
|
||
host=host, dns_ip=PROXY_DNS_IP, dns_port=PROXY_DNS_PORT
|
||
),
|
||
get_proxy_hosts()
|
||
)
|
||
)
|
||
return '\n'.join(rule_list)
|
||
|
||
|
||
def write_dnsmasq():
|
||
dnsmasq_text = get_dnsmasq_text()
|
||
with open(DNSMASQ_RULES_FILE, 'w') as f:
|
||
f.write(dnsmasq_text)
|
||
|
||
|
||
def reload_dnsmasq():
|
||
subprocess.run(["/etc/init.d/dnsmasq", "reload"])
|
||
|
||
|
||
def get_smartdns_domain_set() -> str:
|
||
return '\n'.join(get_proxy_hosts())
|
||
|
||
|
||
def write_smartdns_domain_set():
|
||
domain_set_text = get_smartdns_domain_set()
|
||
with open(SMARTDNS_DOMAIN_SET_FILE, 'w') as f:
|
||
f.write(domain_set_text)
|
||
|
||
|
||
def reload_openwrt_smartdns():
|
||
subprocess.run(["/etc/init.d/smartdns", "reload"])
|
||
|
||
|
||
def reload_pc_smartdns():
|
||
subprocess.run(["systemctl", "restart", "smartdns.service"])
|
||
|
||
|
||
def run_openwrt():
|
||
write_smartdns_domain_set()
|
||
reload_openwrt_smartdns()
|
||
|
||
write_dnsmasq()
|
||
reload_dnsmasq()
|
||
|
||
|
||
def run_pc():
|
||
write_smartdns_domain_set()
|
||
reload_pc_smartdns()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
import argparse
|
||
|
||
logging.basicConfig(stream=sys.stderr, level=logging.INFO, format="%(levelname)s:%(message)s")
|
||
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument("where", choices=["openwrt", "pc"], help="运行环境:openwrt 或 pc")
|
||
|
||
args = parser.parse_args()
|
||
if args.where == "openwrt":
|
||
run_openwrt()
|
||
elif args.where == "pc":
|
||
run_pc()
|