refactor dnscrypt parse

add test
This commit is contained in:
bgme 2025-02-07 16:50:00 +08:00
parent ee37b9e95e
commit a53b9c7a8e
10 changed files with 608 additions and 540 deletions

6
Makefile Normal file
View file

@ -0,0 +1,6 @@
test:
coverage run -m unittest discover testing; \
coverage report; \
coverage html
clean:
rm -r .coverage htmlcov

321
dnscrypt/__init__.py Normal file
View file

@ -0,0 +1,321 @@
import base64
import json
from .parser import props, LP_decode, LP_pk, VLP_hashs, VLP_bootstrap_ipi
# Document
# https://dnscrypt.info/stamps-specifications
# https://dnscrypt.info/protocol
# https://github.com/DNSCrypt/dnscrypt-resolvers/blob/21fcbaf858112c63fed2a504714cc829bd654483/utils/format.py#L101-L141
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
if b[0] == 0x01:
return DNSCrypt.parse(stamp)
elif b[0] == 0x02:
return DNSoverHTTPS.parse(stamp)
elif b[0] == 0x03:
return DNSoverTLS.parse(stamp)
elif b[0] == 0x04:
return DNSoverQUIC.parse(stamp)
elif b[0] == 0x05:
return ObliviousDoH.parse(stamp)
elif b[0] == 0x81:
return DNSCryptRelay.parse(stamp)
elif b[0] == 0x85:
return ObliviousDoHRelay.parse(stamp)
elif b[0] == 0x00:
return PlainDNS.parse(stamp)
class Base:
def to_json(self):
return json.dumps(
self,
default=lambda o: o.__dict__
)
def __str__(self):
return self.__class__.__name__ + '<' + self.to_json() + '>'
class DNSCrypt(Base):
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
pk: str = None
provider: str = None
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x01
i = 0
if b[i] != 0x01:
raise ValueError("DNSCrypt: 0x01")
i = i + 1
parsed = DNSCrypt()
# props
i, parsed.dnssec, parsed.nolog, parsed.nofilter = props(b, i)
# LP(addr [:port])
i, parsed.addr = LP_decode(b, i)
# LP(pk)
i, parsed.pk = LP_pk(b, i)
# LP(providerName)
i, parsed.provider = LP_decode(b, i)
return parsed
class DNSoverHTTPS(Base):
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
hashs: list[str] = []
hostname: str = None
path: str = None
bootstrap_ipi: list[str] = []
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x02
i = 0
if b[i] != 0x02:
raise ValueError("DNSoverHTTPS: 0x02")
i = i + 1
parsed = DNSoverHTTPS()
# props
i, parsed.dnssec, parsed.nolog, parsed.nofilter = props(b, i)
# LP(addr)
i, parsed.addr = LP_decode(b, i)
# VLP(hash1, hash2, ...hashn)
i, parsed.hashs = VLP_hashs(b, i)
# LP(hostname [:port])
i, parsed.hostname = LP_decode(b, i)
# LP(path)
i, parsed.path = LP_decode(b, i)
# VLP(bootstrap_ip1, bootstrap_ip2, ...bootstrap_ipn) (optional)
if i < len(b):
i, parsed.bootstrap_ipi = VLP_bootstrap_ipi(b, i)
return parsed
class DNSoverTLS(Base):
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
hashs: list[str] = []
hostname: str = None
bootstrap_ipi: list[str] = []
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x03
i = 0
if b[i] != 0x03:
raise ValueError()
i = i + 1
parsed = DNSoverTLS()
# props
i, parsed.dnssec, parsed.nolog, parsed.nofilter = props(b, i)
# LP(addr)
i, parsed.addr = LP_decode(b, i)
# VLP(hash1, hash2, ...hashn)
i, parsed.hashs = VLP_hashs(b, i)
# LP(hostname[:port])
i, parsed.hostname = LP_decode(b, i)
# VLP(bootstrap_ip1, bootstrap_ip2, ...bootstrap_ipn) (optional)
if i < len(b):
i, parsed.bootstrap_ipi = VLP_bootstrap_ipi(b, i)
return parsed
class DNSoverQUIC(Base):
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
hashs: list[str] = []
hostname: str = None
bootstrap_ipi: list[str] = []
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x04
i = 0
if b[i] != 0x04:
raise ValueError()
i = i + 1
parsed = DNSoverQUIC()
# props
i, parsed.dnssec, parsed.nolog, parsed.nofilter = props(b, i)
# LP(addr)
i, parsed.addr = LP_decode(b, i)
# VLP(hash1, hash2, ...hashn)
i, parsed.hashs = VLP_hashs(b, i)
# LP(hostname[:port])
i, parsed.hostname = LP_decode(b, i)
# VLP(bootstrap_ip1, bootstrap_ip2, ...bootstrap_ipn) (optional)
if i < len(b):
i, parsed.bootstrap_ipi = VLP_bootstrap_ipi(b, i)
return parsed
class ObliviousDoH(Base):
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
hostname: str = None
path: str = None
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x05
i = 0
if b[i] != 0x05:
raise ValueError()
i = i + 1
parsed = ObliviousDoH()
# props
i, parsed.dnssec, parsed.nolog, parsed.nofilter = props(b, i)
# LP(hostname [:port])
i, parsed.hostname = LP_decode(b, i)
# LP(path)
i, parsed.path = LP_decode(b, i)
return parsed
class DNSCryptRelay(Base):
addr: str = None
@staticmethod
def parse(stamp):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x81
i = 0
if b[i] != 0x81:
raise ValueError()
i = i + 1
parsed = DNSCryptRelay()
# LP(addr)
i, parsed.addr = LP_decode(b, i)
return parsed
class ObliviousDoHRelay(Base):
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
hashs: list[str] = []
hostname: str = None
path: str = None
bootstrap_ipi: list[str] = []
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x85
i = 0
if b[i] != 0x85:
raise ValueError()
i = i + 1
parsed = ObliviousDoHRelay()
# props
i, parsed.dnssec, parsed.nolog, parsed.nofilter = props(b, i)
# LP(addr)
i, parsed.addr = LP_decode(b, i)
# VLP(hash1, hash2, ...hashn)
i, parsed.hashs = VLP_hashs(b, i)
# LP(hostname [:port])
i, parsed.hostname = LP_decode(b, i)
# LP(path)
i, parsed.path = LP_decode(b, i)
# VLP(bootstrap_ip1, bootstrap_ip2, ...bootstrap_ipn) (optional)
if i < len(b):
i, parsed.bootstrap_ipi = VLP_bootstrap_ipi(b, i)
return parsed
class PlainDNS(Base):
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x00
i = 0
if b[i] != 0x00:
raise ValueError()
i = i + 1
parsed = PlainDNS()
# props
i, parsed.dnssec, parsed.nolog, parsed.nofilter = props(b, i)
# LP(addr)
i, parsed.addr = LP_decode(b, i)
return parsed

91
dnscrypt/parser.py Normal file
View file

@ -0,0 +1,91 @@
def props(b: bytes, i: int):
'''
``props`` is a little-endian 64 bit value that represents informal properties about the resolver. It is a logical OR
combination of the following values:
- ``1``: the server supports DNSSEC
- ``2``: the server doesnt keep logs
- ``4``: the server doesnt intentionally block domains
For example, a server that supports DNSSEC, stores logs, but doesnt block anything on its own should set ``props``
as the following 8 bytes sequence: ``[ 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 ]``.
'''
props = b[i]
dnssec = not not ((props >> 0) & 1)
nolog = not not ((props >> 1) & 1)
nofilter = not not ((props >> 2) & 1)
i = i + 8
return i, dnssec, nolog, nofilter
def LP(b: bytes, i: int):
'''
``a || b`` is the concatenation of ``a`` and ``b``
``len(x)`` is a single byte representation of the length of ``x``, in bytes. Strings dont have to be zero-terminated
and do not require invidual encoding.
``LP(x)`` is ``len(x) || x``, i.e ``x`` prefixed by its length.
'''
x_len = b[i]
i = i + 1
x = b[i:i + x_len]
i = i + x_len
return i, x
def LP_decode(b: bytes, i: int, encoding='utf-8'):
i, x = LP(b, i)
return i, x.decode(encoding)
def LP_pk(b: bytes, i: int):
i, x = LP(b, i)
if len(x) != 32:
raise ValueError("LP(pk)")
hpk = x.hex().upper()
hpks = []
for j in range(0, 16):
hpks.append(hpk[j * 4: j * 4 + 4])
pk = ":".join(hpks)
return i, pk
def VLP(b: bytes, i: int):
'''
``a | b`` is the result of the logical ``OR`` operation between ``a`` and ``b``.
``vlen(x)`` is equal to ``len(x)`` if ``x`` is the last element of a set, and ``0x80 | len(x)`` if there are more
elements in the set.
``VLP(x1, x2, ...xn)`` encodes a set, as ``vlen(x1) || x1 || vlen(x2) || x2 ... || vlen(xn) || xn``.
Since ``vlen(xn) == len(xn)`` (length of the last element doesnt have the high bit set), for a set with a single
element, we have ``VLP(x) == LP(x)``.
'''
xx = []
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
x_len = b[i]
else:
x_len = b[i] ^ 0x80
i = i + 1
if x_len > 0:
x = b[i:i + x_len]
xx.append(x)
i = i + x_len
if last_element:
return i, xx
def VLP_hashs(b: bytes, i: int):
i, xx = VLP(b, i)
hashs = list(map(lambda x: x.hex(), xx))
return i, hashs
def VLP_bootstrap_ipi(b: bytes, i: int):
i, xx = VLP(b, i)
bootstrap_ipi = list(map(lambda x: x.decode("utf-8"), xx))
return i, bootstrap_ipi

View file

@ -1,8 +1,8 @@
import subprocess
import logging
import subprocess
from urllib.request import urlopen
from utils.dnsstamps import parse, DNSoverHTTPS
from dnscrypt import parse, DNSoverHTTPS
SMARTDNS_GFW_CONF_FILE = '/etc/smartdns/conf.d/gfw.conf'
@ -80,4 +80,4 @@ def main():
if __name__ == '__main__':
main()
main()

80
poetry.lock generated Normal file
View file

@ -0,0 +1,80 @@
# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand.
[[package]]
name = "coverage"
version = "7.6.10"
description = "Code coverage measurement for Python"
optional = false
python-versions = ">=3.9"
files = [
{ file = "coverage-7.6.10-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5c912978f7fbf47ef99cec50c4401340436d200d41d714c7a4766f377c5b7b78" },
{ file = "coverage-7.6.10-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:a01ec4af7dfeb96ff0078ad9a48810bb0cc8abcb0115180c6013a6b26237626c" },
{ file = "coverage-7.6.10-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b204c11e2b2d883946fe1d97f89403aa1811df28ce0447439178cc7463448a" },
{ file = "coverage-7.6.10-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:32ee6d8491fcfc82652a37109f69dee9a830e9379166cb73c16d8dc5c2915165" },
{ file = "coverage-7.6.10-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:675cefc4c06e3b4c876b85bfb7c59c5e2218167bbd4da5075cbe3b5790a28988" },
{ file = "coverage-7.6.10-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:f4f620668dbc6f5e909a0946a877310fb3d57aea8198bde792aae369ee1c23b5" },
{ file = "coverage-7.6.10-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:4eea95ef275de7abaef630c9b2c002ffbc01918b726a39f5a4353916ec72d2f3" },
{ file = "coverage-7.6.10-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:e2f0280519e42b0a17550072861e0bc8a80a0870de260f9796157d3fca2733c5" },
{ file = "coverage-7.6.10-cp310-cp310-win32.whl", hash = "sha256:bc67deb76bc3717f22e765ab3e07ee9c7a5e26b9019ca19a3b063d9f4b874244" },
{ file = "coverage-7.6.10-cp310-cp310-win_amd64.whl", hash = "sha256:0f460286cb94036455e703c66988851d970fdfd8acc2a1122ab7f4f904e4029e" },
{ file = "coverage-7.6.10-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ea3c8f04b3e4af80e17bab607c386a830ffc2fb88a5484e1df756478cf70d1d3" },
{ file = "coverage-7.6.10-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:507a20fc863cae1d5720797761b42d2d87a04b3e5aeb682ef3b7332e90598f43" },
{ file = "coverage-7.6.10-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d37a84878285b903c0fe21ac8794c6dab58150e9359f1aaebbeddd6412d53132" },
{ file = "coverage-7.6.10-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a534738b47b0de1995f85f582d983d94031dffb48ab86c95bdf88dc62212142f" },
{ file = "coverage-7.6.10-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0d7a2bf79378d8fb8afaa994f91bfd8215134f8631d27eba3e0e2c13546ce994" },
{ file = "coverage-7.6.10-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:6713ba4b4ebc330f3def51df1d5d38fad60b66720948112f114968feb52d3f99" },
{ file = "coverage-7.6.10-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:ab32947f481f7e8c763fa2c92fd9f44eeb143e7610c4ca9ecd6a36adab4081bd" },
{ file = "coverage-7.6.10-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:7bbd8c8f1b115b892e34ba66a097b915d3871db7ce0e6b9901f462ff3a975377" },
{ file = "coverage-7.6.10-cp311-cp311-win32.whl", hash = "sha256:299e91b274c5c9cdb64cbdf1b3e4a8fe538a7a86acdd08fae52301b28ba297f8" },
{ file = "coverage-7.6.10-cp311-cp311-win_amd64.whl", hash = "sha256:489a01f94aa581dbd961f306e37d75d4ba16104bbfa2b0edb21d29b73be83609" },
{ file = "coverage-7.6.10-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:27c6e64726b307782fa5cbe531e7647aee385a29b2107cd87ba7c0105a5d3853" },
{ file = "coverage-7.6.10-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:c56e097019e72c373bae32d946ecf9858fda841e48d82df7e81c63ac25554078" },
{ file = "coverage-7.6.10-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c7827a5bc7bdb197b9e066cdf650b2887597ad124dd99777332776f7b7c7d0d0" },
{ file = "coverage-7.6.10-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:204a8238afe787323a8b47d8be4df89772d5c1e4651b9ffa808552bdf20e1d50" },
{ file = "coverage-7.6.10-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e67926f51821b8e9deb6426ff3164870976fe414d033ad90ea75e7ed0c2e5022" },
{ file = "coverage-7.6.10-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:e78b270eadb5702938c3dbe9367f878249b5ef9a2fcc5360ac7bff694310d17b" },
{ file = "coverage-7.6.10-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:714f942b9c15c3a7a5fe6876ce30af831c2ad4ce902410b7466b662358c852c0" },
{ file = "coverage-7.6.10-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:abb02e2f5a3187b2ac4cd46b8ced85a0858230b577ccb2c62c81482ca7d18852" },
{ file = "coverage-7.6.10-cp312-cp312-win32.whl", hash = "sha256:55b201b97286cf61f5e76063f9e2a1d8d2972fc2fcfd2c1272530172fd28c359" },
{ file = "coverage-7.6.10-cp312-cp312-win_amd64.whl", hash = "sha256:e4ae5ac5e0d1e4edfc9b4b57b4cbecd5bc266a6915c500f358817a8496739247" },
{ file = "coverage-7.6.10-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:05fca8ba6a87aabdd2d30d0b6c838b50510b56cdcfc604d40760dae7153b73d9" },
{ file = "coverage-7.6.10-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:9e80eba8801c386f72e0712a0453431259c45c3249f0009aff537a517b52942b" },
{ file = "coverage-7.6.10-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a372c89c939d57abe09e08c0578c1d212e7a678135d53aa16eec4430adc5e690" },
{ file = "coverage-7.6.10-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ec22b5e7fe7a0fa8509181c4aac1db48f3dd4d3a566131b313d1efc102892c18" },
{ file = "coverage-7.6.10-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:26bcf5c4df41cad1b19c84af71c22cbc9ea9a547fc973f1f2cc9a290002c8b3c" },
{ file = "coverage-7.6.10-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:4e4630c26b6084c9b3cb53b15bd488f30ceb50b73c35c5ad7871b869cb7365fd" },
{ file = "coverage-7.6.10-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:2396e8116db77789f819d2bc8a7e200232b7a282c66e0ae2d2cd84581a89757e" },
{ file = "coverage-7.6.10-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:79109c70cc0882e4d2d002fe69a24aa504dec0cc17169b3c7f41a1d341a73694" },
{ file = "coverage-7.6.10-cp313-cp313-win32.whl", hash = "sha256:9e1747bab246d6ff2c4f28b4d186b205adced9f7bd9dc362051cc37c4a0c7bd6" },
{ file = "coverage-7.6.10-cp313-cp313-win_amd64.whl", hash = "sha256:254f1a3b1eef5f7ed23ef265eaa89c65c8c5b6b257327c149db1ca9d4a35f25e" },
{ file = "coverage-7.6.10-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:2ccf240eb719789cedbb9fd1338055de2761088202a9a0b73032857e53f612fe" },
{ file = "coverage-7.6.10-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:0c807ca74d5a5e64427c8805de15b9ca140bba13572d6d74e262f46f50b13273" },
{ file = "coverage-7.6.10-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2bcfa46d7709b5a7ffe089075799b902020b62e7ee56ebaed2f4bdac04c508d8" },
{ file = "coverage-7.6.10-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4e0de1e902669dccbf80b0415fb6b43d27edca2fbd48c74da378923b05316098" },
{ file = "coverage-7.6.10-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3f7b444c42bbc533aaae6b5a2166fd1a797cdb5eb58ee51a92bee1eb94a1e1cb" },
{ file = "coverage-7.6.10-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:b330368cb99ef72fcd2dc3ed260adf67b31499584dc8a20225e85bfe6f6cfed0" },
{ file = "coverage-7.6.10-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:9a7cfb50515f87f7ed30bc882f68812fd98bc2852957df69f3003d22a2aa0abf" },
{ file = "coverage-7.6.10-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:6f93531882a5f68c28090f901b1d135de61b56331bba82028489bc51bdd818d2" },
{ file = "coverage-7.6.10-cp313-cp313t-win32.whl", hash = "sha256:89d76815a26197c858f53c7f6a656686ec392b25991f9e409bcef020cd532312" },
{ file = "coverage-7.6.10-cp313-cp313t-win_amd64.whl", hash = "sha256:54a5f0f43950a36312155dae55c505a76cd7f2b12d26abeebbe7a0b36dbc868d" },
{ file = "coverage-7.6.10-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:656c82b8a0ead8bba147de9a89bda95064874c91a3ed43a00e687f23cc19d53a" },
{ file = "coverage-7.6.10-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ccc2b70a7ed475c68ceb548bf69cec1e27305c1c2606a5eb7c3afff56a1b3b27" },
{ file = "coverage-7.6.10-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5e37dc41d57ceba70956fa2fc5b63c26dba863c946ace9705f8eca99daecdc4" },
{ file = "coverage-7.6.10-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0aa9692b4fdd83a4647eeb7db46410ea1322b5ed94cd1715ef09d1d5922ba87f" },
{ file = "coverage-7.6.10-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aa744da1820678b475e4ba3dfd994c321c5b13381d1041fe9c608620e6676e25" },
{ file = "coverage-7.6.10-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:c0b1818063dc9e9d838c09e3a473c1422f517889436dd980f5d721899e66f315" },
{ file = "coverage-7.6.10-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:59af35558ba08b758aec4d56182b222976330ef8d2feacbb93964f576a7e7a90" },
{ file = "coverage-7.6.10-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:7ed2f37cfce1ce101e6dffdfd1c99e729dd2ffc291d02d3e2d0af8b53d13840d" },
{ file = "coverage-7.6.10-cp39-cp39-win32.whl", hash = "sha256:4bcc276261505d82f0ad426870c3b12cb177752834a633e737ec5ee79bbdff18" },
{ file = "coverage-7.6.10-cp39-cp39-win_amd64.whl", hash = "sha256:457574f4599d2b00f7f637a0700a6422243b3565509457b2dbd3f50703e11f59" },
{ file = "coverage-7.6.10-pp39.pp310-none-any.whl", hash = "sha256:fd34e7b3405f0cc7ab03d54a334c17a9e802897580d964bd8c2001f4b9fd488f" },
{ file = "coverage-7.6.10.tar.gz", hash = "sha256:7fb105327c8f8f0682e29843e2ff96af9dcbe5bab8eeb4b398c6a33a16d80a23" },
]
[package.extras]
toml = ["tomli"]
[metadata]
lock-version = "2.0"
python-versions = "^3.13"
content-hash = "bb288bb587807b6be40745ef9083750cdf5bf09de9a5d44fb95d778b7aaa69b3"

17
pyproject.toml Normal file
View file

@ -0,0 +1,17 @@
[tool.poetry]
name = "openwrt-dns"
version = "0.1.0"
description = ""
authors = ["bgme <i@bgme.me>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.13"
[tool.poetry.group.dev.dependencies]
coverage = "^7.6.10"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

57
testing/test_dnscrypt.py Normal file
View file

@ -0,0 +1,57 @@
import unittest
from dnscrypt import parse, DNSCrypt, DNSoverHTTPS, DNSoverTLS, DNSoverQUIC, ObliviousDoH, DNSCryptRelay, \
ObliviousDoHRelay, PlainDNS
def comman_parse_test(test_case, stamp, instance, except_result_json):
result = parse(stamp)
test_case.assertIsInstance(result, instance)
test_case.assertEqual(except_result_json, result.to_json())
class TestDNSCryptStampParse(unittest.TestCase):
def test_DNSCrypt(self):
stamp = "sdns://AQMAAAAAAAAAETk0LjE0MC4xNC4xNTo1NDQzILgxXdexS27jIKRw3C7Wsao5jMnlhvhdRUXWuMm1AFq6ITIuZG5zY3J5cHQuZmFtaWx5Lm5zMS5hZGd1YXJkLmNvbQ"
except_result_json = '{"dnssec": true, "nolog": true, "nofilter": false, "addr": "94.140.14.15:5443", "pk": "B831:5DD7:B14B:6EE3:20A4:70DC:2ED6:B1AA:398C:C9E5:86F8:5D45:45D6:B8C9:B500:5ABA", "provider": "2.dnscrypt.family.ns1.adguard.com"}'
comman_parse_test(self, stamp, DNSCrypt, except_result_json)
def test_DNSoverHTTPS(self):
stamp = "sdns://AgMAAAAAAAAADjE2My40Ny4xMTcuMTc2oMwQYNOcgym2K2-8fQ1t-TCYabmB5-Y5LVzY-kCPTYDmoPf1ryiAHod9ffOivij-FJ8ydKftKfE2_VA845jLqAsNoLNeBZUM-9gln5N1uhAYcLjDxMDsWlKXV-YxZ-neJqnooEROvWe7g_iAezkh6TiskXi4gr1QqtsRIx8ETPXwjffOoOZEumlj4zX-dly5l2sSsQ61QpS0JHd2TMs6OsyjrLL8ICquP7e_BeTIHEGU3KRFEdT5rzBHhuwa5yGECc9ioINVEGFkbC5hZGZpbHRlci5uZXQKL2Rucy1xdWVyeQ"
except_result_json = '{"dnssec": true, "nolog": true, "nofilter": false, "addr": "163.47.117.176", "hashs": ["cc1060d39c8329b62b6fbc7d0d6df9309869b981e7e6392d5cd8fa408f4d80e6", "f7f5af28801e877d7df3a2be28fe149f3274a7ed29f136fd503ce398cba80b0d", "b35e05950cfbd8259f9375ba101870b8c3c4c0ec5a529757e63167e9de26a9e8", "444ebd67bb83f8807b3921e938ac9178b882bd50aadb11231f044cf5f08df7ce", "e644ba6963e335fe765cb9976b12b10eb54294b42477764ccb3a3acca3acb2fc", "2aae3fb7bf05e4c81c4194dca44511d4f9af304786ec1ae7218409cf62a08355"], "hostname": "adl.adfilter.net", "path": "/dns-query"}'
comman_parse_test(self, stamp, DNSoverHTTPS, except_result_json)
def test_DNSoverTLS(self):
stamp = "sdns://AwcAAAAAAAAABzEuMS4xLjEAD29uZS5vbmUub25lLm9uZQ"
except_result_json = '{"dnssec": true, "nolog": true, "nofilter": true, "addr": "1.1.1.1", "hashs": [], "hostname": "one.one.one.one"}'
comman_parse_test(self, stamp, DNSoverTLS, except_result_json)
def test_DNSoverQUIC(self):
stamp = "sdns://BAcAAAAAAAAABzEuMS4xLjEAD29uZS5vbmUub25lLm9uZQ"
except_result_json = '{"dnssec": true, "nolog": true, "nofilter": true, "addr": "1.1.1.1", "hashs": [], "hostname": "one.one.one.one"}'
comman_parse_test(self, stamp, DNSoverQUIC, except_result_json)
def test_ObliviousDoH(self):
stamp = "sdns://BQcAAAAAAAAADWpwLnRpYXJhcC5vcmcFL29kb2g"
except_result_json = '{"dnssec": true, "nolog": true, "nofilter": true, "hostname": "jp.tiarap.org", "path": "/odoh"}'
comman_parse_test(self, stamp, ObliviousDoH, except_result_json)
def test_DNSCryptRelay(self):
stamp = "sdns://gQ04Ni4xMDYuNzQuMjE5"
except_result_json = '{"addr": "86.106.74.219"}'
comman_parse_test(self, stamp, DNSCryptRelay, except_result_json)
def test_ObliviousDoHRelay(self):
stamp = "sdns://hQcAAAAAAAAADDg5LjM4LjEzMS4zOAAYb2RvaC1ubC5hbGVrYmVyZy5uZXQ6NDQzBi9wcm94eQ"
except_result_json = '{"dnssec": true, "nolog": true, "nofilter": true, "addr": "89.38.131.38", "hashs": [], "hostname": "odoh-nl.alekberg.net:443", "path": "/proxy"}'
comman_parse_test(self, stamp, ObliviousDoHRelay, except_result_json)
def test_PlainDNS(self):
stamp = "sdns://AAUAAAAAAAAABzEuMS4xLjE"
except_result_json = '{"dnssec": true, "nolog": false, "nofilter": true, "addr": "1.1.1.1"}'
comman_parse_test(self, stamp, PlainDNS, except_result_json)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,14 @@
import unittest
from dnscrypt_to_smartdns import get_smartdns_config
class MyTestCase(unittest.TestCase):
def test_get_smartdns_config(self):
conf_text = get_smartdns_config()
# print(conf_text)
self.assertIsInstance(conf_text, str)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,19 @@
import unittest
from gfwlist_to_dns import get_dnsmasq_text, get_smartdns_domain_set
class MyTestCase(unittest.TestCase):
def test_get_dnsmasq_text(self):
dnsmasq_text = get_dnsmasq_text()
# print(dnsmasq_text)
self.assertIsInstance(dnsmasq_text, str)
def test_get_smartdns_domain_set(self):
domain_set = get_smartdns_domain_set()
# print(domain_set)
self.assertIsInstance(domain_set, str)
if __name__ == '__main__':
unittest.main()

View file

@ -1,537 +0,0 @@
import base64
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
if b[0] == 0x01:
return DNSCrypt.parse(stamp)
elif b[0] == 0x02:
return DNSoverHTTPS.parse(stamp)
elif b[0] == 0x03:
return DNSoverTLS.parse(stamp)
elif b[0] == 0x04:
return DNSoverQUIC.parse(stamp)
elif b[0] == 0x05:
return ObliviousDoH.parse(stamp)
elif b[0] == 0x81:
return DNSCryptRelay.parse(stamp)
elif b[0] == 0x85:
return ObliviousDoHrelay.parse(stamp)
elif b[0] == 0x00:
return PlainDNS.parse(stamp)
# Code from https://github.com/DNSCrypt/dnscrypt-resolvers/blob/21fcbaf858112c63fed2a504714cc829bd654483/utils/format.py#L101-L141
class DNSCrypt:
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
pk: str = None
provider: str = None
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x01
i = 0
if b[i] != 0x01:
raise ValueError()
i = i + 1
parsed = DNSCrypt()
# props
props = b[i]
parsed.dnssec = not not ((props >> 0) & 1)
parsed.nolog = not not ((props >> 1) & 1)
parsed.nofilter = not not ((props >> 2) & 1)
i = i + 8
# LP(addr [:port])
addr_len = b[i]
i = i + 1
parsed.addr = b[i:i + addr_len].decode("utf-8")
i = i + addr_len
# LP(pk)
pk_len = b[i]
i = i + 1
if pk_len != 32:
raise ValueError()
hpk = b[i:i + pk_len].hex().upper()
hpks = []
for j in range(0, 16):
hpks.append(hpk[j * 4: j * 4 + 4])
parsed.pk = ":".join(hpks)
i = i + pk_len
# LP(providerName)
provider_len = b[i]
i = i + 1
parsed.provider = b[i:i + provider_len].decode("utf-8")
i = i + provider_len
return parsed
class DNSoverHTTPS:
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
hashs: list[str] = []
hostname: str = None
path: str = None
bootstrap_ipi: list[str] = []
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x02
i = 0
if b[i] != 0x02:
raise ValueError()
i = i + 1
parsed = DNSoverHTTPS()
# props
props = b[i]
parsed.dnssec = not not ((props >> 0) & 1)
parsed.nolog = not not ((props >> 1) & 1)
parsed.nofilter = not not ((props >> 2) & 1)
i = i + 8
# LP(addr)
addr_len = b[i]
i = i + 1
parsed.addr = b[i:i + addr_len].decode("utf-8")
i = i + addr_len
# VLP(hash1, hash2, ...hashn)
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
hashx_len = b[i]
else:
hashx_len = b[i] ^ 0x80
if hashx_len != 0 and hashx_len != 32:
raise ValueError()
i = i + 1
if hashx_len > 0:
hashx = b[i:i + hashx_len].hex()
parsed.hashs.append(hashx)
i = i + hashx_len
if last_element:
break
# LP(hostname [:port])
hostname_len = b[i]
i = i + 1
parsed.hostname = b[i:i + hostname_len].decode("utf-8")
i = i + hostname_len
# LP(path)
path_len = b[i]
i = i + 1
parsed.path = b[i:i + path_len].decode("utf-8")
i = i + path_len
# VLP(bootstrap_ip1, bootstrap_ip2, ...bootstrap_ipn) (optional)
if i < len(b):
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
bootstrap_ipx_len = b[i]
else:
bootstrap_ipx_len = b[i] ^ 0x80
i = i + 1
if bootstrap_ipx_len > 0:
bootstrap_ipx = b[i:i + bootstrap_ipx_len].decode("utf-8")
parsed.bootstrap_ipi.append(bootstrap_ipx)
i = i + bootstrap_ipx_len
if last_element:
break
return parsed
class DNSoverTLS:
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
hashs: list[str] = []
hostname: str = None
bootstrap_ipi: list[str] = []
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x03
i = 0
if b[i] != 0x03:
raise ValueError()
i = i + 1
parsed = DNSoverTLS()
# props
props = b[i]
parsed.dnssec = not not ((props >> 0) & 1)
parsed.nolog = not not ((props >> 1) & 1)
parsed.nofilter = not not ((props >> 2) & 1)
i = i + 8
# LP(addr)
addr_len = b[i]
i = i + 1
parsed.addr = b[i:i + addr_len].decode("utf-8")
i = i + addr_len
# VLP(hash1, hash2, ...hashn)
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
hashx_len = b[i]
else:
hashx_len = b[i] ^ 0x80
if hashx_len != 0 and hashx_len != 32:
raise ValueError()
i = i + 1
if hashx_len > 0:
hashx = b[i:i + hashx_len].hex()
parsed.hashs.append(hashx)
i = i + hashx_len
if last_element:
break
# LP(hostname[:port])
hostname_len = b[i]
i = i + 1
parsed.hostname = b[i:i + hostname_len].decode("utf-8")
i = i + hostname_len
# VLP(bootstrap_ip1, bootstrap_ip2, ...bootstrap_ipn) (optional)
if i < len(b):
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
bootstrap_ipx_len = b[i]
else:
bootstrap_ipx_len = b[i] ^ 0x80
i = i + 1
if bootstrap_ipx_len > 0:
bootstrap_ipx = b[i:i + bootstrap_ipx_len].decode("utf-8")
parsed.bootstrap_ipi.append(bootstrap_ipx)
i = i + bootstrap_ipx_len
if last_element:
break
return parsed
class DNSoverQUIC:
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
hashs: list[str] = []
hostname: str = None
bootstrap_ipi: list[str] = []
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x04
i = 0
if b[i] != 0x04:
raise ValueError()
i = i + 1
parsed = DNSoverQUIC()
# props
props = b[i]
parsed.dnssec = not not ((props >> 0) & 1)
parsed.nolog = not not ((props >> 1) & 1)
parsed.nofilter = not not ((props >> 2) & 1)
i = i + 8
# LP(addr)
addr_len = b[i]
i = i + 1
parsed.addr = b[i:i + addr_len].decode("utf-8")
i = i + addr_len
# VLP(hash1, hash2, ...hashn)
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
hashx_len = b[i]
else:
hashx_len = b[i] ^ 0x80
if hashx_len != 0 and hashx_len != 32:
raise ValueError()
i = i + 1
if hashx_len > 0:
hashx = b[i:i + hashx_len].hex()
parsed.hashs.append(hashx)
i = i + hashx_len
if last_element:
break
# LP(hostname[:port])
hostname_len = b[i]
i = i + 1
parsed.hostname = b[i:i + hostname_len].decode("utf-8")
i = i + hostname_len
# VLP(bootstrap_ip1, bootstrap_ip2, ...bootstrap_ipn) (optional)
if i < len(b):
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
bootstrap_ipx_len = b[i]
else:
bootstrap_ipx_len = b[i] ^ 0x80
i = i + 1
if bootstrap_ipx_len > 0:
bootstrap_ipx = b[i:i + bootstrap_ipx_len].decode("utf-8")
parsed.bootstrap_ipi.append(bootstrap_ipx)
i = i + bootstrap_ipx_len
if last_element:
break
return parsed
class ObliviousDoH:
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
hostname: str = None
path: str = None
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x05
i = 0
if b[i] != 0x05:
raise ValueError()
i = i + 1
parsed = ObliviousDoH()
# props
props = b[i]
parsed.dnssec = not not ((props >> 0) & 1)
parsed.nolog = not not ((props >> 1) & 1)
parsed.nofilter = not not ((props >> 2) & 1)
i = i + 8
# LP(hostname [:port])
hostname_len = b[i]
i = i + 1
parsed.hostname = b[i:i + hostname_len].decode("utf-8")
i = i + hostname_len
# LP(path)
path_len = b[i]
i = i + 1
parsed.path = b[i:i + path_len].decode("utf-8")
i = i + path_len
return parsed
class DNSCryptRelay:
addr: str = None
@staticmethod
def parse(stamp):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x81
i = 0
if b[i] != 0x81:
raise ValueError()
i = i + 1
parsed = DNSCryptRelay()
# LP(addr)
addr_len = b[i]
i = i + 1
parsed.addr = b[i:i + addr_len].decode("utf-8")
i = i + addr_len
return parsed
class ObliviousDoHrelay:
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
hashs: list[str] = []
hostname: str = None
path: str = None
bootstrap_ipi: list[str] = []
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x85
i = 0
if b[i] != 0x85:
raise ValueError()
i = i + 1
parsed = ObliviousDoHrelay()
# props
props = b[i]
parsed.dnssec = not not ((props >> 0) & 1)
parsed.nolog = not not ((props >> 1) & 1)
parsed.nofilter = not not ((props >> 2) & 1)
i = i + 8
# LP(addr)
addr_len = b[i]
i = i + 1
parsed.addr = b[i:i + addr_len].decode("utf-8")
i = i + addr_len
# VLP(hash1, hash2, ...hashn)
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
hashx_len = b[i]
else:
hashx_len = b[i] ^ 0x80
if hashx_len != 0 and hashx_len != 32:
raise ValueError()
i = i + 1
if hashx_len > 0:
hashx = b[i:i + hashx_len].hex()
parsed.hashs.append(hashx)
i = i + hashx_len
if last_element:
break
# LP(hostname [:port])
hostname_len = b[i]
i = i + 1
parsed.hostname = b[i:i + hostname_len].decode("utf-8")
i = i + hostname_len
# LP(path)
path_len = b[i]
i = i + 1
parsed.path = b[i:i + path_len].decode("utf-8")
i = i + path_len
# VLP(bootstrap_ip1, bootstrap_ip2, ...bootstrap_ipn) (optional)
if i < len(b):
last_element = False
while True:
if b[i] & 0x80 == 0:
last_element = True
bootstrap_ipx_len = b[i]
else:
bootstrap_ipx_len = b[i] ^ 0x80
i = i + 1
if bootstrap_ipx_len > 0:
bootstrap_ipx = b[i:i + bootstrap_ipx_len].decode("utf-8")
parsed.bootstrap_ipi.append(bootstrap_ipx)
i = i + bootstrap_ipx_len
if last_element:
break
return parsed
class PlainDNS:
dnssec: bool = False
nolog: bool = False
nofilter: bool = False
addr: str = None
@staticmethod
def parse(stamp: str):
b = base64.urlsafe_b64decode(stamp.removeprefix("sdns://") + "==")
# 0x00
i = 0
if b[i] != 0x00:
raise ValueError()
i = i + 1
parsed = PlainDNS()
# props
props = b[i]
parsed.dnssec = not not ((props >> 0) & 1)
parsed.nolog = not not ((props >> 1) & 1)
parsed.nofilter = not not ((props >> 2) & 1)
i = i + 8
# LP(addr)
addr_len = b[i]
i = i + 1
parsed.addr = b[i:i + addr_len].decode("utf-8")
i = i + addr_len
return parsed
if __name__ == '__main__':
# DNSCrypt
t = parse(
"sdns://AQMAAAAAAAAAETk0LjE0MC4xNC4xNTo1NDQzILgxXdexS27jIKRw3C7Wsao5jMnlhvhdRUXWuMm1AFq6ITIuZG5zY3J5cHQuZmFtaWx5Lm5zMS5hZGd1YXJkLmNvbQ")
print(t)
# DoH
t = parse("sdns://AgcAAAAAAAAADTIxNy4xNjkuMjAuMjIADWRucy5hYS5uZXQudWsKL2Rucy1xdWVyeQ")
t = parse(
"sdns://AgMAAAAAAAAADjE2My40Ny4xMTcuMTc2oMwQYNOcgym2K2-8fQ1t-TCYabmB5-Y5LVzY-kCPTYDmoPf1ryiAHod9ffOivij-FJ8ydKftKfE2_VA845jLqAsNoLNeBZUM-9gln5N1uhAYcLjDxMDsWlKXV-YxZ-neJqnooEROvWe7g_iAezkh6TiskXi4gr1QqtsRIx8ETPXwjffOoOZEumlj4zX-dly5l2sSsQ61QpS0JHd2TMs6OsyjrLL8ICquP7e_BeTIHEGU3KRFEdT5rzBHhuwa5yGECc9ioINVEGFkbC5hZGZpbHRlci5uZXQKL2Rucy1xdWVyeQ")
print(t)
# DoT
t = parse("sdns://AwcAAAAAAAAABzEuMS4xLjEAD29uZS5vbmUub25lLm9uZQ")
print(t)
# DoQ
t = parse("sdns://BAcAAAAAAAAABzEuMS4xLjEAD29uZS5vbmUub25lLm9uZQ")
print(t)
# oDoH
t = parse("sdns://BQcAAAAAAAAADWpwLnRpYXJhcC5vcmcFL29kb2g")
print(t)
# DNSCrypt relay
t = parse("sdns://gQ04Ni4xMDYuNzQuMjE5")
print(t)
# oDoH relay
t = parse("sdns://hQcAAAAAAAAADDg5LjM4LjEzMS4zOAAYb2RvaC1ubC5hbGVrYmVyZy5uZXQ6NDQzBi9wcm94eQ")
print(t)
# Plain DNS
t = parse("sdns://AAUAAAAAAAAABzEuMS4xLjE")
print(t)